1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.jes.jobscheduler.impl;
12
13 import org.astrogrid.component.descriptor.ComponentDescriptor;
14 import org.astrogrid.jes.beans.v1.axis.executionrecord.JobURN;
15 import org.astrogrid.jes.jobscheduler.JobScheduler;
16 import org.astrogrid.jes.types.v1.cea.axis.JobIdentifierType;
17 import org.astrogrid.jes.types.v1.cea.axis.MessageType;
18 import org.astrogrid.jes.types.v1.cea.axis.ResultListType;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.picocontainer.Startable;
23
24 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
25 import junit.framework.Test;
26
/** A scheduler implementation that decorates another scheduler with a task queue.
 * <p>
 * The decorated scheduler executes in a separate, single thread. Any method calls are encapsulated as task objects, and added to the task queue.
 * <p>
 * This ensures that the decorated scheduler executes in a single-threaded manner - as all requests to it are serialized by the task queue. Because of this,
 * there is no need for the underlying implementation to worry about synchronization, concurrency, etc. In particular, underlying Job stores need provide no locking
 * ability - as there will never be multiple concurrent updates to a job execution record.
 * <p>
 * Although the single-threaded nature may appear to be a bottleneck, we can expect this design to scale quite well - the scheduler is bound by IO
 * due to communication with external web services, so serial processing of tasks is not a problem - each task is quick to process and short-lived.
 * There could be further gains due to removal of overhead in calling synchronized methods and acquiring locks in the job store.
 * @author Noel Winstanley nw@jb.man.ac.uk 18-Feb-2004
 *
 */
41 public class SchedulerTaskQueueDecorator implements JobScheduler , ComponentDescriptor, Startable{
42
43 /***
44 * Construct a new MemoryQueueSchedulerNotifier, that will use a Queued executor to service each of the tasks in turn.
45 * @param scheduler previously-constructed scheduler to pass notifications to.
46 */
47 public SchedulerTaskQueueDecorator(JobScheduler scheduler) {
48 this.executor = new QueuedExecutor();
49 this.factory = new TaskFactory(scheduler);
50 }
51 /*** concurrency framework component that provides a task queue */
52 protected final QueuedExecutor executor;
53 /*** factory component that creates tasks to add to the queue*/
54 protected final TaskFactory factory;
55 /***adds a task to the queue that will call 'scheduleNewJob' with the current parameters on the wrapped job scheduler
56 * @see org.astrogrid.jes.comm.SchedulerNotifier#notify(org.astrogrid.jes.job.Job)
57 */
58 public void scheduleNewJob(JobURN urn) throws Exception {
59 executor.execute(factory.createSubmitJobTask(urn));
60 }
61 /***adds a task to te queue that will call 'resumeJob' with the current parameters on the wrapped job scheduler
62 * @see org.astrogrid.jes.comm.SchedulerNotifier#notify(org.astrogrid.jes.types.v1.JobInfo)
63 */
64 public void resumeJob(JobIdentifierType ji,MessageType i) throws Exception {
65 executor.execute(factory.createResumeTask(ji,i));
66 }
67
68 /***
69 * @see org.astrogrid.jes.jobscheduler.JobScheduler#abortJob(org.astrogrid.jes.types.v1.JobURN)
70 */
71 public void abortJob(JobURN jobURN) throws Exception {
72 executor.execute(factory.createAbortJobTask(jobURN));
73
74 }
75 /***
76 * @see org.astrogrid.jes.jobscheduler.JobScheduler#deleteJob(org.astrogrid.jes.types.v1.JobURN)
77 */
78 public void deleteJob(JobURN jobURN) throws Exception {
79 executor.execute(factory.createDeleteJobTask(jobURN));
80 }
81
82 /***
83 * @see org.astrogrid.jes.jobscheduler.JobScheduler#reportResults(org.astrogrid.jes.types.v1.cea.axis.JobIdentifierType, org.astrogrid.jes.types.v1.cea.axis.ResultListType)
84 */
85 public void reportResults(JobIdentifierType id, ResultListType results) throws Exception {
86 executor.execute(factory.createReportResultsJobTask(id,results));
87 }
88
89 /*** general method to add another runnable to the queue. useful for testing - i.e. can insert a 'end of test' runnable
90 * after all other tasks.
91 * @author Noel Winstanley nw@jb.man.ac.uk 19-Feb-2004
92 *
93 */
94 public void addTask(Runnable r) throws Exception {
95 executor.execute(r);
96 }
97 /*** helper class to build runnable tasks to be passed to the executor. Ensuring that only this class has a reference to the
98 * wrapped job scheduler itself ensures that we don't accidentaly call it directly;
99 * @author Noel Winstanley nw@jb.man.ac.uk 18-Feb-2004
100 *
101 */
102 private static class TaskFactory {
103 public TaskFactory(JobScheduler js) {
104 this.js = js;
105 }
106 final JobScheduler js;
107
108 /***
109 * @param id
110 * @param results
111 * @return
112 */
113 public Runnable createReportResultsJobTask(final JobIdentifierType id, final ResultListType results) {
114 return new Runnable() {
115 public void run() {
116 try {
117 js.reportResults(id,results);
118 } catch (Exception e) {
119 logger.error("report results",e);
120 }
121 }
122 };
123 }
124
125
126 private static final Log logger = LogFactory.getLog("TaskQueue");
127 /*** create a task to schedule new job */
128 public Runnable createSubmitJobTask(final JobURN urn) {
129 return new Runnable() {
130 public void run() {
131 try {
132 js.scheduleNewJob(urn);
133 } catch (Exception e) {
134 logger.error("schedule new job",e);
135 }
136 }
137 };
138 }
139 /*** create a task to resume a new job */
140 public Runnable createResumeTask(final JobIdentifierType ji, final MessageType msg) {
141 return new Runnable() {
142 public void run() {
143 try {
144 js.resumeJob(ji,msg);
145 } catch (Exception e) {
146 logger.error("resume job",e);
147 }
148 }
149 };
150 }
151 public Runnable createAbortJobTask(final JobURN urn) {
152 return new Runnable() {
153 public void run() {
154 try {
155 js.abortJob(urn);
156 } catch (Exception e) {
157 logger.error("abort job",e);
158 }
159 }
160 };
161 }
162 public Runnable createDeleteJobTask(final JobURN urn) {
163 return new Runnable() {
164 public void run() {
165 try {
166 js.deleteJob(urn);
167 } catch (Exception e) {
168 logger.error("delete job",e);
169 }
170 }
171 };
172 }
173 }
174
175 /***
176 * @see org.astrogrid.jes.component.ComponentDescriptor#getName()
177 */
178 public String getName() {
179 return "MemoryQueueSchedulerNotifier";
180 }
181 /***
182 * @see org.astrogrid.jes.component.ComponentDescriptor#getDescription()
183 */
184 public String getDescription() {
185 return "Notifier that interacts with in-process scheduler,running in a separate thread." +
186 "Notifications passed onto task queue for this thread";
187 }
188 /***
189 * @see org.astrogrid.jes.component.ComponentDescriptor#getInstallationTest()
190 */
191 public Test getInstallationTest() {
192 return null;
193 }
194 /***
195 * @see org.picocontainer.Startable#start()
196 */
197 public void start() {
198 }
199 /***shuts down the scheduler thread.
200 * @see org.picocontainer.Startable#stop()
201 */
202 public void stop() {
203 executor.shutdownNow();
204 }
205
206
207
208 }
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272