View Javadoc

1   /*$Id: AbstractJobSchedulerImpl.java,v 1.8 2004/12/03 14:47:41 jdt Exp $
2    * Created on 10-May-2004
3    *
4    * Copyright (C) AstroGrid. All rights reserved.
5    *
6    * This software is published under the terms of the AstroGrid 
7    * Software License version 1.2, a copy of which has been included 
8    * with this distribution in the LICENSE.txt file.  
9    *
10  **/
11  package org.astrogrid.jes.jobscheduler.impl;
12  
13  import org.astrogrid.applications.beans.v1.cea.castor.MessageType;
14  import org.astrogrid.applications.beans.v1.cea.castor.types.ExecutionPhase;
15  import org.astrogrid.applications.beans.v1.cea.castor.types.LogLevel;
16  import org.astrogrid.common.bean.Axis2Castor;
17  import org.astrogrid.jes.JesException;
18  import org.astrogrid.jes.beans.v1.axis.executionrecord.JobURN;
19  import org.astrogrid.jes.job.JobFactory;
20  import org.astrogrid.jes.job.NotFoundException;
21  import org.astrogrid.jes.jobscheduler.JobScheduler;
22  import org.astrogrid.jes.types.v1.cea.axis.JobIdentifierType;
23  import org.astrogrid.jes.types.v1.cea.axis.ResultListType;
24  import org.astrogrid.jes.util.JesUtil;
25  import org.astrogrid.workflow.beans.v1.Step;
26  import org.astrogrid.workflow.beans.v1.Workflow;
27  import org.astrogrid.workflow.beans.v1.execution.StepExecutionRecord;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.exolab.castor.xml.CastorException;
32  
33  import java.io.PrintWriter;
34  import java.io.StringWriter;
35  import java.util.Date;
36  
37  /***Abstract implementation of a job scheduler. used as as a base class for the groovy job scheduler, and maybe future others.
38   * @author Noel Winstanley nw@jb.man.ac.uk 10-May-2004
39   *
40   */
41  public abstract class AbstractJobSchedulerImpl implements JobScheduler {
42      /*** Construct a new AbstractJobSchedulerImpl
43       * 
44       */
45      public AbstractJobSchedulerImpl(JobFactory factory) {
46          assert factory != null;        
47          this.factory = factory;
48      }
49  
50      /*** facade to a job factory */
51      protected final JobFactory factory;
52      protected static final Log logger = LogFactory.getLog( JobScheduler.class );
53  
54  
55      /*** register a new job with the scheduler
56       * <p>
57       * initializes execution records for the job, then starts the job running.
58       * @see org.astrogrid.jes.jobscheduler.JobScheduler#scheduleNewJob(org.astrogrid.jes.types.v1.JobURN)
59       */
60      public final void scheduleNewJob( JobURN jobURN ) {
61          logger.debug("Scheduling new Job: " + jobURN.toString());
62          try {
63              Workflow job;
64              try {
65                  job = factory.findJob( Axis2Castor.convert(jobURN)) ;
66              } catch (NotFoundException e) {
67                  logger.error("Could not find job for urn:" + jobURN.getValue());
68                  return;
69              } 
70              try {
71                  job = initializeJob(job);   
72                  logger.debug("initialized job " + job.getJobExecutionRecord().getJobId().getContent());
73              } catch (Exception e) {
74                  logger.error("Could not initialize job for urn:" + jobURN.getValue(),e);
75                  return;
76              }
77              job.getJobExecutionRecord().setStartTime(new Date());
78              job.getJobExecutionRecord().setStatus(ExecutionPhase.INITIALIZING);
79              factory.updateJob(job);
80                                    
81              // Schedule one or more job steps....
82              scheduleSteps( job ) ;
83           
84              updateJobStatus(job);             
85              factory.updateJob( job ) ;              // Update any changed details to the database                                                
86          } catch (JesException e) {
87              logger.error(e);
88          }
89  
90               
91      }
92  
93      /*** initialize the job / workflow document, ready for execution 
94       * @see #scheduleNewJob(JobURN)*/
95      protected abstract Workflow initializeJob(Workflow job) throws Exception;
96  
97      /*** resume executioin of a job
98       * <p>
99       * records information returned by tool execution in the workflow document, and then attempts to execute further steps in the workflow.
100      * @see org.astrogrid.jes.jobscheduler.JobScheduler#resumeJob(org.astrogrid.jes.types.v1.cea.axis.JobIdentifierType, org.astrogrid.jes.types.v1.cea.axis.MessageType)
101      */
102     public final void resumeJob(JobIdentifierType id,org.astrogrid.jes.types.v1.cea.axis.MessageType info) {
103         logger.debug("Resuming executioin of " + id.toString());
104         Workflow job = null;  
105         try {
106             org.astrogrid.workflow.beans.v1.execution.JobURN urn = null;          
107              try {
108                   urn = JesUtil.extractURN(id);
109                 job = factory.findJob(urn ) ;
110              } catch (NotFoundException e) {
111                  logger.error("Could not find job for urn" + urn.getContent());
112                  return;
113              }        
114                   
115              String fragment = JesUtil.extractXPath(id);
116              Step jobStep = getStepForFragment(job, fragment);
117              if (jobStep == null) {
118                  logger.error("Culd not find step " + fragment + " for urn " + urn.getContent());
119                  return;
120              }              
121              //add message into execution record for step.      
122              StepExecutionRecord er =JesUtil.getLatestOrNewRecord(jobStep);             
123              er.addMessage(Axis2Castor.convert(info));
124              ExecutionPhase status = Axis2Castor.convert(info.getPhase());
125              // hook into updating status of job.
126             updateStepStatus(job,jobStep, status);
127               factory.updateJob(job);
128                             
129              // now go try run some more steps.
130              scheduleSteps(job);  
131              updateJobStatus(job);
132              factory.updateJob( job ) ;             // Update any changed details to the database
133         } catch (JesException e) {
134             // basically, somethings gone wrong with the job store, rather than with steps. its a fatal.
135             logger.fatal("System error",e);
136             // could try saving the error in the job itself.
137             if (job != null) {
138                 recordFatalError(job, e);
139                 try {
140                     factory.updateJob(job);
141                 } catch (JesException jex) {
142                     logger.fatal("can't save error report into workflow",jex);
143                 }
144             } 
145         }
146 
147     }
148 
149     /*** record a thrown exception in the workflow document,
150      * and set status of entrire workflow to error
151      * @param job
152      * @param e
153      * @todo want to able to send a debuf-level message here too
154      */
155     protected void recordFatalError(Workflow job, JesException e) {
156         MessageType errorMessage = new MessageType();
157         errorMessage.setPhase(ExecutionPhase.ERROR);
158         errorMessage.setLevel(LogLevel.ERROR);
159         errorMessage.setSource("Jes System");
160         errorMessage.setTimestamp(new Date());
161         errorMessage.setContent(JesUtil.getMessageChain(e));
162         
163         MessageType debugMessage = new MessageType();
164         debugMessage.setPhase(errorMessage.getPhase());
165         debugMessage.setLevel(LogLevel.INFO);
166         debugMessage.setSource(errorMessage.getSource());
167         debugMessage.setTimestamp(errorMessage.getTimestamp());
168         StringWriter content = new StringWriter();
169         PrintWriter pw = new PrintWriter(content);
170         pw.println("System Error " + e.getClass().getName());
171         e.printStackTrace(pw);
172         debugMessage.setContent(content.toString());
173         
174         job.getJobExecutionRecord().addMessage(errorMessage);
175         job.getJobExecutionRecord().addMessage(debugMessage);        
176         job.getJobExecutionRecord().setStatus(ExecutionPhase.ERROR);
177     }
178     
179     /*** @see #resumeJob(JobIdentifierType, org.astrogrid.jes.types.v1.cea.axis.MessageType) */
180     protected void updateStepStatus(Workflow wf,Step step, ExecutionPhase status) {
181            // only update status if executioin record hasn't already passed this status.        
182            StepExecutionRecord er = JesUtil.getLatestOrNewRecord(step); 
183             if (status.getType() > er.getStatus().getType()) {
184                er.setStatus(status);
185                if (status.getType() >= ExecutionPhase.COMPLETED_TYPE) { // finished or error
186                     er.setFinishTime(new Date()); 
187                }
188              }           
189     }   
190     /***
191      * update the status of the entire job - maybe by rescanning tree in someway.
192      * @see #resumeJob(JobIdentifierType, org.astrogrid.jes.types.v1.cea.axis.MessageType)
193      */
194     protected abstract void updateJobStatus(Workflow job);
195     
196 
197     /*** project the step correspondinig to this fragment from the workflow 
198      * @return the step, or null if not found.
199      * @see #resumeJob(JobIdentifierType, org.astrogrid.jes.types.v1.cea.axis.MessageType) where this method is used.
200      * @see #reportResults(JobIdentifierType, ResultListType) where this method is used.*/
201     protected abstract Step getStepForFragment(Workflow job, String fragment); 
202     
203     /***
204      * select steps that can be executed, and fire them off.
205      * used within {@link #resumeJob}, {@link #reportResults}, {@link scheduleNewJob} 
206      */
207     protected abstract void scheduleSteps(Workflow job) ;
208     
209 
210 
211     /*** helper method to create a new execution record, pre-populated */
212     public static StepExecutionRecord newStepExecutionRecord() {
213         StepExecutionRecord result = new StepExecutionRecord();
214         result.setStartTime(new Date());
215         result.setStatus(ExecutionPhase.INITIALIZING);
216         return result;
217     }
218 
219 
220 
221     /*** Record results of a job step execution
222      * @see org.astrogrid.jes.jobscheduler.JobScheduler#reportResults(org.astrogrid.jes.types.v1.cea.axis.JobIdentifierType, org.astrogrid.jes.types.v1.cea.axis.ResultListType)
223      */
224     public final void reportResults(JobIdentifierType id, ResultListType results) throws Exception {
225         logger.debug("reporting results of " + id.toString());
226         Workflow job = null;
227         try {
228             org.astrogrid.workflow.beans.v1.execution.JobURN urn = null;          
229             try {
230                  urn = JesUtil.extractURN(id);
231                job = factory.findJob(urn ) ;
232             } catch (NotFoundException e) {
233                 logger.error("Could not find job for urn" + urn.getContent());
234                 return;
235             }
236             String xpath = JesUtil.extractXPath(id);
237             Step jobStep = this.getStepForFragment(job,xpath);
238             if (jobStep == null) {
239                 logger.error("Culd not find step " + xpath + " for urn " + urn.getContent());
240                 return;
241             } 
242             // ok, found the job step this results are bound for.
243             // going to do two things now - record results as comment in the executionRecord.
244             StepExecutionRecord er =JesUtil.getLatestOrNewRecord(jobStep);       
245             MessageType resultsMessage = buildResultsMessage(results);
246             er.addMessage(resultsMessage);
247                         
248             // and then call the hook for extension classes to do something with the results too.            
249             org.astrogrid.applications.beans.v1.cea.castor.ResultListType castorResults =  Axis2Castor.convert(results);            
250             storeResults(job,jobStep,castorResults);          
251              factory.updateJob(job); // save our changes.
252              
253              // see if anything else can be run now.
254              // now go try run some more steps.
255              scheduleSteps(job);  
256              updateJobStatus(job);
257              factory.updateJob( job ) ;             // Update any changed details to the database
258         } catch (JesException e) {
259             // somthing badly wrong.
260             logger.fatal("report results - jes exception",e);
261         }
262     }
263     
264     /*** store / do something with the results of a step
265      *  
266      * @param wf
267      * @param step
268      * @param results
269      * @see #reportResults(JobIdentifierType, ResultListType)
270      */
271     protected abstract void storeResults(Workflow wf,Step step,org.astrogrid.applications.beans.v1.cea.castor.ResultListType results);
272 
273     /*** build an execution message from the resultslist
274      * @param results
275      * @return
276      */
277     private MessageType buildResultsMessage(ResultListType results) {
278         MessageType resultsMessage = new MessageType();
279         resultsMessage.setLevel(LogLevel.INFO);
280         resultsMessage.setPhase(ExecutionPhase.COMPLETED);
281         resultsMessage.setSource("CEA");
282         resultsMessage.setTimestamp(new Date());   
283         StringWriter content = new StringWriter(); 
284         try {
285             Axis2Castor.convert(results).marshal(content);
286         } catch (CastorException e) { 
287             e.printStackTrace(new PrintWriter(content));
288         }
289 
290         resultsMessage.setContent(content.toString());
291         return resultsMessage;
292     }
293     
294     /***called when a workfllow completes.
295      *  do nothing-implementaiton - may be overridden.
296      * */
297     public void notifyJobFinished(Workflow job) {   
298     }
299     
300     /***
301          * @see org.astrogrid.jes.jobscheduler.JobScheduler#abortJob(org.astrogrid.jes.types.v1.JobURN)
302          */
303     public final void abortJob(JobURN jobURN) {
304         logger.debug("Aborting job: " + jobURN.toString());
305         try {
306             Workflow job = factory.findJob(Axis2Castor.convert(jobURN));
307             //check current phase
308             ExecutionPhase currentPhase = job.getJobExecutionRecord().getStatus();
309             if (currentPhase.getType() < ExecutionPhase.COMPLETED_TYPE) { // i.e. still running, or not running yet..
310                 logger.debug("marking job as in error");
311                 job.getJobExecutionRecord().setStatus(ExecutionPhase.ERROR);
312                 MessageType msg = new MessageType();
313                 msg.setContent("Aborted by user");
314                 msg.setSource("JES");
315                 msg.setTimestamp(new Date());
316                 msg.setPhase(currentPhase);
317                 msg.setLevel(LogLevel.INFO);
318                 job.getJobExecutionRecord().addMessage(msg);
319                 factory.updateJob(job);
320             } else {
321                 logger.debug("job has already finished");
322             }
323         } catch (NotFoundException e) {
324             logger.warn("Attempted to abort job that doesn't exist:" + jobURN.getValue());
325         } catch (JesException e) {
326            logger.error("AbortJob",e);
327         }
328         
329     }
330     /***
331          * @see org.astrogrid.jes.jobscheduler.JobScheduler#deleteJob(org.astrogrid.jes.types.v1.JobURN)
332          */
333     public final void deleteJob(JobURN jobURN) {
334         logger.debug("Deleting job" + jobURN.toString());
335         try {
336             Workflow job = factory.findJob(Axis2Castor.convert(jobURN));
337             factory.deleteJob(job);
338         } catch (NotFoundException e) {
339             logger.warn("Attempted to delete job that doesn't exist: " + jobURN.getValue());
340         } catch (JesException e) {
341             logger.error("DeleteJob",e);
342         }
343     }
344     
345 
346     
347 }
348 
349 
350 /* 
351 $Log: AbstractJobSchedulerImpl.java,v $
352 Revision 1.8  2004/12/03 14:47:41  jdt
353 Merges from workflow-nww-776
354 
355 Revision 1.7.2.1  2004/12/01 21:48:20  nw
356 adjusted to work with new summary object,
357 and changed package of JobURN
358 
359 Revision 1.7  2004/11/29 20:00:24  clq2
360 jes-nww-714
361 
362 Revision 1.6.68.2  2004/11/25 23:34:34  nw
363 improved error messages reported from jes
364 
365 Revision 1.6.68.1  2004/11/24 18:48:29  nw
366 attempt to get nicer error messages
367 
368 Revision 1.6  2004/08/13 09:07:58  nw
369 tidied imports
370 
371 Revision 1.5  2004/08/04 16:51:46  nw
372 added parameter propagation out of cea step call.
373 
374 Revision 1.4  2004/07/30 15:42:34  nw
375 merged in branch nww-itn06-bz#441 (groovy scripting)
376 
377 Revision 1.3.20.1  2004/07/30 15:10:04  nw
378 removed policy-based implementation,
379 adjusted tests, etc to use groovy implementation
380 
381 Revision 1.3  2004/07/09 15:49:08  nw
382 fixed NPE in processing results.
383 should fix regression on SimpleJavaWorkflowEndToEndTest
384 
385 Revision 1.2  2004/07/09 09:30:28  nw
386 merged in scripting workflow interpreter from branch
387 nww-x-workflow-extensions
388 
389 Revision 1.1.2.1  2004/05/21 11:25:19  nw
390 first checkin of prototype scrpting workflow interpreter
391  
392 */