1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.jes.jobscheduler.impl.groovy;
12
13 import org.astrogrid.applications.beans.v1.cea.castor.MessageType;
14 import org.astrogrid.applications.beans.v1.cea.castor.types.ExecutionPhase;
15 import org.astrogrid.applications.beans.v1.cea.castor.types.LogLevel;
16 import org.astrogrid.applications.beans.v1.parameters.ParameterValue;
17 import org.astrogrid.jes.jobscheduler.Dispatcher;
18 import org.astrogrid.jes.jobscheduler.impl.AbstractJobSchedulerImpl;
19 import org.astrogrid.jes.util.JesUtil;
20 import org.astrogrid.jes.util.TemporaryBuffer;
21 import org.astrogrid.workflow.beans.v1.Script;
22 import org.astrogrid.workflow.beans.v1.Set;
23 import org.astrogrid.workflow.beans.v1.Step;
24 import org.astrogrid.workflow.beans.v1.Tool;
25 import org.astrogrid.workflow.beans.v1.Workflow;
26 import org.astrogrid.workflow.beans.v1.execution.JobExecutionRecord;
27 import org.astrogrid.workflow.beans.v1.execution.StepExecutionRecord;
28
29 import org.apache.commons.collections.CollectionUtils;
30
31 import java.io.PrintStream;
32 import java.io.UnsupportedEncodingException;
33 import java.lang.ref.SoftReference;
34 import java.util.ArrayList;
35 import java.util.Date;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39
40
41 /*** provides an interface for executing groovy scripts in rulebase to call back into jes server
42 * also provides some 'helper' methods that can be called from scripts to create usefule objects, etc. Maybe these should be split into a libraru class, and a JesServerINterface class.
43 * @modified - added temporary buffers (under soft reference) for capturing system err and out.
44 * @script-summary access the jes server
45 * @script-doc this object allows access to some of the internals of the jes server the workflow script is executing on
46 */
47 public class JesInterface extends WorkflowLogger {
48 private static final String ENCODING = "UTF-16";
49 /*** construct a new interpreter environment
50 * @param wf workflow object
51 * @param disp ToolDispatcher
52 */
53 public JesInterface(Workflow wf, Dispatcher disp,GroovySchedulerImpl sched){
54 super(wf);
55 this.disp = disp;
56 this.sched = sched;
57 }
58
59 protected final GroovySchedulerImpl sched;
60 protected final Dispatcher disp;
61 private SoftReference softErrBuff = new SoftReference(new TemporaryBuffer());
62 private SoftReference softOutBuff = new SoftReference(new TemporaryBuffer());
63
64 /*** access the version info for this jes server.
65 * <p>
66 * at present returns a list of bugzilla numbers this server implements
67 * @todo implement to return richer info
68 * @script-doc inspect version information for this server*/
69 public String getVersion() {
70 return "Iteration 07, jes-nww-714 workflow-nww-776";
71 }
72
73 /*** the workflow object the script belongs to
74 * @script-doc access the object model of the current workflow
75 * */
76 public Workflow getWorkflow() {
77 return wf;
78 }
79
80 /*** return list of steps in the workflow document
81 * @script-doc access the list of steps in the current workflow*/
82 public List getSteps() {
83 Iterator i = JesUtil.getJobSteps(wf);
84 List l = new ArrayList();
85 CollectionUtils.addAll(l,i);
86 return l;
87 }
88
89 /*** create a new initialized parameter object
90 * @script-doc create a new tool-parameter object, initialized, but unattached to any tool*/
91 public ParameterValue newParameter() {
92 ParameterValue pval = new ParameterValue();
93 pval.setIndirect(false);
94 pval.setEncoding("");
95 return pval;
96 }
97
98 /*** the tool step dispatcher
99 * @script-doc-omit don't think it's sensible to encourage use of this.*/
100 public Dispatcher getDispatcher() {
101 return disp;
102 }
103
104
105 /*** access object in workflow tree by id
106 *@script-doc access an element in the workflow document by it's unique id.
107 * */
108 public Object getId(String id) {
109 if (id.equals(getWorkflow().getId())) {
110 return getWorkflow();
111 }
112 Object result = getWorkflow().findXPathValue("//*[id = '" + id + "']");
113 if (result != null) {
114 return result;
115 } else {
116 int lastIndex = id.lastIndexOf('-');
117 if (lastIndex != -1) {
118
119 return getId(id.substring(0,lastIndex));
120 } else {
121 return null;
122 }
123 }
124 }
125 /***
126 * dispatach a tool step to a cea server for execution.
127 * @param id - the identifier of the step to execute
128 * @script-doc-omit very internal
129 * */
130 public boolean dispatchStep(String id, JesShell shell, ActivityStatusStore states, Map rules) {
131 Step step = (Step)getId(id);
132 StepExecutionRecord er = AbstractJobSchedulerImpl.newStepExecutionRecord();
133 step.addStepExecutionRecord(er);
134 try{
135 Tool tool = shell.evaluateTool(step.getTool(),id,states,rules);
136 getDispatcher().dispatchStep(getWorkflow(),tool,id);
137 er.setStatus(ExecutionPhase.RUNNING);
138 return true;
139 } catch (Throwable t) {
140 error("Failed to dispatch step",t,er);
141 er.setStatus(ExecutionPhase.ERROR);
142 debug("Failed to dispatch step",t,er);
143 return false;
144 }
145 }
146
147 /*** execute a set activity
148 * @script-doc-omit very internal*/
149 public boolean executeSet(String id,JesShell shell,ActivityStatusStore map,Map rules) {
150 Set set = (Set)getId(id);
151 try {
152 shell.executeSet(set,id,map,rules);
153 return true;
154 } catch (Throwable t) {
155 error("set " + set.getVar() + ":= " + set.getValue() + " failed",t);
156 return false;
157 }
158 }
159 /*** dispatch / execute a script activity
160 * @script-doc-omit you must be joking.. */
161 public boolean dispatchScript(String id,JesShell shell,ActivityStatusStore map,Map rules) {
162 Script script = (Script)getId(id);
163 StepExecutionRecord er = AbstractJobSchedulerImpl.newStepExecutionRecord();
164 script.addStepExecutionRecord(er);
165 er.setStatus(ExecutionPhase.RUNNING);
166
167 TemporaryBuffer errB = (TemporaryBuffer)softErrBuff.get();
168 if (errB == null) {
169 errB = new TemporaryBuffer();
170 softErrBuff = new SoftReference(errB);
171 }
172
173 TemporaryBuffer outB = (TemporaryBuffer)softOutBuff.get();
174 if (outB == null) {
175 outB = new TemporaryBuffer();
176 softOutBuff = new SoftReference(outB);
177 }
178
179 errB.writeMode();
180 outB.writeMode();
181 PrintStream errStream = null;
182 PrintStream outStream = null;
183 try {
184 errStream = new PrintStream(errB.getOutputStream(),false,ENCODING);
185 outStream = new PrintStream(outB.getOutputStream(),false,ENCODING);
186 } catch (UnsupportedEncodingException e) {
187 logger.fatal("JVM doesn't support UTF-16 - which is required by specs",e);
188 throw new RuntimeException("JVM doesn't support UTF-16 - which is required by specs",e);
189 }
190
191 try {
192 shell.executeScript(script.getBody(),id,map,rules,errStream,outStream);
193 er.setStatus(ExecutionPhase.COMPLETED);
194 return true;
195 } catch (Throwable t) {
196 error("Failed to execute script",t,er);
197 er.setStatus(ExecutionPhase.ERROR);
198 debug("Failed to executte script",t,er);
199 return false;
200 } finally {
201 errStream.close();
202 outStream.close();
203 errB.readMode();
204 outB.readMode();
205 MessageType message =buildMessage(errB.getContents(ENCODING));
206 message.setLevel(LogLevel.INFO);
207 message.setSource("stderr");
208 er.addMessage(message);
209
210 message = buildMessage(outB.getContents(ENCODING));
211 message.setLevel(LogLevel.INFO);
212 message.setSource("stdout");
213 er.addMessage(message);
214 er.setFinishTime(new Date());
215 }
216 }
217
218
219 /*** records most important events back into main workflow document.
220 * @script-doc-omit alter status of the entire workflow document. Can be used to halt / abort execution. */
221 public void setWorkflowStatus(Status status) {
222 ExecutionPhase phase = Status.toPhase(status);
223 JobExecutionRecord er = wf.getJobExecutionRecord();
224 er.setStatus(phase);
225 if (phase.equals(ExecutionPhase.COMPLETED) || phase.equals(ExecutionPhase.ERROR)){
226 wf.getJobExecutionRecord().setFinishTime(new Date());
227 logger.info("Job " + er.getJobId().getContent() + " finished with status " + er.getStatus());
228 sched.notifyJobFinished(wf);
229 }
230 }
231
232
233 }
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320