1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.dataservice.service.cea;
12
13 import EDU.oswego.cs.dl.util.concurrent.Executor;
14 import java.io.IOException;
15 import java.io.StringWriter;
16 import java.net.URISyntaxException;
17 import java.net.URL;
18 import java.security.Principal;
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.astrogrid.applications.AbstractApplication;
22 import org.astrogrid.applications.CeaException;
23 import org.astrogrid.applications.Status;
24 import org.astrogrid.applications.beans.v1.parameters.ParameterValue;
25 import org.astrogrid.applications.description.ApplicationInterface;
26 import org.astrogrid.applications.parameter.protocol.ProtocolLibrary;
27 import org.astrogrid.dataservice.queriers.Querier;
28 import org.astrogrid.dataservice.queriers.QuerierListener;
29 import org.astrogrid.dataservice.queriers.status.QuerierStatus;
30 import org.astrogrid.dataservice.service.AxisDataServer;
31 import org.astrogrid.dataservice.service.DataServer;
32 import org.astrogrid.io.account.LoginAccount;
33 import org.astrogrid.io.mime.MimeNames;
34 import org.astrogrid.query.Query;
35 import org.astrogrid.query.returns.ReturnTable;
36 import org.astrogrid.query.QueryException;
37 import org.astrogrid.query.QueryState;
38 import org.astrogrid.slinger.sourcetargets.HomespaceSourceTarget;
39 import org.astrogrid.slinger.sourcetargets.URISourceTargetMaker;
40 import org.astrogrid.slinger.targets.TargetIdentifier;
41 import org.astrogrid.slinger.targets.WriterTarget;
42 import org.astrogrid.workflow.beans.v1.Tool;
43 import org.xml.sax.SAXException;
44 import org.astrogrid.slinger.homespace.HomespaceName;
45 import org.astrogrid.slinger.ivo.IVORN;
46
47 /*** Represents a query running against the datacenter.
48 * <p />
49 * Datacenter already has a framework for starting asynchronous queries and then monitoring their progress.
50 * This class wraps the datacenter framework, mapping querier events into events in the CEA framework.
51 *
52 * There is one instance of this class for each query
53 *
54 * @author Noel Winstanley nw@jb.man.ac.uk 12-Jul-2004
55 * @author K Andrews
56 *
57 */
58 public class DatacenterApplication extends AbstractApplication implements QuerierListener {
59
60 /***
61 * Commons Logger for this class
62 */
63 private static final Log logger = LogFactory.getLog(DatacenterApplication.class);
64
65 protected final Principal acc;
66 /*** the datacenter system object - a static, which provides access to the datacenter query framework */
67 protected final DataServer serv;
68 /*** the executor for background tasks */
69 Executor exec;
70
71 /*** id allocated by datacenter to this querier */
72 protected String querierID;
73
74 public String getQuerierId() { return querierID; }
75
76 /*** Construct a new DatacetnerApplication
77 * @param arg0
78 * @param arg1
79 * @param arg2
80 * @param arg3
81 */
82 public DatacenterApplication(IDs ids, Tool tool, ApplicationInterface interf, ProtocolLibrary arg3,DataServer serv,Executor exec) {
83 super(ids, tool,interf, arg3);
84 this.serv = serv;
85 this.exec = exec;
86 this.acc = new LoginAccount(ids.getUser().getUserId(),ids.getUser().getCommunity());
87 logger.info("CEA DSA initialised, Job ID="+ids.getJobStepId());
88 }
89
90 /*** construct a query from the contents of the tool
91 * (mch) not entirely happy with this following changes to Query that include
92 * the result spec. I've set it to table here and hope that targets etc get
93 * set properly later (as it would have done before)
94 * FURTHER COMMENT BY KEA: Explicitly setting the ReturnSpec in the
95 * CONE_IFACE query now, to a ReturnTable.
96 * @param t
97 * @return
98 */
99 protected final Query buildQuery(ApplicationInterface interf) throws IOException, CeaException, QueryException {
100
101 if (interf.getName().equals(DatacenterApplicationDescription.CONE_IFACE)) {
102
103
104
105
106
107
108
109 return new Query(
110 Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.RA).process()),
111 Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.DEC).process()),
112 Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.RADIUS).process()),
113 new ReturnTable(new WriterTarget(new StringWriter()))
114 );
115 }
116 else if (interf.getName().equals(DatacenterApplicationDescription.ADQL_IFACE)) {
117 String querySource = findInputParameter(DatacenterApplicationDescription.QUERY).getValue();
118 String queryString = (String)findInputParameterAdapter(DatacenterApplicationDescription.QUERY).process();
119 if ((queryString == null) || (queryString.trim().length() == 0)) {
120 throw new IOException("Read empty string at "+querySource);
121 }
122 logger.debug("Query will be " + queryString);
123 return new Query(queryString);
124 }
125 else
126 {
127
128 logger.fatal("Programming logic error - unknown interface" + interf.getName());
129 throw new IllegalArgumentException("Programming logic error - unknown interface" + interf.getName());
130 }
131
132 }
133
134
135 /***
136 * @see org.astrogrid.applications.Application#execute()
137 */
138 public boolean execute() throws CeaException {
139 createAdapters();
140 try {
141 ParameterValue resultTarget = findOutputParameter(DatacenterApplicationDescription.RESULT);
142 TargetIdentifier ti = null;
143 if (resultTarget.getIndirect()==true) {
144
145 if ((resultTarget.getValue() == null) || (resultTarget.getValue().trim().length() == 0)) {
146 throw new CeaException("ResultTarget is indirect but value is empty");
147 }
148
149
150 String targetUri = transformTarget(resultTarget.getValue());
151
152 ti = URISourceTargetMaker.makeSourceTarget(targetUri);
153 } else {
154
155
156
157 ti = new WriterTarget(new StringWriter(), true);
158 }
159
160 String resultsFormat = MimeNames.getMimeType( findInputParameterAdapter(DatacenterApplicationDescription.FORMAT).process().toString());
161
162 Query query = buildQuery(getApplicationInterface());
163 query.getResultsDef().setTarget(ti);
164 query.getResultsDef().setFormat(resultsFormat);
165 Querier querier = Querier.makeQuerier(acc,query, "CEA from "+AxisDataServer.getSource()+" [Job "+ids.getJobStepId()+"]");
166 querier.addListener(this);
167
168 setStatus(Status.INITIALIZED);
169 querierID = serv.submitQuerier(querier);
170 logger.info("assigned QuerierID " + querierID);
171 return true;
172 }
173 catch (Throwable e) {
174 reportError(e+" executing "+this.getTool().getName(),e);
175 if (e instanceof CeaException) {
176 throw (CeaException) e;
177 }
178 else {
179 throw new CeaException(e+", executing application "+this.getTool().getName(),e);
180 }
181 }
182 }
183
184 /***
185 * If the incoming URI describing the target location is an IVORN, it might be
186 * an account or it might be a real IVORN. Examines it to see if it is of the
187 * form of an account (ie ivo://community/individual) and if so, checks to see
188 * if the Registry can resolve it. If it is of the right form, and the registry
189 * cannot resolve it, turns it into a homespacename
190 */
191 public String transformTarget(String targetUri) throws IllegalArgumentException, URISyntaxException {
192
193 int hashIdx = targetUri.indexOf("#");
194 String key = targetUri.substring(0,hashIdx).substring(6);
195 int slashIdx = key.indexOf("/");
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218 HomespaceName homespace = HomespaceName.fromIvorn(targetUri);
219 logger.info("Converting incoming '"+targetUri+"' to '"+homespace+"'");
220 return homespace.toURI();
221
222
223 }
224
225 /*** Implemented by calling abot on the querier object - so if the underlyng database back end supports abort, the cec does too
226 * @see org.astrogrid.applications.Application#attemptAbort()
227 */
228 public boolean attemptAbort() {
229 try {
230 QuerierStatus stat = serv.abortQuery(acc,querierID);
231 if (stat.getState().equals(QueryState.ABORTED)) {
232 return true;
233 } else {
234 return false;
235 }
236 } catch (IOException e) {
237 logger.warn("Attempted abort failed with exception",e);
238 return false;
239 }
240 }
241
242
243 /*** callback method for our associated querier
244 * <p />Through this method, this class receives notifications in changes of state of the running query.
245 * These state changes and messages are propagated onto the cea framework,
246 * and results are grabbed from the query at the appropriate time.
247 * @see org.astrogrid.datacenter.queriers.QuerierListener#queryStatusChanged(org.astrogrid.datacenter.queriers.Querier)
248 */
249 public void queryStatusChanged(final Querier querier) {
250 QuerierStatus qs = querier.getStatus();
251 QueryState state = qs.getState();
252 logger.debug("CEA seen DSA state="+state);
253
254 if (state.equals(QueryState.CONSTRUCTED) || state.equals(QueryState.QUEUED)) {
255 this.setStatus(Status.INITIALIZED);
256
257 } else if (state.equals(QueryState.STARTING) || state.equals(QueryState.RUNNING_QUERY)) {
258 this.setStatus(Status.RUNNING);
259
260 } else if (state.equals(QueryState.QUERY_COMPLETE)) {
261 this.reportMessage(qs.toString());
262
263 } else if (state.equals(QueryState.RUNNING_RESULTS)) {
264 this.setStatus(Status.WRITINGBACK);
265 } else if (state.equals(QueryState.ABORTED)) {
266 this.reportMessage(qs.toString());
267 this.setStatus(Status.ERROR);
268
269 } else if (state.equals(QueryState.ERROR)) {
270 this.reportError(qs.toString());
271
272 } else if (state.equals(QueryState.FINISHED)) {
273 ParameterValue result = findOutputParameter(DatacenterApplicationDescription.RESULT);
274 if (result.getIndirect() != true) {
275
276
277 WriterTarget target = (WriterTarget) querier.getQuery().getTarget();
278 StringWriter sw = (StringWriter) target.openWriter();
279 result.setValue(sw.toString() );
280 }
281
282 this.setStatus(Status.COMPLETED);
283 this.reportMessage(qs.toString());
284
285 } else if (state.equals(QueryState.UNKNOWN)) {
286 this.setStatus(Status.UNKNOWN);
287 this.reportMessage(qs.toString());
288
289 } else {
290 logger.fatal("Programming error - unknown state encountered" + state.toString());
291
292 this.setStatus(Status.UNKNOWN);
293 this.reportMessage("Unkown state encountered " + state.toString());
294 }
295
296 }
297
298
299 /*** overridden, to return instances of datacenter parameter adapter.
300 * @see org.astrogrid.applications.AbstractApplication#instantiateAdapter(org.astrogrid.applications.beans.v1.parameters.ParameterValue, org.astrogrid.applications.description.ParameterDescription, org.astrogrid.applications.parameter.indirect.IndirectParameterValue)
301 *
302 protected ParameterAdapter instantiateAdapter(ParameterValue arg0,
303 ParameterDescription arg1, ExternalValue arg2) {
304 return new DatacenterParameterAdapter(arg0, arg1, arg2);
305 }
306 */
307 }
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453