1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.jes.jobscheduler.impl.groovy;
12
13 import org.astrogrid.applications.beans.v1.cea.castor.ResultListType;
14 import org.astrogrid.applications.beans.v1.cea.castor.types.ExecutionPhase;
15 import org.astrogrid.component.descriptor.ComponentDescriptor;
16 import org.astrogrid.jes.job.JobFactory;
17 import org.astrogrid.jes.jobscheduler.Dispatcher;
18 import org.astrogrid.jes.jobscheduler.JobScheduler;
19 import org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl;
20 import org.astrogrid.jes.util.Cache;
21 import org.astrogrid.jes.util.TemporaryBuffer;
22 import org.astrogrid.workflow.beans.v1.Step;
23 import org.astrogrid.workflow.beans.v1.Workflow;
24
25 import org.exolab.castor.xml.Marshaller;
26
27 import java.io.StringReader;
28 import java.io.StringWriter;
29 import java.io.Writer;
30
31 import javax.xml.transform.Transformer;
32 import javax.xml.transform.stream.StreamResult;
33 import javax.xml.transform.stream.StreamSource;
34
35 import junit.framework.Test;
36
37 /*** implementation of jobscheduler, based on groovy rules engine and state machine.
38 * @author Noel Winstanley nw@jb.man.ac.uk 26-Jul-2004
39 *
40 */
41 public class GroovySchedulerImpl extends AbstractJobSchedulerImpl
42 implements JobScheduler, ComponentDescriptor{
43 /*** configuration component - abstracts from messy business of finding stylesheets, etc. */
44 public interface Transformers {
45 /*** transformer of workflows to set of rules. */
46 public Transformer getCompiler() throws Exception;
47 /*** annotator of workflows with id attributes */
48 public Transformer getWorkflowAnnotator() throws Exception;
49 }
50 /*** Construct a new GroovySchedulerImpl
51 * @param factory produces jobs
52 * @param transformers compiles documents into rulebase.
53 * @param dispatcher dispatches individual steps to cea
54 * @param interpFactory creates interpreters for compiled rulebases.
55 */
56 public GroovySchedulerImpl(JobFactory factory,Transformers transformers,Dispatcher dispatcher,GroovyInterpreterFactory interpFactory) {
57 super(factory);
58 this.transformers = transformers;
59 this.disp = dispatcher;
60 this.interpFactory = interpFactory;
61 }
62 protected final Transformers transformers;
63 protected final Dispatcher disp;
64 protected final GroovyInterpreterFactory interpFactory;
65
66 /***
67 * annotate the workflow with id numbers. then
68 * compile the workflow into xml rules with embedded groovy scripts., deserialize this as interpreter, add as extension to workflow.
69 * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#initializeJob(org.astrogrid.workflow.beans.v1.Workflow)
70 *
71 */
72 protected Workflow initializeJob(Workflow job) throws Exception {
73
74 StringWriter wfWriter = new StringWriter();
75 job.marshal(wfWriter);
76 wfWriter.close();
77 String originalWorkflowDoc = wfWriter.toString();
78
79 TemporaryBuffer buff = interpFactory.getBuffer();
80
81 buff.writeMode();
82 Writer annotatedWorkflowDoc = buff.getWriter();
83 transformers.getWorkflowAnnotator().transform(new StreamSource(new StringReader(originalWorkflowDoc)),new StreamResult(annotatedWorkflowDoc));
84
85 buff.readMode();
86 Workflow annotatedJob = Workflow.unmarshalWorkflow(buff.getReader());
87
88
89 buff.writeMode();
90 Writer rules = buff.getWriter();
91 transformers.getCompiler().transform(new StreamSource(new StringReader(originalWorkflowDoc)),new StreamResult(rules));
92
93
94
95 buff.readMode();
96
97 System.out.println(buff.getContents());
98 GroovyInterpreter interp = interpFactory.newInterpreter(buff.getContents(),new JesInterface(annotatedJob,disp,this));
99 interpFactory.pickleTo(interp,annotatedJob);
100
101 return annotatedJob;
102 }
103 /*** unpickle the interpreter from the workflow, then try to run any triggerable rules */
104 protected void scheduleSteps(Workflow job) {
105 GroovyInterpreter interp = null;
106 try {
107 interp = interpFactory.unpickleFrom(new JesInterface(job,disp,this));
108 } catch (PickleException e) {
109 logger.error("Can't create workflow interpreter",e);
110 recordFatalError(job,e);
111 return;
112 }
113 try {
114 interp.run();
115 } catch (ScriptEngineException e) {
116 logger.warn("Run threw exception",e);
117 recordFatalError(job,e);
118 return;
119 }
120 try {
121 interpFactory.pickleTo(interp,job);
122 } catch (PickleException e) {
123 logger.error("Can't pickle interpreter");
124 recordFatalError(job,e);
125 }
126 }
127
128 /***
129 * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#updateJobStatus(org.astrogrid.workflow.beans.v1.Workflow)
130 */
131 protected void updateJobStatus(Workflow job) {
132
133 }
134
135 /***
136 * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#getStepForFragment(org.astrogrid.workflow.beans.v1.Workflow, java.lang.String)
137 */
138 protected Step getStepForFragment(Workflow job, String fragment) {
139 return (Step)job.findXPathValue("//*[id='" + fragment + "']");
140 }
141
142
143 /***
144 * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#updateStepStatus(org.astrogrid.workflow.beans.v1.Workflow, org.astrogrid.workflow.beans.v1.Step, org.astrogrid.applications.beans.v1.cea.castor.types.ExecutionPhase)
145 * do what parent, does, and then update corresponding state in interpreter too.
146 */
147 protected void updateStepStatus(Workflow wf, Step step, ExecutionPhase status) {
148 super.updateStepStatus(wf, step, status);
149 try {
150 GroovyInterpreter interp = interpFactory.unpickleFrom(new JesInterface(wf,disp,this));
151 interp.updateStepStatus(step,status);
152 interpFactory.pickleTo(interp,wf);
153 } catch (PickleException e) {
154 logger.error("Failed to update step status",e);
155 recordFatalError(wf,e);
156 }
157 }
158 /*** store results of a cea computation back into workfllow interpreter.
159 * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#storeResults(org.astrogrid.workflow.beans.v1.Workflow, org.astrogrid.workflow.beans.v1.Step, org.astrogrid.applications.beans.v1.cea.castor.ResultListType)
160 */
161 protected void storeResults(Workflow wf, Step step, ResultListType results) {
162
163 if (step.getResultVar() == null) {
164 return;
165 }
166 try {
167 GroovyInterpreter interp = interpFactory.unpickleFrom(new JesInterface(wf,disp,this));
168 interp.storeResults(step,results);
169 interpFactory.pickleTo(interp,wf);
170 } catch (PickleException e) {
171 logger.error("Failed to store results",e);
172 recordFatalError(wf,e);
173 }
174 }
175
176 /***
177 * @see org.astrogrid.component.descriptor.ComponentDescriptor#getName()
178 */
179 public String getName() {
180 return "scripted-scheduler";
181 }
182
183 /***
184 * @see org.astrogrid.component.descriptor.ComponentDescriptor#getDescription()
185 */
186 public String getDescription() {
187 return "Scheduler Implementation that uses a scripting engine";
188 }
189
190 /***
191 * @see org.astrogrid.component.descriptor.ComponentDescriptor#getInstallationTest()
192 */
193 public Test getInstallationTest() {
194 return null;
195 }
196
197
198
199
200 }
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252