View Javadoc

1   /*$Id: SchedulerTaskQueueDecorator.java,v 1.8 2004/12/03 14:47:41 jdt Exp $
2    * Created on 18-Feb-2004
3    *
4    * Copyright (C) AstroGrid. All rights reserved.
5    *
6    * This software is published under the terms of the AstroGrid 
7    * Software License version 1.2, a copy of which has been included 
8    * with this distribution in the LICENSE.txt file.  
9    *
10  **/
11  package org.astrogrid.jes.jobscheduler.impl;
12  
13  import org.astrogrid.component.descriptor.ComponentDescriptor;
14  import org.astrogrid.jes.beans.v1.axis.executionrecord.JobURN;
15  import org.astrogrid.jes.jobscheduler.JobScheduler;
16  import org.astrogrid.jes.types.v1.cea.axis.JobIdentifierType;
17  import org.astrogrid.jes.types.v1.cea.axis.MessageType;
18  import org.astrogrid.jes.types.v1.cea.axis.ResultListType;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.picocontainer.Startable;
23  
24  import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
25  import junit.framework.Test;
26  
27  /*** A scheduler implementation that decorates another scheduler with a task queue.
28   * <p>
29   * the decorated scheduler executes in a separate, single thread. Any method calls are encapsulated as task objects, and added to the task queue.
30   * <p>
31   * This ensures that the decorated scheduler executes in a single-threaded manner - as all requests to it are serialized by the task queue. Because of this,
32   * there is no need for the underlying implementation to worry about synchronization, concurrency, etc. In particular, underlying Job stores need provide no locking 
33   * ability - as there will never by multiple concurrent updates to a job execution record.
34   * <p>
35   * Although the single-threaded nature may appear to be a bottle neck, we can expect this design to scale quite well - the scheduler is bound by IO 
36   * due to   communication with external web services, so serial processing of tasks is not a problem - each task is quick to process and short-lived.
37   * There could be further gains due to removal of overhead in calling synchronized methods and aquiring locks in the job store. 
38   * @author Noel Winstanley nw@jb.man.ac.uk 18-Feb-2004
39   *
40   */
41  public class SchedulerTaskQueueDecorator implements JobScheduler , ComponentDescriptor, Startable{
42  
43      /*** 
44       *  Construct a new MemoryQueueSchedulerNotifier, that will use a Queued executor to service each of the tasks in turn.
45       * @param scheduler previously-constructed scheduler to pass notifications to.
46       */
47      public SchedulerTaskQueueDecorator(JobScheduler scheduler) {
48          this.executor = new QueuedExecutor();
49          this.factory = new TaskFactory(scheduler);
50      }
51      /*** concurrency framework component that provides a task queue */
52      protected final QueuedExecutor executor;
53      /*** factory component that creates tasks to add to the queue*/
54      protected final TaskFactory factory;
55      /***adds a task to the queue that will call 'scheduleNewJob' with the current parameters on the wrapped job scheduler
56       * @see org.astrogrid.jes.comm.SchedulerNotifier#notify(org.astrogrid.jes.job.Job)
57       */
58      public void scheduleNewJob(JobURN urn) throws Exception {        
59          executor.execute(factory.createSubmitJobTask(urn));
60      }
61      /***adds a task to te queue that will call 'resumeJob' with the current parameters on the wrapped job scheduler
62       * @see org.astrogrid.jes.comm.SchedulerNotifier#notify(org.astrogrid.jes.types.v1.JobInfo)
63       */
64      public void resumeJob(JobIdentifierType ji,MessageType i) throws Exception {
65          executor.execute(factory.createResumeTask(ji,i));
66      }    
67      
68      /***
69       * @see org.astrogrid.jes.jobscheduler.JobScheduler#abortJob(org.astrogrid.jes.types.v1.JobURN)
70       */
71      public void abortJob(JobURN jobURN) throws Exception {
72          executor.execute(factory.createAbortJobTask(jobURN));
73         
74      }
75      /***
76       * @see org.astrogrid.jes.jobscheduler.JobScheduler#deleteJob(org.astrogrid.jes.types.v1.JobURN)
77       */
78      public void deleteJob(JobURN jobURN) throws Exception {
79          executor.execute(factory.createDeleteJobTask(jobURN));
80      }    
81  
82      /***
83       * @see org.astrogrid.jes.jobscheduler.JobScheduler#reportResults(org.astrogrid.jes.types.v1.cea.axis.JobIdentifierType, org.astrogrid.jes.types.v1.cea.axis.ResultListType)
84       */
85      public void reportResults(JobIdentifierType id, ResultListType results) throws Exception {
86          executor.execute(factory.createReportResultsJobTask(id,results));
87      }
88      
89      /*** general method to add another runnable to the queue. useful for testing - i.e. can insert a 'end of test' runnable 
90       * after all other tasks.
91       * @author Noel Winstanley nw@jb.man.ac.uk 19-Feb-2004
92       *
93       */
94      public void addTask(Runnable r)  throws Exception {
95          executor.execute(r);
96      }
97      /*** helper class to build runnable tasks to be passed to the executor. Ensuring that only this class has a reference to the
98       * wrapped job scheduler itself ensures that we don't accidentaly call it directly;
99       * @author Noel Winstanley nw@jb.man.ac.uk 18-Feb-2004
100      *
101      */
102     private static class TaskFactory {
103         public TaskFactory(JobScheduler js) {
104             this.js = js;
105         }
106         final JobScheduler js;
107                 
108         /***
109          * @param id
110          * @param results
111          * @return
112          */
113         public Runnable createReportResultsJobTask(final JobIdentifierType id, final ResultListType results) {
114             return new Runnable() {
115                 public void run() {
116                     try {
117                         js.reportResults(id,results);
118                     } catch (Exception e) {
119                         logger.error("report results",e);
120                     }
121                 }
122             };
123         }
124 
125         
126         private static final Log logger = LogFactory.getLog("TaskQueue");
127         /*** create a task to schedule new job */
128         public Runnable createSubmitJobTask(final JobURN urn) {
129             return new Runnable() {
130                 public void run() {
131                     try {
132                     js.scheduleNewJob(urn);
133                     } catch (Exception e) {
134                         logger.error("schedule new job",e);
135                     }
136                 }                
137             };
138         } 
139         /*** create a task to resume a new job */
140         public Runnable createResumeTask(final JobIdentifierType ji, final MessageType msg) {
141             return new Runnable() {
142                 public void run() {
143                     try {
144                     js.resumeJob(ji,msg);
145                     } catch (Exception e) {
146                         logger.error("resume job",e);
147                     }
148                 }
149             };
150         }
151         public Runnable createAbortJobTask(final JobURN urn) {
152             return new Runnable() {
153                 public void run() {
154                     try {
155                         js.abortJob(urn);
156                     } catch (Exception e) {
157                         logger.error("abort job",e);
158                     }
159                 }
160             };
161         }
162         public Runnable createDeleteJobTask(final JobURN urn) {
163             return new Runnable() {
164                 public void run() {
165                     try {
166                         js.deleteJob(urn);
167                     } catch (Exception e) {
168                         logger.error("delete job",e);
169                     }
170                 }
171             };
172         }
173     }
174 
175     /***
176      * @see org.astrogrid.jes.component.ComponentDescriptor#getName()
177      */
178     public String getName() {
179         return "MemoryQueueSchedulerNotifier";
180     }
181     /***
182      * @see org.astrogrid.jes.component.ComponentDescriptor#getDescription()
183      */
184     public String getDescription() {
185         return "Notifier that interacts with in-process scheduler,running in a separate thread." +
186             "Notifications passed onto task queue for this thread";  
187     }
188     /***
189      * @see org.astrogrid.jes.component.ComponentDescriptor#getInstallationTest()
190      */
191     public Test getInstallationTest() {
192         return null;
193     }
194     /***
195      * @see org.picocontainer.Startable#start()
196      */
197     public void start() {
198     }
199     /***shuts down the scheduler thread.
200      * @see org.picocontainer.Startable#stop()
201      */
202     public void stop() {
203         executor.shutdownNow();
204     }
205 
206 
207 
208     }
209 
210 
211 
212 /* 
213 $Log: SchedulerTaskQueueDecorator.java,v $
214 Revision 1.8  2004/12/03 14:47:41  jdt
215 Merges from workflow-nww-776
216 
217 Revision 1.7.14.1  2004/12/01 21:48:20  nw
218 adjusted to work with new summary object,
219 and changed package of JobURN
220 
221 Revision 1.7  2004/11/05 16:52:42  jdt
222 Merges from branch nww-itn07-scratchspace
223 
224 Revision 1.6.18.1  2004/11/05 15:43:49  nw
225 tidied imports
226 
227 Revision 1.6  2004/09/16 21:43:10  nw
228 enabled worker thread to be shut down.
229 
230 Revision 1.5  2004/07/01 21:15:00  nw
231 added results-listener interface to jes
232 
233 Revision 1.4  2004/04/08 14:43:26  nw
234 added delete and abort job functionality
235 
236 Revision 1.3  2004/03/15 23:45:07  nw
237 improved javadoc
238 
239 Revision 1.2  2004/03/15 01:30:45  nw
240 factored component descriptor out into separate package
241 
242 Revision 1.1  2004/03/15 00:30:54  nw
243 factored implemetation of scheduler out of parent package.
244 
245 Revision 1.6  2004/03/15 00:06:57  nw
246 removed SchedulerNotifier interface - replaced references to it by references to JobScheduler interface - identical
247 
248 Revision 1.5  2004/03/07 21:04:39  nw
249 merged in nww-itn05-pico - adds picocontainer
250 
251 Revision 1.4.4.1  2004/03/07 20:38:52  nw
252 added componet descriptor interface impl,
253 refactored any primitive types passed into constructor
254 
255 Revision 1.4  2004/03/05 16:16:23  nw
256 worked now object model through jes.
257 implemented basic scheduling policy
258 removed internal facade
259 
260 Revision 1.3  2004/03/03 01:13:42  nw
261 updated jes to work with regenerated workflow object model
262 
263 Revision 1.2  2004/02/27 00:46:03  nw
264 merged branch nww-itn05-bz#91
265 
266 Revision 1.1.2.1  2004/02/19 13:33:17  nw
267 removed rough scheduler notifier, replaced with one
268 using SOAP delegate.
269 
270 added in-memory concurrent notifier
271  
272 */