1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.dataservice.service.cea;
12
13 import EDU.oswego.cs.dl.util.concurrent.Executor;
14 import java.io.IOException;
15 import java.io.StringWriter;
16 import java.net.URISyntaxException;
17 import java.security.Principal;
18 import javax.xml.parsers.ParserConfigurationException;
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.astrogrid.account.LoginAccount;
22 import org.astrogrid.applications.AbstractApplication;
23 import org.astrogrid.applications.CeaException;
24 import org.astrogrid.applications.Status;
25 import org.astrogrid.applications.beans.v1.parameters.ParameterValue;
26 import org.astrogrid.applications.description.ApplicationInterface;
27 import org.astrogrid.applications.parameter.protocol.ProtocolLibrary;
28 import org.astrogrid.dataservice.queriers.Querier;
29 import org.astrogrid.dataservice.queriers.QuerierListener;
30 import org.astrogrid.dataservice.queriers.status.QuerierStatus;
31 import org.astrogrid.dataservice.service.AxisDataServer;
32 import org.astrogrid.dataservice.service.DataServer;
33 import org.astrogrid.query.Query;
34 import org.astrogrid.query.QueryState;
35 import org.astrogrid.query.SimpleQueryMaker;
36 import org.astrogrid.query.adql.AdqlQueryMaker;
37 import org.astrogrid.slinger.mime.MimeNames;
38 import org.astrogrid.slinger.targets.TargetIdentifier;
39 import org.astrogrid.slinger.targets.TargetMaker;
40 import org.astrogrid.slinger.targets.WriterTarget;
41 import org.astrogrid.slinger.vospace.HomespaceName;
42 import org.astrogrid.slinger.vospace.IVOSRN;
43 import org.astrogrid.workflow.beans.v1.Tool;
44 import org.xml.sax.SAXException;
45
46 /*** Represents a query running against the datacenter.
47 * <p />
48 * Datacenter already has a framework for starting asynchronous queries and then monitoring their progress.
49 * This class wraps the datacenter framework, mapping querier events into events in the CEA framework.
50 *
51 * There is one instance of this class for each query
52 *
53 * @author Noel Winstanley nw@jb.man.ac.uk 12-Jul-2004
54 *
55 */
56 public class DatacenterApplication extends AbstractApplication implements QuerierListener {
57
58 /***
59 * Commons Logger for this class
60 */
61 private static final Log logger = LogFactory.getLog(DatacenterApplication.class);
62
63 protected final Principal acc;
64 /*** the datacenter system object - a static, which provides access to the datacenter query framework */
65 protected final DataServer serv;
66 /*** the executor for background tasks */
67 Executor exec;
68
69 /*** id allocated by datacenter to this querier */
70 protected String querierID;
71
72 public String getQuerierId() { return querierID; }
73
74 /*** Construct a new DatacetnerApplication
75 * @param arg0
76 * @param arg1
77 * @param arg2
78 * @param arg3
79 */
80 public DatacenterApplication(IDs ids, Tool tool, ApplicationInterface interf, ProtocolLibrary arg3,DataServer serv,Executor exec) {
81 super(ids, tool,interf, arg3);
82 this.serv = serv;
83 this.exec = exec;
84 this.acc = new LoginAccount(ids.getUser().getUserId(),ids.getUser().getCommunity());
85 logger.info("CEA DSA initialised, Job ID="+ids.getJobStepId());
86 }
87
88 /*** construct a query from the contents of the tool
89 * (mch) not entirely happy with this following changes to Query that include
90 * the result spec. I've set it to table here and hope that targets etc get
91 * set properly later (as it would have done before)
92 * @param t
93 * @return
94 */
95 protected final Query buildQuery(ApplicationInterface interf) throws IOException, CeaException {
96
97 if (interf.getName().equals(DatacenterApplicationDescription.CONE_IFACE)) {
98 return SimpleQueryMaker.makeConeQuery(
99 Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.RA).process())
100 , Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.DEC).process())
101 , Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.RADIUS).process())
102 );
103 }
104 else if (interf.getName().equals(DatacenterApplicationDescription.ADQL_IFACE)) {
105 String querySource = findInputParameter(DatacenterApplicationDescription.QUERY).getValue();
106 String queryString = (String)findInputParameterAdapter(DatacenterApplicationDescription.QUERY).process();
107 if ((queryString == null) || (queryString.trim().length() == 0)) {
108 throw new IOException("Read empty string at "+querySource);
109 }
110 logger.debug("Query will be " + queryString);
111 try {
112 return AdqlQueryMaker.makeQuery(queryString);
113 }
114 catch (SAXException e) {
115 throw new IllegalArgumentException(e+", reading Adql at "+querySource);
116 }
117 }
118 else
119 {
120
121 logger.fatal("Programming logic error - unknown interface" + interf.getName());
122 throw new IllegalArgumentException("Programming logic error - unknown interface" + interf.getName());
123 }
124
125 }
126
127
128 /***
129 * @see org.astrogrid.applications.Application#execute()
130 */
131 public boolean execute() throws CeaException {
132 createAdapters();
133 try {
134 ParameterValue resultTarget = findOutputParameter(DatacenterApplicationDescription.RESULT);
135 TargetIdentifier ti = null;
136 if (resultTarget.getIndirect()==true) {
137
138 if ((resultTarget.getValue() == null) || (resultTarget.getValue().trim().length() == 0)) {
139 throw new CeaException("ResultTarget is indirect but value is empty");
140 }
141
142 String targetUri = transformTarget(resultTarget.getValue());
143
144 ti = TargetMaker.makeTarget(targetUri);
145 } else {
146
147
148 ti = TargetMaker.makeTarget(new StringWriter(), true);
149 }
150
151 String resultsFormat = MimeNames.getMimeType( findInputParameterAdapter(DatacenterApplicationDescription.FORMAT).process().toString());
152
153 Query query = buildQuery(getApplicationInterface());
154 query.getResultsDef().setTarget(ti);
155 query.getResultsDef().setFormat(resultsFormat);
156 Querier querier = Querier.makeQuerier(acc,query, "CEA from "+AxisDataServer.getSource()+" [Job "+ids.getJobStepId()+"]");
157 querier.addListener(this);
158
159 setStatus(Status.INITIALIZED);
160 querierID = serv.submitQuerier(querier);
161 logger.info("assigned QuerierID " + querierID);
162 return true;
163 }
164 catch (Throwable e) {
165 reportError(e+" executing "+this.getTool().getName(),e);
166 if (e instanceof CeaException) {
167 throw (CeaException) e;
168 }
169 else {
170 throw new CeaException(e+", executing application "+this.getTool().getName(),e);
171 }
172 }
173 }
174
175 /***
176 * If the incoming URI describing the target location is an IVORN, it might be
177 * an account or it might be a real IVORN. Examines it to see if it is of the
178 * form of an account (ie ivo://community/individual) and if so, checks to see
179 * if the Registry can resolve it. If it is of the right form, and the registry
180 * cannot resolve it, turns it into a homespacename
181 */
182 public String transformTarget(String targetUri) throws IllegalArgumentException, URISyntaxException {
183 if (!IVOSRN.isIvorn(targetUri)) { return targetUri; }
184
185 int hashIdx = targetUri.indexOf("#");
186 if (hashIdx == -1) {
187 throw new IllegalArgumentException("Target URI "+targetUri+" has no path given");
188 }
189 String key = targetUri.substring(0,hashIdx).substring(6);
190 int slashIdx = key.indexOf("/");
191 if ((slashIdx != -1) && (slashIdx == key.lastIndexOf("/"))) {
192
193
194 try {
195 new IVOSRN(targetUri).resolve();
196
197 return targetUri;
198 }
199 catch (IOException rre) { }
200
201
202 String ind = key.substring(slashIdx+1);
203 String com = key.substring(0,slashIdx);
204 HomespaceName homespace = new HomespaceName(ind+"@"+com, targetUri.substring(hashIdx+1));
205 logger.info("Converting incoming '"+targetUri+"' to '"+homespace+"'");
206 return homespace.toURI();
207 }
208 return targetUri;
209 }
210
211 /*** Implemented by calling abot on the querier object - so if the underlyng database back end supports abort, the cec does too
212 * @see org.astrogrid.applications.Application#attemptAbort()
213 */
214 public boolean attemptAbort() {
215 try {
216 QuerierStatus stat = serv.abortQuery(acc,querierID);
217 if (stat.getState().equals(QueryState.ABORTED)) {
218 return true;
219 } else {
220 return false;
221 }
222 } catch (IOException e) {
223 logger.warn("Attempted abort failed with exception",e);
224 return false;
225 }
226 }
227
228
229 /*** callback method for our associated querier
230 * <p />Through this method, this class receives notifications in changes of state of the running query.
231 * These state changes and messages are propagated onto the cea framework,
232 * and results are grabbed from the query at the appropriate time.
233 * @see org.astrogrid.datacenter.queriers.QuerierListener#queryStatusChanged(org.astrogrid.datacenter.queriers.Querier)
234 */
235 public void queryStatusChanged(final Querier querier) {
236 QuerierStatus qs = querier.getStatus();
237 QueryState state = qs.getState();
238 logger.debug("CEA seen DSA state="+state);
239
240 if (state.equals(QueryState.CONSTRUCTED) || state.equals(QueryState.QUEUED)) {
241 this.setStatus(Status.INITIALIZED);
242
243 } else if (state.equals(QueryState.STARTING) || state.equals(QueryState.RUNNING_QUERY)) {
244 this.setStatus(Status.RUNNING);
245
246 } else if (state.equals(QueryState.QUERY_COMPLETE)) {
247 this.reportMessage(qs.toString());
248
249 } else if (state.equals(QueryState.RUNNING_RESULTS)) {
250 this.setStatus(Status.WRITINGBACK);
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272 } else if (state.equals(QueryState.ABORTED)) {
273 this.reportMessage(qs.toString());
274 this.setStatus(Status.ERROR);
275
276 } else if (state.equals(QueryState.ERROR)) {
277 this.reportError(qs.toString());
278
279 } else if (state.equals(QueryState.FINISHED)) {
280 ParameterValue result = findOutputParameter(DatacenterApplicationDescription.RESULT);
281 if (result.getIndirect() != true) {
282
283
284 WriterTarget target = (WriterTarget) querier.getQuery().getTarget();
285 StringWriter sw = (StringWriter) target.resolveWriter(acc);
286 result.setValue(sw.toString() );
287 }
288
289 this.setStatus(Status.COMPLETED);
290 this.reportMessage(qs.toString());
291
292 } else if (state.equals(QueryState.UNKNOWN)) {
293 this.setStatus(Status.UNKNOWN);
294 this.reportMessage(qs.toString());
295
296 } else {
297 logger.fatal("Programming error - unknown state encountered" + state.toString());
298
299 this.setStatus(Status.UNKNOWN);
300 this.reportMessage("Unkown state encountered " + state.toString());
301 }
302
303 }
304
305
306 /*** overridden, to return instances of datacenter parameter adapter.
307 * @see org.astrogrid.applications.AbstractApplication#instantiateAdapter(org.astrogrid.applications.beans.v1.parameters.ParameterValue, org.astrogrid.applications.description.ParameterDescription, org.astrogrid.applications.parameter.indirect.IndirectParameterValue)
308 *
309 protected ParameterAdapter instantiateAdapter(ParameterValue arg0,
310 ParameterDescription arg1, ExternalValue arg2) {
311 return new DatacenterParameterAdapter(arg0, arg1, arg2);
312 }
313 */
314 }
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429