View Javadoc

1   /*$Id: DatacenterApplication.java,v 1.2 2005/03/21 18:45:55 mch Exp $
2    * Created on 12-Jul-2004
3    *
4    * Copyright (C) AstroGrid. All rights reserved.
5    *
6    * This software is published under the terms of the AstroGrid
7    * Software License version 1.2, a copy of which has been included
8    * with this distribution in the LICENSE.txt file.
9    *
10   **/
11  package org.astrogrid.dataservice.service.cea;
12  
13  import EDU.oswego.cs.dl.util.concurrent.Executor;
14  import java.io.IOException;
15  import java.io.StringWriter;
16  import java.net.URISyntaxException;
17  import java.security.Principal;
18  import javax.xml.parsers.ParserConfigurationException;
19  import org.apache.commons.logging.Log;
20  import org.apache.commons.logging.LogFactory;
21  import org.astrogrid.account.LoginAccount;
22  import org.astrogrid.applications.AbstractApplication;
23  import org.astrogrid.applications.CeaException;
24  import org.astrogrid.applications.Status;
25  import org.astrogrid.applications.beans.v1.parameters.ParameterValue;
26  import org.astrogrid.applications.description.ApplicationInterface;
27  import org.astrogrid.applications.parameter.protocol.ProtocolLibrary;
28  import org.astrogrid.dataservice.queriers.Querier;
29  import org.astrogrid.dataservice.queriers.QuerierListener;
30  import org.astrogrid.dataservice.queriers.status.QuerierStatus;
31  import org.astrogrid.dataservice.service.AxisDataServer;
32  import org.astrogrid.dataservice.service.DataServer;
33  import org.astrogrid.query.Query;
34  import org.astrogrid.query.QueryState;
35  import org.astrogrid.query.SimpleQueryMaker;
36  import org.astrogrid.query.adql.AdqlQueryMaker;
37  import org.astrogrid.slinger.mime.MimeNames;
38  import org.astrogrid.slinger.targets.TargetIdentifier;
39  import org.astrogrid.slinger.targets.TargetMaker;
40  import org.astrogrid.slinger.targets.WriterTarget;
41  import org.astrogrid.slinger.vospace.HomespaceName;
42  import org.astrogrid.slinger.vospace.IVOSRN;
43  import org.astrogrid.workflow.beans.v1.Tool;
44  import org.xml.sax.SAXException;
45  
46  /*** Represents a query running against the datacenter.
47   * <p />
48   * Datacenter already has a framework for starting asynchronous queries and then monitoring their progress.
49   * This class wraps the datacenter framework, mapping querier events into events in the CEA framework.
50   *
51   * There is one instance of this class for each query
52   *
53   * @author Noel Winstanley nw@jb.man.ac.uk 12-Jul-2004
54   *
55   */
56  public class DatacenterApplication extends AbstractApplication implements QuerierListener {
57     
58     /***
59      * Commons Logger for this class
60      */
61     private static final Log logger = LogFactory.getLog(DatacenterApplication.class);
62     
63     protected final Principal acc;
64     /*** the datacenter system object - a static, which provides access to the datacenter query framework */
65     protected final DataServer serv;
66     /*** the executor for background tasks */
67     Executor exec;
68     
69     /*** id allocated by datacenter to this querier */
70     protected String querierID;
71     
72     public String getQuerierId() { return querierID;   }
73     
74     /*** Construct a new DatacetnerApplication
75      * @param arg0
76      * @param arg1
77      * @param arg2
78      * @param arg3
79      */
80     public DatacenterApplication(IDs ids, Tool tool, ApplicationInterface interf, ProtocolLibrary arg3,DataServer serv,Executor exec) {
81        super(ids, tool,interf, arg3);
82        this.serv = serv;
83        this.exec = exec;
84        this.acc = new LoginAccount(ids.getUser().getUserId(),ids.getUser().getCommunity());
85        logger.info("CEA DSA initialised, Job ID="+ids.getJobStepId());
86     }
87     
88     /*** construct a query from the contents of the tool
89      * (mch) not entirely happy with this following changes to Query that include
90      * the result spec.  I've set it to table here and hope that targets etc get
91      * set properly later (as it would have done before)
92      * @param t
93      * @return
94      */
95     protected final Query buildQuery(ApplicationInterface interf)  throws IOException, CeaException {
96  
97        if (interf.getName().equals(DatacenterApplicationDescription.CONE_IFACE)) {
98           return SimpleQueryMaker.makeConeQuery(
99              Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.RA).process())
100                , Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.DEC).process())
101                , Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.RADIUS).process())
102          );
103       }
104       else if (interf.getName().equals(DatacenterApplicationDescription.ADQL_IFACE)) {
105          String querySource = findInputParameter(DatacenterApplicationDescription.QUERY).getValue();
106          String queryString = (String)findInputParameterAdapter(DatacenterApplicationDescription.QUERY).process();
107          if ((queryString == null) || (queryString.trim().length() == 0)) {
108             throw new IOException("Read empty string at "+querySource);
109          }
110          logger.debug("Query will be " + queryString);
111          try {
112             return AdqlQueryMaker.makeQuery(queryString);
113          }
114          catch (SAXException e) {
115             throw new IllegalArgumentException(e+", reading Adql at "+querySource);
116          }
117       }
118       else
119       {
120          // shouldn't get here - as would have barfed during 'initializeApplication' in DatacenterApplicatonDescription
121          logger.fatal("Programming logic error - unknown interface" + interf.getName());
122          throw new IllegalArgumentException("Programming logic error - unknown interface" + interf.getName());
123       }
124       
125    }
126    
127    
128    /***
129     * @see org.astrogrid.applications.Application#execute()
130     */
131    public boolean execute() throws CeaException {
132       createAdapters();
133       try {
134          ParameterValue resultTarget = findOutputParameter(DatacenterApplicationDescription.RESULT);
135          TargetIdentifier ti = null;
136          if (resultTarget.getIndirect()==true) {
137             //results will go to the URI given in the parameter
138             if ((resultTarget.getValue() == null) || (resultTarget.getValue().trim().length() == 0)) {
139                throw new CeaException("ResultTarget is indirect but value is empty");
140             }
141             
142             String targetUri = transformTarget(resultTarget.getValue()); //special botch for old vs new homespaces etc
143             
144             ti = TargetMaker.makeTarget(targetUri);
145          } else {
146             //direct-to-cea target, so results must get written to a string to be inserted into the
147             //parametervalue when complete (see queryStatusChanged)
148             ti = TargetMaker.makeTarget(new StringWriter(), true); //close stream when finished writing to it
149          }
150 
151          String resultsFormat = MimeNames.getMimeType( findInputParameterAdapter(DatacenterApplicationDescription.FORMAT).process().toString());
152          
153          Query query = buildQuery(getApplicationInterface());
154          query.getResultsDef().setTarget(ti);
155          query.getResultsDef().setFormat(resultsFormat);
156          Querier querier = Querier.makeQuerier(acc,query, "CEA from "+AxisDataServer.getSource()+" [Job "+ids.getJobStepId()+"]");
157          querier.addListener(this);
158          
159          setStatus(Status.INITIALIZED);
160          querierID = serv.submitQuerier(querier);
161          logger.info("assigned QuerierID " + querierID);
162          return true;
163       }
164       catch (Throwable e) {
165          reportError(e+" executing "+this.getTool().getName(),e);
166          if (e instanceof CeaException) {
167             throw (CeaException) e;
168          }
169          else {
170             throw new CeaException(e+", executing application "+this.getTool().getName(),e);
171          }
172       }
173    }
174 
175    /***
176     * If the incoming URI describing the target location is an IVORN, it might be
177     * an account or it might be a real IVORN.  Examines it to see if it is of the
178     * form of an account (ie ivo://community/individual) and if so, checks to see
179     * if the Registry can resolve it.  If it is of the right form, and the registry
180     * cannot resolve it, turns it into a homespacename
181     */
182    public String transformTarget(String targetUri) throws IllegalArgumentException, URISyntaxException {
183       if (!IVOSRN.isIvorn(targetUri)) { return targetUri; } //some other URI
184 
185       int hashIdx = targetUri.indexOf("#");
186       if (hashIdx == -1) {
187          throw new IllegalArgumentException("Target URI "+targetUri+" has no path given");
188       }
189       String key = targetUri.substring(0,hashIdx).substring(6);
190       int slashIdx = key.indexOf("/");
191       if ((slashIdx != -1) && (slashIdx == key.lastIndexOf("/"))) { //if there is only one slash
192          
193          //check it can't be resolved
194          try {
195             new IVOSRN(targetUri).resolve();
196             //it resolved OK, so it's a real IVORN.  Leave unchanged
197             return targetUri;
198          }
199          catch (IOException rre) { } //carry on, it's not resolvable (normally), so...
200          
201          //assume any exception means it can't be found.  Can't do much better than that at the mo anyway
202          String ind = key.substring(slashIdx+1); //individual
203          String com = key.substring(0,slashIdx); //community
204          HomespaceName homespace = new HomespaceName(ind+"@"+com, targetUri.substring(hashIdx+1));
205          logger.info("Converting incoming '"+targetUri+"' to '"+homespace+"'");
206          return homespace.toURI();
207       }
208       return targetUri;
209    }
210          
211    /*** Implemented by calling abot on the querier object - so if the underlyng database back end supports abort, the cec does too
212     * @see org.astrogrid.applications.Application#attemptAbort()
213     */
214    public boolean attemptAbort() {
215       try {
216          QuerierStatus stat = serv.abortQuery(acc,querierID);
217          if (stat.getState().equals(QueryState.ABORTED)) {
218             return true;
219          } else {
220             return false; // assume this means we couldn't abort.
221          }
222       } catch (IOException e) {
223          logger.warn("Attempted abort failed with exception",e);
224          return false;
225       }
226    }
227    
228    
229    /*** callback method for our associated querier
230     * <p />Through this method, this class receives notifications in changes of state of the running query.
231     * These state changes and messages are propagated onto the cea framework,
232     * and results are grabbed from the query at the appropriate time.
233     * @see org.astrogrid.datacenter.queriers.QuerierListener#queryStatusChanged(org.astrogrid.datacenter.queriers.Querier)
234     */
235    public void queryStatusChanged(final Querier querier) {
236       QuerierStatus qs = querier.getStatus();
237       QueryState state = qs.getState();
238       logger.debug("CEA seen DSA state="+state);
239       // would be nice to use a switch here, but can't.
240       if (state.equals(QueryState.CONSTRUCTED) || state.equals(QueryState.QUEUED)) {
241          this.setStatus(Status.INITIALIZED);
242          
243       } else if (state.equals(QueryState.STARTING) || state.equals(QueryState.RUNNING_QUERY)) {
244          this.setStatus(Status.RUNNING);
245          
246       } else if (state.equals(QueryState.QUERY_COMPLETE)) {
247          this.reportMessage(qs.toString());
248          
249       } else if (state.equals(QueryState.RUNNING_RESULTS)) {
250          this.setStatus(Status.WRITINGBACK);
251          /* this is all handled by the target indicators and the 'finish' below
252           final ParameterAdapter result = (ParameterAdapter)outputParameterAdapters().next();
253           //necessary to perform write-back in separate thread - as we don't know what thread is calling this callback
254           // and it mustn't be the same one as is going to write out the output - otherwise we'll deadlock on the pipe.
255           Runnable worker = new Runnable() {
256           public void run() {
257           try {
258           CEATargetIndicator ti = (CEATargetIndicator)querier.getQuery().getTarget();
259           result.writeBack(ti);
260           setStatus(Status.COMPLETED); // now the application has completed..
261           } catch (CeaException e) {
262           reportError("Failed to write back parameter values",e);
263           }
264           }
265           };
266           try {
267           exec.execute(worker);
268           }catch (InterruptedException e) {
269           reportMessage("couldn't start worker thread to read results");
270           }
271           */
272       } else if (state.equals(QueryState.ABORTED)) {
273          this.reportMessage(qs.toString());
274          this.setStatus(Status.ERROR); // this is the convention.
275          
276       } else if (state.equals(QueryState.ERROR)) {
277          this.reportError(qs.toString()); // sets us in error state.
278          
279       } else if (state.equals(QueryState.FINISHED)) {
280          ParameterValue result = findOutputParameter(DatacenterApplicationDescription.RESULT);
281          if (result.getIndirect() != true) {
282             //if the results were to be directed to CEA, they will be stored in a StringWriter in
283             //the WriterTarget
284             WriterTarget target = (WriterTarget) querier.getQuery().getTarget();
285             StringWriter sw = (StringWriter) target.resolveWriter(acc);
286             result.setValue(sw.toString() );
287          }
288 
289          this.setStatus(Status.COMPLETED);
290          this.reportMessage(qs.toString());
291          
292       } else if (state.equals(QueryState.UNKNOWN)) {
293          this.setStatus(Status.UNKNOWN);
294          this.reportMessage(qs.toString());
295          
296       } else {
297          logger.fatal("Programming error - unknown state encountered" + state.toString());
298          // would like to throw, but won't - as don't know who called me.
299          this.setStatus(Status.UNKNOWN);
300          this.reportMessage("Unkown state encountered " + state.toString());
301       }
302       
303    }
304    
305    
306    /*** overridden, to return instances of datacenter parameter adapter.
307     * @see org.astrogrid.applications.AbstractApplication#instantiateAdapter(org.astrogrid.applications.beans.v1.parameters.ParameterValue, org.astrogrid.applications.description.ParameterDescription, org.astrogrid.applications.parameter.indirect.IndirectParameterValue)
308     *
309    protected ParameterAdapter instantiateAdapter(ParameterValue arg0,
310                                                  ParameterDescription arg1, ExternalValue arg2) {
311       return new DatacenterParameterAdapter(arg0, arg1, arg2);
312    }
313     */
314 }
315 
316 
317 /*
318  $Log: DatacenterApplication.java,v $
319  Revision 1.2  2005/03/21 18:45:55  mch
320  Naughty big lump of changes
321 
322  Revision 1.1.1.1  2005/02/17 18:37:35  mch
323  Initial checkin
324 
325  Revision 1.1.1.1  2005/02/16 17:11:24  mch
326  Initial checkin
327 
328  Revision 1.12.2.11  2005/02/11 15:58:27  mch
329  Some fixes before split
330 
331  Revision 1.12.2.10  2004/12/08 18:36:40  mch
332  Added Vizier, rationalised SqlWriters etc, separated out TableResults from QueryResults
333 
334  Revision 1.12.2.9  2004/12/07 21:21:09  mch
335  Fixes after a days integration testing
336 
337  Revision 1.12.2.8  2004/12/07 00:02:17  mch
338  added tentative ivorn->homespace converter
339 
340  Revision 1.12.2.7  2004/11/30 01:04:02  mch
341  Rationalised tablewriters, reverted AxisDataService06 to string
342 
343  Revision 1.12.2.6  2004/11/29 21:52:18  mch
344  Fixes to skynode, log.error(), getstem, status logger, etc following tests on grendel
345 
346  Revision 1.12.2.5  2004/11/25 08:29:41  mch
347  added table writers modelled on STIL
348 
349  Revision 1.12.2.4  2004/11/23 17:46:52  mch
350  Fixes etc
351 
352  Revision 1.12.2.3  2004/11/23 12:34:01  mch
353  renamed to makeTarget
354 
355  Revision 1.12.2.2  2004/11/23 11:55:06  mch
356  renamved makeTarget methods
357 
358  Revision 1.12.2.1  2004/11/22 00:57:16  mch
359  New interfaces for SIAP etc and new slinger package
360 
361  Revision 1.12  2004/11/11 20:42:50  mch
362  Fixes to Vizier plugin, introduced SkyNode, started SssImagePlugin
363 
364  Revision 1.11  2004/11/10 22:01:50  mch
365  skynode starts and some fixes
366 
367  Revision 1.10  2004/11/09 17:42:22  mch
368  Fixes to tests after fixes for demos, incl adding closable to targetIndicators
369 
370  Revision 1.9  2004/11/08 02:58:44  mch
371  Various fixes and better error messages
372 
373  Revision 1.8  2004/11/03 00:17:56  mch
374  PAL_MCH Candidate 2 merge
375 
376  Revision 1.3.8.4  2004/11/02 21:51:54  mch
377  Replaced AgslTarget with UrlTarget and MySpaceTarget
378 
379  Revision 1.3.8.3  2004/11/02 19:48:43  mch
380  Split TargetIndicator to indicator and maker
381 
382  Revision 1.3.8.2  2004/11/02 19:41:26  mch
383  Split TargetIndicator to indicator and maker
384 
385  Revision 1.3.8.1  2004/10/20 18:12:45  mch
386  CEA fixes, resource tests and fixes, minor navigation changes
387 
388  Revision 1.4.2.1  2004/10/20 12:43:28  mch
389  Fixes to CEA interface to write directly to target
390 
391  Revision 1.4  2004/10/20 08:10:55  pah
392  taken Principal of new backend phase and tidied up some logging
393 
394  Revision 1.3  2004/10/07 10:34:44  mch
395  Fixes to Cone maker functions and reading/writing String comparisons from Query
396 
397  Revision 1.2  2004/10/06 21:12:17  mch
398  Big Lump of changes to pass Query OM around instead of Query subclasses, and TargetIndicator mixed into Slinger
399 
400  Revision 1.1  2004/09/28 15:02:13  mch
401  Merged PAL and server packages
402 
403  Revision 1.8  2004/09/17 01:27:21  nw
404  added thread management.
405 
406  Revision 1.7  2004/09/06 15:20:47  mch
407  Added logger message for execute()
408 
409  Revision 1.6  2004/08/25 23:38:34  mch
410  (Days changes) moved many query- and results- related classes, renamed packages, added tests, added CIRCLE to sql/adql parsers
411 
412  Revision 1.5  2004/08/17 20:19:36  mch
413  Moved TargetIndicator to client
414 
415  Revision 1.4  2004/07/27 13:48:33  nw
416  renamed indirect package to protocol,
417  renamed classes and methods within protocol package
418 
419  Revision 1.3  2004/07/22 16:31:22  nw
420  cleaned up application / parameter adapter interface.
421 
422  Revision 1.2  2004/07/20 02:14:48  nw
423  final implementaiton of itn06 Datacenter CEA interface
424 
425  Revision 1.1  2004/07/13 17:11:09  nw
426  first draft of an itn06 CEA implementation for datacenter
427  
428  */
429