1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.jes.jobscheduler.impl.groovy;
12
13 import org.astrogrid.applications.beans.v1.cea.castor.ResultListType;
14 import org.astrogrid.applications.beans.v1.cea.castor.types.ExecutionPhase;
15 import org.astrogrid.component.descriptor.ComponentDescriptor;
16 import org.astrogrid.jes.job.JobFactory;
17 import org.astrogrid.jes.jobscheduler.Dispatcher;
18 import org.astrogrid.jes.jobscheduler.JobScheduler;
19 import org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl;
20 import org.astrogrid.jes.util.TemporaryBuffer;
21 import org.astrogrid.workflow.beans.v1.Step;
22 import org.astrogrid.workflow.beans.v1.Workflow;
23
24 import java.io.StringReader;
25 import java.io.StringWriter;
26 import java.io.Writer;
27
28 import javax.xml.transform.Transformer;
29 import javax.xml.transform.stream.StreamResult;
30 import javax.xml.transform.stream.StreamSource;
31
32 import junit.framework.Test;
33
34 /*** implementation of jobscheduler, based on groovy rules engine and state machine.
35 * @author Noel Winstanley nw@jb.man.ac.uk 26-Jul-2004
36 *
37 */
38 public class GroovySchedulerImpl extends AbstractJobSchedulerImpl
39 implements JobScheduler, ComponentDescriptor{
40 /*** configuration component - abstracts from messy business of finding stylesheets, etc. */
41 public interface Transformers {
42 /*** transformer of workflows to set of rules. */
43 public Transformer getCompiler() throws Exception;
44 /*** annotator of workflows with id attributes */
45 public Transformer getWorkflowAnnotator() throws Exception;
46 }
47 /*** Construct a new GroovySchedulerImpl
48 * @param factory produces jobs
49 * @param transformers compiles documents into rulebase.
50 * @param dispatcher dispatches individual steps to cea
51 * @param interpFactory creates interpreters for compiled rulebases.
52 */
53 public GroovySchedulerImpl(JobFactory factory,Transformers transformers,Dispatcher dispatcher,GroovyInterpreterFactory interpFactory) {
54 super(factory);
55 this.transformers = transformers;
56 this.disp = dispatcher;
57 this.interpFactory = interpFactory;
58 }
59 protected final Transformers transformers;
60 protected final Dispatcher disp;
61 protected final GroovyInterpreterFactory interpFactory;
62 /***
63 * annotate the workflow with id numbers. then
64 * compile the workflow into xml rules with embedded groovy scripts., deserialize this as interpreter, add as extension to workflow.
65 * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#initializeJob(org.astrogrid.workflow.beans.v1.Workflow)
66 *
67 */
68 protected Workflow initializeJob(Workflow job) throws Exception {
69
70 StringWriter wfWriter = new StringWriter();
71 job.marshal(wfWriter);
72 wfWriter.close();
73 String originalWorkflowDoc = wfWriter.toString();
74
75 TemporaryBuffer buff = interpFactory.getBuffer();
76
77 buff.writeMode();
78 Writer annotatedWorkflowDoc = buff.getWriter();
79 transformers.getWorkflowAnnotator().transform(new StreamSource(new StringReader(originalWorkflowDoc)),new StreamResult(annotatedWorkflowDoc));
80
81 buff.readMode();
82 Workflow annotatedJob = Workflow.unmarshalWorkflow(buff.getReader());
83
84
85 buff.writeMode();
86 Writer rules = buff.getWriter();
87 transformers.getCompiler().transform(new StreamSource(new StringReader(originalWorkflowDoc)),new StreamResult(rules));
88
89
90
91 buff.readMode();
92 GroovyInterpreter interp = interpFactory.newInterpreter(buff.getContents(),new JesInterface(annotatedJob,disp,this));
93 interpFactory.pickleTo(interp,annotatedJob);
94
95 return annotatedJob;
96 }
97 /*** unpickle the interpreter from the workflow, then try to run any triggerable rules */
98 protected void scheduleSteps(Workflow job) {
99 GroovyInterpreter interp = null;
100 try {
101 interp = interpFactory.unpickleFrom(new JesInterface(job,disp,this));
102 } catch (PickleException e) {
103 logger.error("Can't create workflow interpreter",e);
104 recordFatalError(job,e);
105 return;
106 }
107 try {
108 interp.run();
109 } catch (ScriptEngineException e) {
110 logger.warn("Run threw exception",e);
111 recordFatalError(job,e);
112 return;
113 }
114 try {
115 interpFactory.pickleTo(interp,job);
116 } catch (PickleException e) {
117 logger.error("Can't pickle interpreter");
118 recordFatalError(job,e);
119 }
120 }
121
122 /***
123 * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#updateJobStatus(org.astrogrid.workflow.beans.v1.Workflow)
124 */
125 protected void updateJobStatus(Workflow job) {
126
127 }
128
129 /***
130 * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#getStepForFragment(org.astrogrid.workflow.beans.v1.Workflow, java.lang.String)
131 */
132 protected Step getStepForFragment(Workflow job, String fragment) {
133 return (Step)job.findXPathValue("//*[id='" + fragment + "']");
134 }
135
136
137 /***
138 * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#updateStepStatus(org.astrogrid.workflow.beans.v1.Workflow, org.astrogrid.workflow.beans.v1.Step, org.astrogrid.applications.beans.v1.cea.castor.types.ExecutionPhase)
139 * do what parent, does, and then update corresponding state in interpreter too.
140 * quite inefficient that we need to recreate interpreter for this. oh well, at least no script is called.
141 */
142 protected void updateStepStatus(Workflow wf, Step step, ExecutionPhase status) {
143 super.updateStepStatus(wf, step, status);
144 try {
145 GroovyInterpreter interp = interpFactory.unpickleFrom(new JesInterface(wf,disp,this));
146 interp.updateStepStatus(step,status);
147 interpFactory.pickleTo(interp,wf);
148 } catch (PickleException e) {
149 logger.error("Failed to update step status",e);
150 recordFatalError(wf,e);
151 }
152 }
153 /*** store results of a cea computation back into workfllow interpreter.
154 * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#storeResults(org.astrogrid.workflow.beans.v1.Workflow, org.astrogrid.workflow.beans.v1.Step, org.astrogrid.applications.beans.v1.cea.castor.ResultListType)
155 */
156 protected void storeResults(Workflow wf, Step step, ResultListType results) {
157
158 if (step.getResultVar() == null) {
159 return;
160 }
161 try {
162 GroovyInterpreter interp = interpFactory.unpickleFrom(new JesInterface(wf,disp,this));
163 interp.storeResults(step,results);
164 interpFactory.pickleTo(interp,wf);
165 } catch (PickleException e) {
166 logger.error("Failed to store results",e);
167 recordFatalError(wf,e);
168 }
169 }
170
171 /***
172 * @see org.astrogrid.component.descriptor.ComponentDescriptor#getName()
173 */
174 public String getName() {
175 return "scripted-scheduler";
176 }
177
178 /***
179 * @see org.astrogrid.component.descriptor.ComponentDescriptor#getDescription()
180 */
181 public String getDescription() {
182 return "Scheduler Implementation that uses a scripting engine";
183 }
184
185 /***
186 * @see org.astrogrid.component.descriptor.ComponentDescriptor#getInstallationTest()
187 */
188 public Test getInstallationTest() {
189 return null;
190 }
191
192
193
194
195 }
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235