View Javadoc

1   /*$Id: GroovySchedulerImpl.java,v 1.5 2004/11/05 16:52:42 jdt Exp $
2    * Created on 26-Jul-2004
3    *
4    * Copyright (C) AstroGrid. All rights reserved.
5    *
6    * This software is published under the terms of the AstroGrid 
7    * Software License version 1.2, a copy of which has been included 
8    * with this distribution in the LICENSE.txt file.  
9    *
10  **/
11  package org.astrogrid.jes.jobscheduler.impl.groovy;
12  
13  import org.astrogrid.applications.beans.v1.cea.castor.ResultListType;
14  import org.astrogrid.applications.beans.v1.cea.castor.types.ExecutionPhase;
15  import org.astrogrid.component.descriptor.ComponentDescriptor;
16  import org.astrogrid.jes.job.JobFactory;
17  import org.astrogrid.jes.jobscheduler.Dispatcher;
18  import org.astrogrid.jes.jobscheduler.JobScheduler;
19  import org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl;
20  import org.astrogrid.jes.util.TemporaryBuffer;
21  import org.astrogrid.workflow.beans.v1.Step;
22  import org.astrogrid.workflow.beans.v1.Workflow;
23  
24  import java.io.StringReader;
25  import java.io.StringWriter;
26  import java.io.Writer;
27  
28  import javax.xml.transform.Transformer;
29  import javax.xml.transform.stream.StreamResult;
30  import javax.xml.transform.stream.StreamSource;
31  
32  import junit.framework.Test;
33  
34  /*** implementation of jobscheduler, based on groovy rules engine and state machine.
35   * @author Noel Winstanley nw@jb.man.ac.uk 26-Jul-2004
36   *
37   */
38  public class GroovySchedulerImpl extends AbstractJobSchedulerImpl 
39      implements JobScheduler, ComponentDescriptor{
40      /*** configuration component - abstracts from messy business of finding stylesheets, etc. */
41      public interface Transformers {
42          /*** transformer of workflows to set of rules. */
43          public Transformer getCompiler() throws Exception;
44          /*** annotator of workflows with id attributes */
45          public Transformer getWorkflowAnnotator() throws Exception;
46      }
47      /*** Construct a new GroovySchedulerImpl
48       * @param factory produces jobs
49       * @param transformers compiles documents into rulebase.
50       * @param dispatcher dispatches individual steps to cea
51       * @param interpFactory creates interpreters for compiled rulebases.
52       */
53      public GroovySchedulerImpl(JobFactory factory,Transformers transformers,Dispatcher dispatcher,GroovyInterpreterFactory interpFactory) {
54          super(factory);
55          this.transformers = transformers;
56          this.disp = dispatcher;
57          this.interpFactory = interpFactory;       
58      }
59      protected final Transformers transformers;
60      protected final Dispatcher disp;
61      protected final GroovyInterpreterFactory interpFactory;
62      /*** 
63       * annotate the workflow with id numbers. then
64       * compile the workflow into xml rules with embedded groovy scripts., deserialize this as interpreter, add as extension to workflow.
65       * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#initializeJob(org.astrogrid.workflow.beans.v1.Workflow)
66       *
67       */ 
68      protected Workflow initializeJob(Workflow job) throws Exception {
69          // serialize input workflow to string
70          StringWriter wfWriter = new StringWriter();
71          job.marshal(wfWriter);
72          wfWriter.close();
73          String originalWorkflowDoc  = wfWriter.toString();        
74  
75          TemporaryBuffer buff = interpFactory.getBuffer();
76          // annotate this workflow
77          buff.writeMode();
78          Writer annotatedWorkflowDoc = buff.getWriter();
79          transformers.getWorkflowAnnotator().transform(new StreamSource(new StringReader(originalWorkflowDoc)),new StreamResult(annotatedWorkflowDoc));
80          
81          buff.readMode();
82          Workflow annotatedJob =  Workflow.unmarshalWorkflow(buff.getReader());
83          
84          // now compile the workflow into a set of rules.
85          buff.writeMode();
86          Writer rules = buff.getWriter();
87          transformers.getCompiler().transform(new StreamSource(new StringReader(originalWorkflowDoc)),new StreamResult(rules));
88                 
89          //maybe a bit redundant - should be able to add in xml directly - but this is a good sanity check - no point going any further if
90          // we can't deserialize the generated rules as an interpreter.
91          buff.readMode();
92          GroovyInterpreter interp = interpFactory.newInterpreter(buff.getContents(),new JesInterface(annotatedJob,disp,this));
93          interpFactory.pickleTo(interp,annotatedJob);
94  
95         return annotatedJob;
96      }
97      /*** unpickle the interpreter from the workflow, then try to run any triggerable rules */
98      protected void scheduleSteps(Workflow job) {
99      GroovyInterpreter interp = null;
100     try {
101         interp = interpFactory.unpickleFrom(new JesInterface(job,disp,this));
102     } catch (PickleException e) {
103         logger.error("Can't create workflow interpreter",e);
104         recordFatalError(job,e);
105         return;
106     }
107     try {
108         interp.run();
109     } catch (ScriptEngineException e) {
110         logger.warn("Run threw exception",e);
111         recordFatalError(job,e);
112         return;
113     } 
114     try { 
115         interpFactory.pickleTo(interp,job);
116     } catch (PickleException e) {
117         logger.error("Can't pickle interpreter");
118         recordFatalError(job,e);
119     }    
120     } // end of scheduleSteps()
121 
122     /***
123      * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#updateJobStatus(org.astrogrid.workflow.beans.v1.Workflow)
124      */
125     protected void updateJobStatus(Workflow job) {
126         // nowt - done internally.
127     }
128 
129     /*** 
130      * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#getStepForFragment(org.astrogrid.workflow.beans.v1.Workflow, java.lang.String)
131      */
132     protected Step getStepForFragment(Workflow job, String fragment) {        
133                return (Step)job.findXPathValue("//*[id='" + fragment + "']");
134     }
135 
136 
137     /***
138      * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#updateStepStatus(org.astrogrid.workflow.beans.v1.Workflow, org.astrogrid.workflow.beans.v1.Step, org.astrogrid.applications.beans.v1.cea.castor.types.ExecutionPhase)
139      *  do what parent, does, and then update corresponding state in interpreter too.
140      *     quite inefficient that we need to recreate interpreter for this. oh well, at least no script is called.
141      */
142     protected void updateStepStatus(Workflow wf, Step step, ExecutionPhase status) {
143         super.updateStepStatus(wf, step, status);
144         try {
145         GroovyInterpreter interp = interpFactory.unpickleFrom(new JesInterface(wf,disp,this));
146         interp.updateStepStatus(step,status);
147         interpFactory.pickleTo(interp,wf);
148         } catch (PickleException e) {
149             logger.error("Failed to update step status",e);
150             recordFatalError(wf,e);
151         }
152     }
153     /*** store results of a cea computation back into workfllow interpreter.
154      * @see org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl#storeResults(org.astrogrid.workflow.beans.v1.Workflow, org.astrogrid.workflow.beans.v1.Step, org.astrogrid.applications.beans.v1.cea.castor.ResultListType)
155      */
156     protected void storeResults(Workflow wf, Step step, ResultListType results) {
157         // first check that they need to be stored
158         if (step.getResultVar() == null) { // user hasn't specified a var to store them in, so not needed.
159             return;
160         }
161         try {
162             GroovyInterpreter interp = interpFactory.unpickleFrom(new JesInterface(wf,disp,this));
163             interp.storeResults(step,results);
164             interpFactory.pickleTo(interp,wf);
165         } catch (PickleException e) {
166             logger.error("Failed to store results",e);
167             recordFatalError(wf,e);
168         }
169     }
170 //---------------------
171   /***
172    * @see org.astrogrid.component.descriptor.ComponentDescriptor#getName()
173    */
174   public String getName() {
175       return "scripted-scheduler";
176   }
177 
178   /***
179    * @see org.astrogrid.component.descriptor.ComponentDescriptor#getDescription()
180    */
181   public String getDescription() {
182       return "Scheduler Implementation that uses a scripting engine";
183   }
184 
185   /***
186    * @see org.astrogrid.component.descriptor.ComponentDescriptor#getInstallationTest()
187    */
188   public Test getInstallationTest() {
189       return null;
190   }
191 
192 
193 
194 
195 }
196 
197 
198 /* 
199 $Log: GroovySchedulerImpl.java,v $
200 Revision 1.5  2004/11/05 16:52:42  jdt
201 Merges from branch nww-itn07-scratchspace
202 
203 Revision 1.4.26.1  2004/11/05 16:15:30  nw
204 optimized by uising temporary buffers.
205 
206 Revision 1.4  2004/09/06 16:47:04  nw
207 javadoc
208 
209 Revision 1.3  2004/08/04 16:51:46  nw
210 added parameter propagation out of cea step call.
211 
212 Revision 1.2  2004/07/30 15:42:34  nw
213 merged in branch nww-itn06-bz#441 (groovy scripting)
214 
215 Revision 1.1.2.4  2004/07/30 15:10:04  nw
216 removed policy-based implementation,
217 adjusted tests, etc to use groovy implementation
218 
219 Revision 1.1.2.3  2004/07/28 16:24:23  nw
220 finished groovy beans.
221 moved useful tests from old python package.
222 removed python implemntation
223 
224 Revision 1.1.2.2  2004/07/27 23:37:59  nw
225 refactoed framework.
226 experimented with betwixt - can't get it to work.
227 got XStream working in 5 mins.
228 about to remove betwixt code.
229 
230 Revision 1.1.2.1  2004/07/26 15:51:19  nw
231 first stab at a groovy scheduler.
232 transcribed all the classes in the python prototype, and took copies of the
233 classes in the 'scripting' package.
234  
235 */