View Javadoc

1   /*$Id: JesInterface.java,v 1.14 2004/12/03 14:47:41 jdt Exp $
2    * Created on 12-May-2004
3    *
4    * Copyright (C) AstroGrid. All rights reserved.
5    *
6    * This software is published under the terms of the AstroGrid 
7    * Software License version 1.2, a copy of which has been included 
8    * with this distribution in the LICENSE.txt file.  
9    *
10  **/
11  package org.astrogrid.jes.jobscheduler.impl.groovy;
12  
13  import org.astrogrid.applications.beans.v1.cea.castor.MessageType;
14  import org.astrogrid.applications.beans.v1.cea.castor.types.ExecutionPhase;
15  import org.astrogrid.applications.beans.v1.cea.castor.types.LogLevel;
16  import org.astrogrid.applications.beans.v1.parameters.ParameterValue;
17  import org.astrogrid.jes.jobscheduler.Dispatcher;
18  import org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl;
19  import org.astrogrid.jes.util.JesUtil;
20  import org.astrogrid.jes.util.TemporaryBuffer;
21  import org.astrogrid.workflow.beans.v1.Script;
22  import org.astrogrid.workflow.beans.v1.Set;
23  import org.astrogrid.workflow.beans.v1.Step;
24  import org.astrogrid.workflow.beans.v1.Tool;
25  import org.astrogrid.workflow.beans.v1.Workflow;
26  import org.astrogrid.workflow.beans.v1.execution.JobExecutionRecord;
27  import org.astrogrid.workflow.beans.v1.execution.StepExecutionRecord;
28  
29  import org.apache.commons.collections.CollectionUtils;
30  
31  import java.io.PrintStream;
32  import java.io.UnsupportedEncodingException;
33  import java.lang.ref.SoftReference;
34  import java.util.ArrayList;
35  import java.util.Date;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Map;
39  
40  
41  /*** provides an interface for executing groovy scripts in rulebase to call back into jes server 
42   * also provides some 'helper' methods that can be called from scripts to create usefule objects, etc. Maybe these should be split into a libraru class, and a JesServerINterface class.
43   * @modified - added temporary buffers (under soft reference) for capturing system err and out. 
44   * @script-summary access the jes server
45   * @script-doc this object allows access to some of the internals of the jes server the workflow script is executing on
46   */
47  public class  JesInterface extends WorkflowLogger {
48      private static final String ENCODING = "UTF-16";
49      /*** construct a new interpreter environment
50       * @param wf workflow object
51       * @param disp ToolDispatcher
52       */
53      public JesInterface(Workflow wf, Dispatcher disp,GroovySchedulerImpl sched){
54          super(wf);
55          this.disp = disp;
56          this.sched = sched;
57      }
58      
59      protected final GroovySchedulerImpl sched;
60      protected final Dispatcher disp;
61      private SoftReference softErrBuff = new SoftReference(new TemporaryBuffer());
62      private SoftReference softOutBuff = new SoftReference(new TemporaryBuffer());
63      
64      /*** access the version info for this jes server.
65       * <p>
66       * at present returns a list of bugzilla numbers this server implements
67       * @todo implement to return richer info
68       * @script-doc inspect version information for this server*/
69      public String getVersion() {
70          return "Iteration 07,  jes-nww-714  workflow-nww-776";
71      }
72      
73      /*** the workflow object the script belongs to 
74       * @script-doc access the object model of the current workflow
75       * */
76      public Workflow getWorkflow() {
77          return wf;
78      }
79      
80      /*** return list of steps in the workflow document 
81       * @script-doc access the list of steps in the current workflow*/
82      public List getSteps() {
83              Iterator i = JesUtil.getJobSteps(wf);
84              List l = new ArrayList();
85              CollectionUtils.addAll(l,i);
86               return l;
87          }
88      
89      /*** create a new initialized parameter object 
90       * @script-doc create a new tool-parameter object, initialized, but unattached to any tool*/
91      public ParameterValue newParameter() {
92          ParameterValue pval = new  ParameterValue();
93          pval.setIndirect(false);
94          pval.setEncoding("");
95          return pval;
96      }
97      
98          /*** the tool step dispatcher
99       * @script-doc-omit don't think it's sensible to encourage use of this.*/
100     public Dispatcher getDispatcher() {
101         return disp;
102     }
103     // plus some helper methods.
104 
105     /*** access object in workflow tree by id 
106      *@script-doc access an element in the workflow document by it's unique id.
107      * */
108     public Object getId(String id) {        
109         if (id.equals(getWorkflow().getId())) {
110             return getWorkflow();
111         } 
112         Object result =  getWorkflow().findXPathValue("//*[id = '" + id + "']");
113         if (result != null) {
114             return result;
115         } else {
116             int lastIndex = id.lastIndexOf('-');
117             if (lastIndex != -1) {
118             // assume its a mangled id. try again, removing last portion.
119                 return getId(id.substring(0,lastIndex));
120             } else {
121                 return null;
122             }
123         }
124     }
125     /*** 
126      * dispatach a tool step to a cea server for execution. 
127      * @param id - the identifier of the step to execute
128      * @script-doc-omit very internal
129      * */ 
130     public boolean dispatchStep(String id, JesShell shell, ActivityStatusStore states, Map rules)  {
131         Step step = (Step)getId(id);
132           StepExecutionRecord er = AbstractJobSchedulerImpl.newStepExecutionRecord();
133           step.addStepExecutionRecord(er);
134            try{
135                Tool tool = shell.evaluateTool(step.getTool(),id,states,rules);
136                getDispatcher().dispatchStep(getWorkflow(),tool,id); 
137                er.setStatus(ExecutionPhase.RUNNING);
138                return true;
139            } catch (Throwable t) { // absolutely anything
140                error("Failed to dispatch step",t,er);
141                er.setStatus(ExecutionPhase.ERROR);                      
142                debug("Failed to dispatch step",t,er);                 
143                 return false;
144            }                             
145     }
146     
147     /*** execute a set activity 
148      * @script-doc-omit very internal*/
149     public boolean executeSet(String id,JesShell shell,ActivityStatusStore map,Map rules)  {
150         Set set = (Set)getId(id);
151         try {
152             shell.executeSet(set,id,map,rules);
153             return true;
154         } catch (Throwable t) {
155             error("set " + set.getVar() + ":= " + set.getValue() + " failed",t);
156             return false;
157         }
158     }    
159     /*** dispatch / execute a script activity
160      * @script-doc-omit you must be joking.. */
161     public boolean dispatchScript(String id,JesShell shell,ActivityStatusStore map,Map rules) {
162         Script script = (Script)getId(id);        
163         StepExecutionRecord er = AbstractJobSchedulerImpl.newStepExecutionRecord();
164         script.addStepExecutionRecord(er);
165         er.setStatus(ExecutionPhase.RUNNING);
166 
167         TemporaryBuffer errB = (TemporaryBuffer)softErrBuff.get();
168         if (errB == null) {
169             errB = new TemporaryBuffer();
170             softErrBuff = new SoftReference(errB);
171         }
172         
173         TemporaryBuffer outB = (TemporaryBuffer)softOutBuff.get();
174         if (outB == null) {
175             outB = new TemporaryBuffer();
176             softOutBuff = new SoftReference(outB);
177         }
178         
179         errB.writeMode();
180         outB.writeMode();
181         PrintStream errStream = null;
182         PrintStream outStream = null;
183         try {
184             errStream = new PrintStream(errB.getOutputStream(),false,ENCODING);
185             outStream = new PrintStream(outB.getOutputStream(),false,ENCODING);            
186         } catch (UnsupportedEncodingException e) {
187             logger.fatal("JVM doesn't support UTF-16 - which is required by specs",e);
188             throw new RuntimeException("JVM doesn't support UTF-16 - which is required by specs",e);
189         }
190 
191         try { 
192             shell.executeScript(script.getBody(),id,map,rules,errStream,outStream);
193             er.setStatus(ExecutionPhase.COMPLETED);
194             return true;
195         } catch (Throwable t) {
196             error("Failed to execute script",t,er);
197             er.setStatus(ExecutionPhase.ERROR);                   
198             debug("Failed to executte script",t,er);
199             return false;
200         } finally { // want to record results, no matter what happened.
201             errStream.close();
202             outStream.close();            
203             errB.readMode();
204             outB.readMode();
205            MessageType message =buildMessage(errB.getContents(ENCODING));
206            message.setLevel(LogLevel.INFO);
207            message.setSource("stderr");
208            er.addMessage(message);
209            
210            message = buildMessage(outB.getContents(ENCODING));
211            message.setLevel(LogLevel.INFO);
212            message.setSource("stdout");
213            er.addMessage(message);
214           er.setFinishTime(new Date());
215         }
216     }
217 
218     
219     /*** records most important events back into main workflow document.
220      * @script-doc-omit alter status of the entire workflow document. Can be used to halt / abort execution. */
221     public void setWorkflowStatus(Status status) {
222                 ExecutionPhase phase = Status.toPhase(status);
223                 JobExecutionRecord er =  wf.getJobExecutionRecord();
224                 er.setStatus(phase);
225                 if (phase.equals(ExecutionPhase.COMPLETED) || phase.equals(ExecutionPhase.ERROR)){
226                     wf.getJobExecutionRecord().setFinishTime(new Date());
227                     logger.info("Job " + er.getJobId().getContent() + " finished with status " + er.getStatus());
228                     sched.notifyJobFinished(wf);
229                 }           
230     }
231     
232 
233 }
234 
235 /* 
236 $Log: JesInterface.java,v $
237 Revision 1.14  2004/12/03 14:47:41  jdt
238 Merges from workflow-nww-776
239 
240 Revision 1.13.2.1  2004/12/01 21:49:47  nw
241 script-documentation
242 amended getVersion()
243 
244 Revision 1.13  2004/11/29 20:00:24  clq2
245 jes-nww-714
246 
247 Revision 1.12.12.1  2004/11/25 23:34:34  nw
248 improved error messages reported from jes
249 
250 Revision 1.12  2004/11/05 16:52:42  jdt
251 Merges from branch nww-itn07-scratchspace
252 
253 Revision 1.11.18.1  2004/11/05 16:15:04  nw
254 uses temporary buffers,
255 updated to new rulestore type.
256 
257 Revision 1.11  2004/09/16 21:47:29  nw
258 made sure all streams are closed
259 
260 Revision 1.10  2004/09/06 16:47:04  nw
261 javadoc
262 
263 Revision 1.9  2004/08/18 21:50:15  nw
264 improved error propagation and reporting.
265 messages are now logged to workflow document
266 
267 Revision 1.8  2004/08/13 09:10:30  nw
268 tidied imports
269 
270 Revision 1.7  2004/08/09 17:34:10  nw
271 implemented parfor.
272 removed references to rulestore
273 
274 Revision 1.6  2004/08/06 11:59:12  nw
275 added helper methods for scripts manipulating the workflow document itself.
276 
277 Revision 1.5  2004/08/04 16:51:46  nw
278 added parameter propagation out of cea step call.
279 
280 Revision 1.4  2004/08/03 16:32:26  nw
281 remove unnecessary envId attrib from rules
282 implemented variable propagation into parameter values.
283 
284 Revision 1.3  2004/08/03 14:27:38  nw
285 added set/unset/scope features.
286 
287 Revision 1.2  2004/07/30 15:42:34  nw
288 merged in branch nww-itn06-bz#441 (groovy scripting)
289 
290 Revision 1.1.2.4  2004/07/30 15:10:04  nw
291 removed policy-based implementation,
292 adjusted tests, etc to use groovy implementation
293 
294 Revision 1.1.2.3  2004/07/30 14:00:10  nw
295 first working draft
296 
297 Revision 1.1.2.2  2004/07/28 16:24:23  nw
298 finished groovy beans.
299 moved useful tests from old python package.
300 removed python implemntation
301 
302 Revision 1.1.2.1  2004/07/27 23:37:59  nw
303 refactoed framework.
304 experimented with betwixt - can't get it to work.
305 got XStream working in 5 mins.
306 about to remove betwixt code.
307 
308 Revision 1.1.2.1  2004/07/26 15:51:19  nw
309 first stab at a groovy scheduler.
310 transcribed all the classes in the python prototype, and took copies of the
311 classes in the 'scripting' package.
312 
313 Revision 1.1  2004/07/09 09:30:28  nw
314 merged in scripting workflow interpreter from branch
315 nww-x-workflow-extensions
316 
317 Revision 1.1.2.1  2004/05/21 11:25:19  nw
318 first checkin of prototype scrpting workflow interpreter
319  
320 */