1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.jes.jobcontroller;
12
13 import org.astrogrid.common.bean.Axis2Castor;
14 import org.astrogrid.common.bean.Castor2Axis;
15 import org.astrogrid.community.beans.v1.axis._Account;
16 import org.astrogrid.component.descriptor.ComponentDescriptor;
17 import org.astrogrid.jes.JesException;
18 import org.astrogrid.jes.beans.v1.axis.executionrecord.JobURN;
19 import org.astrogrid.jes.beans.v1.axis.executionrecord.WorkflowString;
20 import org.astrogrid.jes.beans.v1.axis.executionrecord.WorkflowSummaryType;
21 import org.astrogrid.jes.beans.v1.axis.executionrecord._extension;
22 import org.astrogrid.jes.delegate.v1.jobcontroller.JesFault;
23 import org.astrogrid.jes.job.JobException;
24 import org.astrogrid.jes.job.JobFactory;
25 import org.astrogrid.jes.jobscheduler.JobScheduler;
26 import org.astrogrid.jes.util.TemporaryBuffer;
27 import org.astrogrid.workflow.beans.v1.Workflow;
28 import org.astrogrid.workflow.beans.v1.execution.JobExecutionRecord;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.exolab.castor.xml.CastorException;
33
34 import java.io.StringReader;
35 import java.util.ArrayList;
36 import java.util.Calendar;
37 import java.util.Iterator;
38 import java.util.List;
39
40 import junit.framework.Test;
41
42
43
44 /***
45 * Management interface into the job execution system.
46 * <p>
 * This class allows clients to interact with a job store - listing, removing, retrieving jobs, etc. Adding a job to the store has the effect of
48 * scheduling it for execution too.
49 * <p>
50 * This class could be accessed directly, but would more usually be called from a web delegate via a SOAP call.
51 *
52 * @author Jeff Lusted
53 * @version 1.0 28-May-2003
54 * @since AstroGrid 1.2
55 * Bug#12 Jeff Lusted - 30-June-2003 NullPointerException under error conditions.
56 */
57 public class JobController implements org.astrogrid.jes.delegate.v1.jobcontroller.JobController, ComponentDescriptor{
58
59 public JobController(JobFactory factory,JobScheduler nudger, TemporaryBuffer buff) {
60 assert factory != null;
61 assert nudger != null;
62 this.factory = factory;
63 this.nudger = nudger;
64 this.buff = buff;
65 }
66 protected final JobFactory factory;
67 protected final JobScheduler nudger;
68 protected final TemporaryBuffer buff;
69
70 private static final Log
71 logger = LogFactory.getLog( JobController.class ) ;
72
73 /*** Submit a job to the execution service
74 *@todo rewrite to use buffer?
75 * @see org.astrogrid.jes.delegate.v1.jobcontroller.JobController#submitWorkflow(org.astrogrid.jes.types.v1.WorkflowString)
76 */
77 public JobURN submitWorkflow(WorkflowString workflowXML) throws JesFault{
78 logger.debug("in submit workflow");
79 try {
80 Workflow req = Workflow.unmarshalWorkflow(new StringReader(workflowXML.getValue()));
81 return this.submitJob(req);
82 } catch (CastorException e) {
83 throw createFault("Could not submit job",e);
84 }
85 }
86
87 /***
88 * Submit a job to the execution service
89 * @param req abstract request object
90 * @return unique identifier for the new job.
91 * @throws JesFault
92 */
93 public JobURN submitJob( Workflow req ) throws JesFault {
94 logger.debug("Submit Job");
95 Workflow job= null;
96 try {
97 job = factory.initializeJob( req) ;
98 nudger.scheduleNewJob(Castor2Axis.convert(job.getJobExecutionRecord().getJobId()));
99 JobURN result = new JobURN(job.getJobExecutionRecord().getJobId().getContent());
100 logger.debug("Submit Job: new URN = " + result.toString());
101 return result;
102 }
103 catch(Exception jex ) {
104 logger.error(jex);
105 if (job != null) {
106 try {
107 factory.deleteJob(job);
108 } catch (JobException e) {
109 logger.warn("failed to delete corrupted job - " + job.getJobExecutionRecord().getJobId() );
110 }
111 }
112 throw createFault("Failed to submit job",jex);
113 }
114
115 }
116
117
118 /***
119 * @see org.astrogrid.jes.delegate.v1.jobcontroller.JobController#cancelJob(org.astrogrid.jes.types.v1.JobURN)
120 */
121 public void cancelJob(JobURN arg0) {
122 logger.debug("Cancel Job:" + arg0.toString());
123 try {
124 nudger.abortJob(arg0);
125 } catch (Exception jex) {
126 logger.warn("cancelJob:",jex);
127 }
128 }
129
130 /***
131 * @see org.astrogrid.jes.delegate.v1.jobcontroller.JobController#deleteJob(org.astrogrid.jes.types.v1.JobURN)
132 */
133 public void deleteJob(JobURN arg0){
134 logger.debug("Delete Job: " + arg0.toString());
135 try {
136 nudger.deleteJob(arg0);
137 } catch (Exception jex) {
138 logger.warn("deleteJob:",jex);
139 }
140 }
141
142
143 /***
144 * @see org.astrogrid.jes.component.ComponentDescriptor#getName()
145 */
146 public String getName() {
147 return "Standard Job Controller";
148 }
149
150 /***
151 * @see org.astrogrid.jes.component.ComponentDescriptor#getDescription()
152 */
153 public String getDescription() {
154 return "Bog standard implementation";
155 }
156
157 /***
158 * @see org.astrogrid.jes.component.ComponentDescriptor#getInstallationTest()
159 */
160 public Test getInstallationTest() {
161 return null;
162 }
163
164
165 /***
166 * @see org.astrogrid.jes.delegate.v1.jobcontroller.JobController#readJobList(org.astrogrid.community.beans.v1.axis._Account)
167 */
168 public WorkflowSummaryType[] readJobList(_Account arg0) throws JesFault {
169 try {
170 logger.debug("in read job list");
171 Iterator i = factory.findUserJobs(Axis2Castor.convert(arg0));
172 List itemList = new ArrayList();
173 while (i.hasNext()) {
174 try {
175 Workflow w = (Workflow)i.next();
176 if (w == null) {
177 logger.warn("Found a null workflow");
178 continue;
179 }
180 WorkflowSummaryType item = new WorkflowSummaryType();
181 item.setWorkflowName(w.getName());
182 item.setDescription(w.getDescription());
183 JobExecutionRecord jobExecutionRecord = w.getJobExecutionRecord();
184 item.setJobId(Castor2Axis.convert(jobExecutionRecord.getJobId()));
185
186
187 item.setExtension(new _extension[0]);
188 if (jobExecutionRecord.getFinishTime() != null) {
189 Calendar cal = Calendar.getInstance();
190 cal.setTime(jobExecutionRecord.getFinishTime());
191 item.setFinishTime(cal);
192 }
193 if (jobExecutionRecord.getStartTime() != null) {
194 Calendar cal = Calendar.getInstance();
195 cal.setTime(jobExecutionRecord.getStartTime());
196 item.setStartTime(cal);
197 }
198
199 item.setMessage(Castor2Axis.convert(jobExecutionRecord.getMessage()));
200 item.setStatus(Castor2Axis.convert(jobExecutionRecord.getStatus()));
201 itemList.add(item);
202 } catch (RuntimeException e) {
203 logger.warn("failed to read workflow",e);
204 }
205 }
206
207 return (WorkflowSummaryType[])itemList.toArray(new WorkflowSummaryType[]{});
208 } catch (JesException e) {
209 throw createFault("Failed to read workflow list",e);
210 }
211 }
212
213 private JesFault createFault(String message,Exception e) {
214 logger.info("failed with exception",e);
215 JesFault jf = new JesFault(message);
216 jf.setStackTrace(e.getStackTrace());
217 jf.setFaultReason(e.getMessage());
218 jf.setFaultCodeAsString(e.getClass().getName());
219 return jf;
220 }
221
222 /***
223 * @see org.astrogrid.jes.delegate.v1.jobcontroller.JobController#readJob(org.astrogrid.jes.types.v1.JobURN)
224 @nb converted to use buffer
225 */
226 public WorkflowString readJob(JobURN arg0) throws JesFault {
227 try {
228 logger.debug("in readJob()");
229 JobFactory fac = factory;
230 Workflow w = fac.findJob(Axis2Castor.convert(arg0));
231 if ( w == null) {
232 logger.error("Factory returned null workflow");
233 throw new JesFault("factory returned null workflow");
234 }
235 buff.writeMode();
236 w.marshal(buff.getWriter());
237 buff.readMode();
238 return new WorkflowString(buff.getContents());
239 } catch (Exception e) {
240 throw createFault("Failed to read job",e);
241 }
242 }
243
244
245
246
247 }