1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.jes.jobscheduler.impl;
12
13 import org.astrogrid.applications.beans.v1.cea.castor.MessageType;
14 import org.astrogrid.applications.beans.v1.cea.castor.types.ExecutionPhase;
15 import org.astrogrid.applications.beans.v1.cea.castor.types.LogLevel;
16 import org.astrogrid.common.bean.Axis2Castor;
17 import org.astrogrid.jes.JesException;
18 import org.astrogrid.jes.beans.v1.axis.executionrecord.JobURN;
19 import org.astrogrid.jes.job.JobFactory;
20 import org.astrogrid.jes.job.NotFoundException;
21 import org.astrogrid.jes.jobscheduler.JobScheduler;
22 import org.astrogrid.jes.types.v1.cea.axis.JobIdentifierType;
23 import org.astrogrid.jes.types.v1.cea.axis.ResultListType;
24 import org.astrogrid.jes.util.JesUtil;
25 import org.astrogrid.workflow.beans.v1.Step;
26 import org.astrogrid.workflow.beans.v1.Workflow;
27 import org.astrogrid.workflow.beans.v1.execution.StepExecutionRecord;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.exolab.castor.xml.CastorException;
32
33 import java.io.PrintWriter;
34 import java.io.StringWriter;
35 import java.util.Date;
36
37 /***Abstract implementation of a job scheduler. used as as a base class for the groovy job scheduler, and maybe future others.
38 * @author Noel Winstanley nw@jb.man.ac.uk 10-May-2004
39 *
40 */
41 public abstract class AbstractJobSchedulerImpl implements JobScheduler {
42 /*** Construct a new AbstractJobSchedulerImpl
43 *
44 */
45 public AbstractJobSchedulerImpl(JobFactory factory) {
46 assert factory != null;
47 this.factory = factory;
48 }
49
50 /*** facade to a job factory */
51 protected final JobFactory factory;
52 protected static final Log logger = LogFactory.getLog( JobScheduler.class );
53
54
55 /*** register a new job with the scheduler
56 * <p>
57 * initializes execution records for the job, then starts the job running.
58 * @see org.astrogrid.jes.jobscheduler.JobScheduler#scheduleNewJob(org.astrogrid.jes.types.v1.JobURN)
59 */
60 public final void scheduleNewJob( JobURN jobURN ) {
61 logger.debug("Scheduling new Job: " + jobURN.toString());
62 try {
63 Workflow job;
64 try {
65 job = factory.findJob( Axis2Castor.convert(jobURN)) ;
66 } catch (NotFoundException e) {
67 logger.error("Could not find job for urn:" + jobURN.getValue());
68 return;
69 }
70 try {
71 job = initializeJob(job);
72 logger.debug("initialized job " + job.getJobExecutionRecord().getJobId().getContent());
73 } catch (Exception e) {
74 logger.error("Could not initialize job for urn:" + jobURN.getValue(),e);
75 return;
76 }
77 job.getJobExecutionRecord().setStartTime(new Date());
78 job.getJobExecutionRecord().setStatus(ExecutionPhase.INITIALIZING);
79 factory.updateJob(job);
80
81
82 scheduleSteps( job ) ;
83
84 updateJobStatus(job);
85 factory.updateJob( job ) ;
86 } catch (JesException e) {
87 logger.error(e);
88 }
89
90
91 }
92
93 /*** initialize the job / workflow document, ready for execution
94 * @see #scheduleNewJob(JobURN)*/
95 protected abstract Workflow initializeJob(Workflow job) throws Exception;
96
97 /*** resume executioin of a job
98 * <p>
99 * records information returned by tool execution in the workflow document, and then attempts to execute further steps in the workflow.
100 * @see org.astrogrid.jes.jobscheduler.JobScheduler#resumeJob(org.astrogrid.jes.types.v1.cea.axis.JobIdentifierType, org.astrogrid.jes.types.v1.cea.axis.MessageType)
101 */
102 public final void resumeJob(JobIdentifierType id,org.astrogrid.jes.types.v1.cea.axis.MessageType info) {
103 logger.debug("Resuming executioin of " + id.toString());
104 Workflow job = null;
105 try {
106 org.astrogrid.workflow.beans.v1.execution.JobURN urn = null;
107 try {
108 urn = JesUtil.extractURN(id);
109 job = factory.findJob(urn ) ;
110 } catch (NotFoundException e) {
111 logger.error("Could not find job for urn" + urn.getContent());
112 return;
113 }
114
115 String fragment = JesUtil.extractXPath(id);
116 Step jobStep = getStepForFragment(job, fragment);
117 if (jobStep == null) {
118 logger.error("Culd not find step " + fragment + " for urn " + urn.getContent());
119 return;
120 }
121
122 StepExecutionRecord er =JesUtil.getLatestOrNewRecord(jobStep);
123 er.addMessage(Axis2Castor.convert(info));
124 ExecutionPhase status = Axis2Castor.convert(info.getPhase());
125
126 updateStepStatus(job,jobStep, status);
127 factory.updateJob(job);
128
129
130 scheduleSteps(job);
131 updateJobStatus(job);
132 factory.updateJob( job ) ;
133 } catch (JesException e) {
134
135 logger.fatal("System error",e);
136
137 if (job != null) {
138 recordFatalError(job, e);
139 try {
140 factory.updateJob(job);
141 } catch (JesException jex) {
142 logger.fatal("can't save error report into workflow",jex);
143 }
144 }
145 }
146
147 }
148
149 /*** record a thrown exception in the workflow document,
150 * and set status of entrire workflow to error
151 * @param job
152 * @param e
153 * @todo want to able to send a debuf-level message here too
154 */
155 protected void recordFatalError(Workflow job, JesException e) {
156 MessageType errorMessage = new MessageType();
157 errorMessage.setPhase(ExecutionPhase.ERROR);
158 errorMessage.setLevel(LogLevel.ERROR);
159 errorMessage.setSource("Jes System");
160 errorMessage.setTimestamp(new Date());
161 errorMessage.setContent(JesUtil.getMessageChain(e));
162
163 MessageType debugMessage = new MessageType();
164 debugMessage.setPhase(errorMessage.getPhase());
165 debugMessage.setLevel(LogLevel.INFO);
166 debugMessage.setSource(errorMessage.getSource());
167 debugMessage.setTimestamp(errorMessage.getTimestamp());
168 StringWriter content = new StringWriter();
169 PrintWriter pw = new PrintWriter(content);
170 pw.println("System Error " + e.getClass().getName());
171 e.printStackTrace(pw);
172 debugMessage.setContent(content.toString());
173
174 job.getJobExecutionRecord().addMessage(errorMessage);
175 job.getJobExecutionRecord().addMessage(debugMessage);
176 job.getJobExecutionRecord().setStatus(ExecutionPhase.ERROR);
177 }
178
179 /*** @see #resumeJob(JobIdentifierType, org.astrogrid.jes.types.v1.cea.axis.MessageType) */
180 protected void updateStepStatus(Workflow wf,Step step, ExecutionPhase status) {
181
182 StepExecutionRecord er = JesUtil.getLatestOrNewRecord(step);
183 if (status.getType() > er.getStatus().getType()) {
184 er.setStatus(status);
185 if (status.getType() >= ExecutionPhase.COMPLETED_TYPE) {
186 er.setFinishTime(new Date());
187 }
188 }
189 }
190 /***
191 * update the status of the entire job - maybe by rescanning tree in someway.
192 * @see #resumeJob(JobIdentifierType, org.astrogrid.jes.types.v1.cea.axis.MessageType)
193 */
194 protected abstract void updateJobStatus(Workflow job);
195
196
197 /*** project the step correspondinig to this fragment from the workflow
198 * @return the step, or null if not found.
199 * @see #resumeJob(JobIdentifierType, org.astrogrid.jes.types.v1.cea.axis.MessageType) where this method is used.
200 * @see #reportResults(JobIdentifierType, ResultListType) where this method is used.*/
201 protected abstract Step getStepForFragment(Workflow job, String fragment);
202
203 /***
204 * select steps that can be executed, and fire them off.
205 * used within {@link #resumeJob}, {@link #reportResults}, {@link scheduleNewJob}
206 */
207 protected abstract void scheduleSteps(Workflow job) ;
208
209
210
211 /*** helper method to create a new execution record, pre-populated */
212 public static StepExecutionRecord newStepExecutionRecord() {
213 StepExecutionRecord result = new StepExecutionRecord();
214 result.setStartTime(new Date());
215 result.setStatus(ExecutionPhase.INITIALIZING);
216 return result;
217 }
218
219
220
221 /*** Record results of a job step execution
222 * @see org.astrogrid.jes.jobscheduler.JobScheduler#reportResults(org.astrogrid.jes.types.v1.cea.axis.JobIdentifierType, org.astrogrid.jes.types.v1.cea.axis.ResultListType)
223 */
224 public final void reportResults(JobIdentifierType id, ResultListType results) throws Exception {
225 logger.debug("reporting results of " + id.toString());
226 Workflow job = null;
227 try {
228 org.astrogrid.workflow.beans.v1.execution.JobURN urn = null;
229 try {
230 urn = JesUtil.extractURN(id);
231 job = factory.findJob(urn ) ;
232 } catch (NotFoundException e) {
233 logger.error("Could not find job for urn" + urn.getContent());
234 return;
235 }
236 String xpath = JesUtil.extractXPath(id);
237 Step jobStep = this.getStepForFragment(job,xpath);
238 if (jobStep == null) {
239 logger.error("Culd not find step " + xpath + " for urn " + urn.getContent());
240 return;
241 }
242
243
244 StepExecutionRecord er =JesUtil.getLatestOrNewRecord(jobStep);
245 MessageType resultsMessage = buildResultsMessage(results);
246 er.addMessage(resultsMessage);
247
248
249 org.astrogrid.applications.beans.v1.cea.castor.ResultListType castorResults = Axis2Castor.convert(results);
250 storeResults(job,jobStep,castorResults);
251 factory.updateJob(job);
252
253
254
255 scheduleSteps(job);
256 updateJobStatus(job);
257 factory.updateJob( job ) ;
258 } catch (JesException e) {
259
260 logger.fatal("report results - jes exception",e);
261 }
262 }
263
264 /*** store / do something with the results of a step
265 *
266 * @param wf
267 * @param step
268 * @param results
269 * @see #reportResults(JobIdentifierType, ResultListType)
270 */
271 protected abstract void storeResults(Workflow wf,Step step,org.astrogrid.applications.beans.v1.cea.castor.ResultListType results);
272
273 /*** build an execution message from the resultslist
274 * @param results
275 * @return
276 */
277 private MessageType buildResultsMessage(ResultListType results) {
278 MessageType resultsMessage = new MessageType();
279 resultsMessage.setLevel(LogLevel.INFO);
280 resultsMessage.setPhase(ExecutionPhase.COMPLETED);
281 resultsMessage.setSource("CEA");
282 resultsMessage.setTimestamp(new Date());
283 StringWriter content = new StringWriter();
284 try {
285 Axis2Castor.convert(results).marshal(content);
286 } catch (CastorException e) {
287 e.printStackTrace(new PrintWriter(content));
288 }
289
290 resultsMessage.setContent(content.toString());
291 return resultsMessage;
292 }
293
294 /***called when a workfllow completes.
295 * do nothing-implementaiton - may be overridden.
296 * */
297 public void notifyJobFinished(Workflow job) {
298 }
299
300 /***
301 * @see org.astrogrid.jes.jobscheduler.JobScheduler#abortJob(org.astrogrid.jes.types.v1.JobURN)
302 */
303 public final void abortJob(JobURN jobURN) {
304 logger.debug("Aborting job: " + jobURN.toString());
305 try {
306 Workflow job = factory.findJob(Axis2Castor.convert(jobURN));
307
308 ExecutionPhase currentPhase = job.getJobExecutionRecord().getStatus();
309 if (currentPhase.getType() < ExecutionPhase.COMPLETED_TYPE) {
310 logger.debug("marking job as in error");
311 job.getJobExecutionRecord().setStatus(ExecutionPhase.ERROR);
312 MessageType msg = new MessageType();
313 msg.setContent("Aborted by user");
314 msg.setSource("JES");
315 msg.setTimestamp(new Date());
316 msg.setPhase(currentPhase);
317 msg.setLevel(LogLevel.INFO);
318 job.getJobExecutionRecord().addMessage(msg);
319 factory.updateJob(job);
320 } else {
321 logger.debug("job has already finished");
322 }
323 } catch (NotFoundException e) {
324 logger.warn("Attempted to abort job that doesn't exist:" + jobURN.getValue());
325 } catch (JesException e) {
326 logger.error("AbortJob",e);
327 }
328
329 }
330 /***
331 * @see org.astrogrid.jes.jobscheduler.JobScheduler#deleteJob(org.astrogrid.jes.types.v1.JobURN)
332 */
333 public final void deleteJob(JobURN jobURN) {
334 logger.debug("Deleting job" + jobURN.toString());
335 try {
336 Workflow job = factory.findJob(Axis2Castor.convert(jobURN));
337 factory.deleteJob(job);
338 } catch (NotFoundException e) {
339 logger.warn("Attempted to delete job that doesn't exist: " + jobURN.getValue());
340 } catch (JesException e) {
341 logger.error("DeleteJob",e);
342 }
343 }
344
345
346
347 }
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392