1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.jes.jobscheduler.dispatcher.inprocess;
12
13 import org.astrogrid.applications.Application;
14 import org.astrogrid.applications.CeaException;
15 import org.astrogrid.applications.Status;
16 import org.astrogrid.applications.beans.v1.cea.castor.MessageType;
17 import org.astrogrid.applications.beans.v1.cea.castor.types.LogLevel;
18 import org.astrogrid.applications.manager.DefaultApplicationEnvironmentRetriever;
19 import org.astrogrid.applications.manager.DefaultQueryService;
20 import org.astrogrid.applications.manager.observer.AbstractProgressListener;
21 import org.astrogrid.applications.manager.observer.AbstractResultsListener;
22 import org.astrogrid.applications.manager.persist.ExecutionHistory;
23 import org.astrogrid.common.bean.Castor2Axis;
24 import org.astrogrid.jes.delegate.v1.jobmonitor.JobMonitor;
25 import org.astrogrid.jes.service.v1.cearesults.ResultsListener;
26 import org.astrogrid.jes.types.v1.cea.axis.JobIdentifierType;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30
31 import java.net.URI;
32 import java.net.URISyntaxException;
33 import java.rmi.RemoteException;
34
35 /*** Query service that allows registration of inprocess listeners to cea applications.
36 * - if the urn 'jes:inprocess' is passed to one of the register*Listener methods,
37 * then a listener will be registered with an application that calls directly back to the jobmonitor component.
38 * @author Noel Winstanley nw@jb.man.ac.uk 07-Feb-2005
39 *
40 */
41 public class InProcessQueryService extends DefaultQueryService {
42 /***
43 * Commons Logger for this class
44 */
45 private static final Log logger = LogFactory.getLog(InProcessQueryService.class);
46
47 /*** Construct a new InProcessQueryService
48 * @param arg0
49 * @param jm job monitor component to call back to.
50 * @param jrl results listner component to call back to.
51 */
52 public InProcessQueryService(ExecutionHistory arg0, JobMonitor jm, ResultsListener jrl) {
53
54
55 super(arg0, new DefaultApplicationEnvironmentRetriever(arg0));
56 pl = new InprocessProgressListener(jm);
57 rl = new InprocessResultsListener(jrl);
58 }
59
60 protected final InprocessProgressListener pl ;
61 protected final InprocessResultsListener rl ;
62
63 public String getDescription() {
64 return "Query service that supports a short-cut inprocess listener";
65 }
66 public String getName() {
67 return "InProcessQueryService";
68 }
69
70 /*** recognize a special uri protocol - 'jes:inprocess', and in this case, register a direct progress listener */
71 public boolean registerProgressListener(String arg0, URI arg1)
72 throws CeaException {
73 if (arg1.equals(INPROCESS_URI)) {
74 if (!executionHistory.isApplicationInCurrentSet(arg0)) {
75 return false;
76 }
77 Application app = executionHistory.getApplicationFromCurrentSet(arg0);
78 app.addObserver(pl);
79 return true;
80 } else {
81 return super.registerProgressListener(arg0, arg1);
82 }
83 }
84 /*** recognize a special uri protocol - 'jes:inprocess', and in this case, register
85 * a progress listener that calls directly back to the jes results listener
86 * @see org.astrogrid.applications.manager.QueryService#registerResultsListener(java.lang.String, java.net.URI)
87 */
88 public boolean registerResultsListener(String arg0, URI arg1)
89 throws CeaException {
90 if (arg1.equals(INPROCESS_URI)) {
91 if (!executionHistory.isApplicationInCurrentSet(arg0)) {
92 return false;
93 }
94 Application app = executionHistory.getApplicationFromCurrentSet(arg0);
95 app.addObserver(rl);
96 return true;
97 } else {
98 return super.registerResultsListener(arg0, arg1);
99 }
100 }
101
102
103 /*** constant value for 'jes:inprocess' */
104 public static URI INPROCESS_URI ;
105 static {
106 try {
107 INPROCESS_URI = new URI("jes","inprocess",null);
108 } catch (URISyntaxException e) {
109 throw new RuntimeException(e);
110 }
111 }
112
113 /***
114 * Shortcut listener that notifies job monitor component directly
115 * @author Noel Winstanley nw@jb.man.ac.uk 07-Feb-2005
116 *@todo refactor these back into cea server? code is the same, just the construction
117 *of the jobMonitor is different..
118 */
119 protected static class InprocessProgressListener extends AbstractProgressListener {
120 /***
121 * Commons Logger for this class
122 */
123 private static final Log logger = LogFactory.getLog(InprocessProgressListener.class);
124
125 public InprocessProgressListener(JobMonitor jm) {
126 this.jm = jm;
127 }
128 protected final JobMonitor jm;
129
130 /***
131 * @see org.astrogrid.applications.manager.observer.AbstractProgressListener#reportMessage(org.astrogrid.applications.Application, org.astrogrid.applications.beans.v1.cea.castor.MessageType)
132 */
133 protected void reportMessage(Application arg0, MessageType arg1) {
134 try {
135 jm.monitorJob(new JobIdentifierType(arg0.getJobStepID()),Castor2Axis.convert(arg1));
136 } catch (RemoteException e) {
137 logger.warn("JesDelegateException",e);
138 }
139 }
140
141 /***
142 * @see org.astrogrid.applications.manager.observer.AbstractProgressListener#reportStatusChange(org.astrogrid.applications.Application, org.astrogrid.applications.Status)
143 */
144 protected void reportStatusChange(Application app, Status status) {
145 MessageType message = app.createTemplateMessage();
146 message.setPhase(status.toExecutionPhase());
147 message.setLevel(LogLevel.INFO);
148 message.setContent("Application enters new phase");
149 try {
150 jm.monitorJob(new JobIdentifierType( app.getJobStepID()),Castor2Axis.convert(message));
151 } catch (RemoteException e) {
152 logger.warn("JesDelegateException",e);
153 }
154
155 }
156
157 }
158
159 /***
160 * Shortcut listener that notifies results listener component directly
161 * @author Noel Winstanley nw@jb.man.ac.uk 07-Feb-2005
162 *
163 */
164 protected static class InprocessResultsListener extends AbstractResultsListener {
165 /***
166 * Commons Logger for this class
167 */
168 private static final Log logger = LogFactory.getLog(InprocessResultsListener.class);
169
170 public InprocessResultsListener(ResultsListener jrl) {
171 this.jrl = jrl;
172 }
173 protected final ResultsListener jrl;
174 /***
175 * @see org.astrogrid.applications.manager.observer.AbstractResultsListener#notifyResultsAvailable(org.astrogrid.applications.Application)
176 */
177 protected void notifyResultsAvailable(Application app) {
178 try {
179 jrl.putResults(new JobIdentifierType(app.getJobStepID()),Castor2Axis.convert( app.getResult()));
180 } catch (RemoteException e) {
181 logger.warn("RemoteException",e);
182 }
183
184 }
185 }
186
187 }
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211