1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.datacenter.service.v06;
12
13 import EDU.oswego.cs.dl.util.concurrent.Executor;
14 import java.io.IOException;
15 import java.io.StringWriter;
16 import javax.xml.parsers.ParserConfigurationException;
17 import org.apache.commons.logging.Log;
18 import org.apache.commons.logging.LogFactory;
19 import org.astrogrid.applications.AbstractApplication;
20 import org.astrogrid.applications.CeaException;
21 import org.astrogrid.applications.Status;
22 import org.astrogrid.applications.beans.v1.parameters.ParameterValue;
23 import org.astrogrid.applications.description.ApplicationInterface;
24 import org.astrogrid.applications.parameter.protocol.ProtocolLibrary;
25 import org.astrogrid.community.Account;
26 import org.astrogrid.datacenter.queriers.Querier;
27 import org.astrogrid.datacenter.queriers.QuerierListener;
28 import org.astrogrid.datacenter.queriers.status.QuerierStatus;
29 import org.astrogrid.datacenter.query.AdqlQueryMaker;
30 import org.astrogrid.datacenter.query.Query;
31 import org.astrogrid.datacenter.query.QueryState;
32 import org.astrogrid.datacenter.query.SimpleQueryMaker;
33 import org.astrogrid.datacenter.service.DataServer;
34 import org.astrogrid.slinger.targets.TargetIndicator;
35 import org.astrogrid.slinger.targets.TargetMaker;
36 import org.astrogrid.slinger.targets.WriterTarget;
37 import org.astrogrid.workflow.beans.v1.Tool;
38 import org.xml.sax.SAXException;
39
40 /*** Represents a query running against the datacenter.
41 * <p />
42 * Datacenter already has a framework for starting asynchronous queries and then monitoring their progress.
43 * This class wraps the datacenter framework, mapping querier events into events in the CEA framework.
44 *
45 * There is one instance of this class for each query
46 *
47 * @author Noel Winstanley nw@jb.man.ac.uk 12-Jul-2004
48 *
49 */
50 public class DatacenterApplication extends AbstractApplication implements QuerierListener {
51
52 /***
53 * Commons Logger for this class
54 */
55 private static final Log logger = LogFactory.getLog(DatacenterApplication.class);
56
57 protected final Account acc;
58 /*** the datacenter system object - a static, which provides access to the datacenter query framework */
59 protected final DataServer serv;
60 /*** the executor for background tasks */
61 Executor exec;
62
63 /*** id allocated by datacenter to this querier */
64 protected String querierID;
65
66 public String getQuerierId() { return querierID; }
67
68 /*** Construct a new DatacetnerApplication
69 * @param arg0
70 * @param arg1
71 * @param arg2
72 * @param arg3
73 */
74 public DatacenterApplication(IDs ids, Tool tool, ApplicationInterface interf, ProtocolLibrary arg3,DataServer serv,Executor exec) {
75 super(ids, tool,interf, arg3);
76 this.serv = serv;
77 this.exec = exec;
78 this.acc = new Account(ids.getUser().getUserId(),ids.getUser().getCommunity(),ids.getUser().getToken());
79 logger.info("CEA DSA initialised, Job ID="+ids.getJobStepId());
80 }
81
82 /*** construct a query from the contents of the tool
83 * (mch) not entirely happy with this following changes to Query that include
84 * the result spec. I've set it to table here and hope that targets etc get
85 * set properly later (as it would have done before)
86 * @param t
87 * @return
88 */
89 protected final Query buildQuery(ApplicationInterface interf) throws IOException, CeaException {
90
91 if (interf.getName().equals(DatacenterApplicationDescription.CONE_IFACE)) {
92 return SimpleQueryMaker.makeConeQuery(
93 Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.RA).process())
94 , Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.DEC).process())
95 , Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.RADIUS).process())
96 );
97 }
98 else if (interf.getName().equals(DatacenterApplicationDescription.ADQL_IFACE)) {
99 String querySource = findInputParameter(DatacenterApplicationDescription.QUERY).getValue();
100 String queryString = (String)findInputParameterAdapter(DatacenterApplicationDescription.QUERY).process();
101 if ((queryString == null) || (queryString.trim().length() == 0)) {
102 throw new IOException("Read empty string at "+querySource);
103 }
104 logger.debug("Query will be " + queryString);
105 try {
106 return AdqlQueryMaker.makeQuery(queryString);
107 }
108 catch (SAXException e) { throw new IllegalArgumentException(e+", reading Adql at "+querySource); }
109 catch (ParserConfigurationException e) { throw new CeaException("Server error:",e); }
110 }
111 else
112 {
113
114 logger.fatal("Programming logic error - unknown interface" + interf.getName());
115 throw new IllegalArgumentException("Programming logic error - unknown interface" + interf.getName());
116 }
117
118 }
119
120
121 /***
122 * @see org.astrogrid.applications.Application#execute()
123 */
124 public boolean execute() throws CeaException {
125 createAdapters();
126 try {
127 ParameterValue resultTarget = findOutputParameter(DatacenterApplicationDescription.RESULT);
128 TargetIndicator ti = null;
129 if (resultTarget.getIndirect()==true) {
130
131 if ((resultTarget.getValue() == null) || (resultTarget.getValue().trim().length() == 0)) {
132 throw new CeaException("ResultTarget is indirect but value is empty");
133 }
134 ti = TargetMaker.makeIndicator(resultTarget.getValue());
135 } else {
136
137
138 ti = TargetMaker.makeIndicator(new StringWriter(), true);
139 }
140
141 String resultsFormat = (String)findInputParameterAdapter(DatacenterApplicationDescription.FORMAT).process();
142 Query query = buildQuery(getApplicationInterface());
143 query.getResultsDef().setTarget(ti);
144 query.getResultsDef().setFormat(resultsFormat);
145 Querier querier = Querier.makeQuerier(acc,query, this);
146 querier.addListener(this);
147
148 setStatus(Status.INITIALIZED);
149 querierID = serv.submitQuerier(querier);
150 logger.info("assigned QuerierID " + querierID);
151 return true;
152 }
153 catch (Throwable e) {
154 reportError(e+" executing "+this.getTool().getName(),e);
155 if (e instanceof CeaException) {
156 throw (CeaException) e;
157 }
158 else {
159 throw new CeaException(e+", executing application "+this.getTool().getName(),e);
160 }
161 }
162 }
163
164 /*** Implemented by calling abot on the querier object - so if the underlyng database back end supports abort, the cec does too
165 * @see org.astrogrid.applications.Application#attemptAbort()
166 */
167 public boolean attemptAbort() {
168 try {
169 QuerierStatus stat = serv.abortQuery(acc,querierID);
170 if (stat.getState().equals(QueryState.ABORTED)) {
171 return true;
172 } else {
173 return false;
174 }
175 } catch (IOException e) {
176 logger.warn("Attempted abort failed with exception",e);
177 return false;
178 }
179 }
180
181
182 /*** callback method for our associated querier
183 * <p />Through this method, this class receives notifications in changes of state of the running query.
184 * These state changes and messages are propagated onto the cea framework,
185 * and results are grabbed from the query at the appropriate time.
186 * @see org.astrogrid.datacenter.queriers.QuerierListener#queryStatusChanged(org.astrogrid.datacenter.queriers.Querier)
187 */
188 public void queryStatusChanged(final Querier querier) {
189 QuerierStatus qs = querier.getStatus();
190 QueryState state = qs.getState();
191 logger.debug("CEA seen DSA state="+state);
192
193 if (state.equals(QueryState.CONSTRUCTED) || state.equals(QueryState.QUEUED)) {
194 this.setStatus(Status.INITIALIZED);
195
196 } else if (state.equals(QueryState.STARTING) || state.equals(QueryState.RUNNING_QUERY)) {
197 this.setStatus(Status.RUNNING);
198
199 } else if (state.equals(QueryState.QUERY_COMPLETE)) {
200 this.reportMessage(qs.toString());
201
202 } else if (state.equals(QueryState.RUNNING_RESULTS)) {
203 this.setStatus(Status.WRITINGBACK);
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225 } else if (state.equals(QueryState.ABORTED)) {
226 this.reportMessage(qs.toString());
227 this.setStatus(Status.ERROR);
228
229 } else if (state.equals(QueryState.ERROR)) {
230 this.reportError(qs.toString());
231
232 } else if (state.equals(QueryState.FINISHED)) {
233 ParameterValue result = findOutputParameter(DatacenterApplicationDescription.RESULT);
234 if (result.getIndirect() != true) {
235
236
237 WriterTarget target = (WriterTarget) querier.getQuery().getTarget();
238 StringWriter sw = (StringWriter) target.resolveWriter(acc);
239 result.setValue(sw.toString() );
240 }
241
242 this.setStatus(Status.COMPLETED);
243 this.reportMessage(qs.toString());
244
245 } else if (state.equals(QueryState.UNKNOWN)) {
246 this.setStatus(Status.UNKNOWN);
247 this.reportMessage(qs.toString());
248
249 } else {
250 logger.fatal("Programming error - unknown state encountered" + state.toString());
251
252 this.setStatus(Status.UNKNOWN);
253 this.reportMessage("Unkown state encountered " + state.toString());
254 }
255
256 }
257
258
259 /*** overridden, to return instances of datacenter parameter adapter.
260 * @see org.astrogrid.applications.AbstractApplication#instantiateAdapter(org.astrogrid.applications.beans.v1.parameters.ParameterValue, org.astrogrid.applications.description.ParameterDescription, org.astrogrid.applications.parameter.indirect.IndirectParameterValue)
261 *
262 protected ParameterAdapter instantiateAdapter(ParameterValue arg0,
263 ParameterDescription arg1, ExternalValue arg2) {
264 return new DatacenterParameterAdapter(arg0, arg1, arg2);
265 }
266 */
267 }
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340