View Javadoc

1   /*$Id: DatacenterApplication.java,v 1.12 2004/11/11 20:42:50 mch Exp $
2    * Created on 12-Jul-2004
3    *
4    * Copyright (C) AstroGrid. All rights reserved.
5    *
6    * This software is published under the terms of the AstroGrid
7    * Software License version 1.2, a copy of which has been included
8    * with this distribution in the LICENSE.txt file.
9    *
10   **/
11  package org.astrogrid.datacenter.service.v06;
12  
13  import EDU.oswego.cs.dl.util.concurrent.Executor;
14  import java.io.IOException;
15  import java.io.StringWriter;
16  import javax.xml.parsers.ParserConfigurationException;
17  import org.apache.commons.logging.Log;
18  import org.apache.commons.logging.LogFactory;
19  import org.astrogrid.applications.AbstractApplication;
20  import org.astrogrid.applications.CeaException;
21  import org.astrogrid.applications.Status;
22  import org.astrogrid.applications.beans.v1.parameters.ParameterValue;
23  import org.astrogrid.applications.description.ApplicationInterface;
24  import org.astrogrid.applications.parameter.protocol.ProtocolLibrary;
25  import org.astrogrid.community.Account;
26  import org.astrogrid.datacenter.queriers.Querier;
27  import org.astrogrid.datacenter.queriers.QuerierListener;
28  import org.astrogrid.datacenter.queriers.status.QuerierStatus;
29  import org.astrogrid.datacenter.query.AdqlQueryMaker;
30  import org.astrogrid.datacenter.query.Query;
31  import org.astrogrid.datacenter.query.QueryState;
32  import org.astrogrid.datacenter.query.SimpleQueryMaker;
33  import org.astrogrid.datacenter.service.DataServer;
34  import org.astrogrid.slinger.targets.TargetIndicator;
35  import org.astrogrid.slinger.targets.TargetMaker;
36  import org.astrogrid.slinger.targets.WriterTarget;
37  import org.astrogrid.workflow.beans.v1.Tool;
38  import org.xml.sax.SAXException;
39  
40  /*** Represents a query running against the datacenter.
41   * <p />
42   * Datacenter already has a framework for starting asynchronous queries and then monitoring their progress.
43   * This class wraps the datacenter framework, mapping querier events into events in the CEA framework.
44   *
45   * There is one instance of this class for each query
46   *
47   * @author Noel Winstanley nw@jb.man.ac.uk 12-Jul-2004
48   *
49   */
50  public class DatacenterApplication extends AbstractApplication implements QuerierListener {
51     
52     /***
53      * Commons Logger for this class
54      */
55     private static final Log logger = LogFactory.getLog(DatacenterApplication.class);
56     
57     protected final Account acc;
58     /*** the datacenter system object - a static, which provides access to the datacenter query framework */
59     protected final DataServer serv;
60     /*** the executor for background tasks */
61     Executor exec;
62     
63     /*** id allocated by datacenter to this querier */
64     protected String querierID;
65     
66     public String getQuerierId() { return querierID;   }
67     
68     /*** Construct a new DatacetnerApplication
69      * @param arg0
70      * @param arg1
71      * @param arg2
72      * @param arg3
73      */
74     public DatacenterApplication(IDs ids, Tool tool, ApplicationInterface interf, ProtocolLibrary arg3,DataServer serv,Executor exec) {
75        super(ids, tool,interf, arg3);
76        this.serv = serv;
77        this.exec = exec;
78        this.acc = new Account(ids.getUser().getUserId(),ids.getUser().getCommunity(),ids.getUser().getToken());
79        logger.info("CEA DSA initialised, Job ID="+ids.getJobStepId());
80     }
81     
82     /*** construct a query from the contents of the tool
83      * (mch) not entirely happy with this following changes to Query that include
84      * the result spec.  I've set it to table here and hope that targets etc get
85      * set properly later (as it would have done before)
86      * @param t
87      * @return
88      */
89     protected final Query buildQuery(ApplicationInterface interf)  throws IOException, CeaException {
90  
91        if (interf.getName().equals(DatacenterApplicationDescription.CONE_IFACE)) {
92           return SimpleQueryMaker.makeConeQuery(
93              Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.RA).process())
94                 , Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.DEC).process())
95                 , Double.parseDouble((String)findInputParameterAdapter(DatacenterApplicationDescription.RADIUS).process())
96           );
97        }
98        else if (interf.getName().equals(DatacenterApplicationDescription.ADQL_IFACE)) {
99           String querySource = findInputParameter(DatacenterApplicationDescription.QUERY).getValue();
100          String queryString = (String)findInputParameterAdapter(DatacenterApplicationDescription.QUERY).process();
101          if ((queryString == null) || (queryString.trim().length() == 0)) {
102             throw new IOException("Read empty string at "+querySource);
103          }
104          logger.debug("Query will be " + queryString);
105          try {
106             return AdqlQueryMaker.makeQuery(queryString);
107          }
108          catch (SAXException e) { throw new IllegalArgumentException(e+", reading Adql at "+querySource); }
109          catch (ParserConfigurationException e) { throw new CeaException("Server error:",e); }
110       }
111       else
112       {
113          // shouldn't get here - as would have barfed during 'initializeApplication' in DatacenterApplicatonDescription
114          logger.fatal("Programming logic error - unknown interface" + interf.getName());
115          throw new IllegalArgumentException("Programming logic error - unknown interface" + interf.getName());
116       }
117       
118    }
119    
120    
121    /***
122     * @see org.astrogrid.applications.Application#execute()
123     */
124    public boolean execute() throws CeaException {
125       createAdapters();
126       try {
127          ParameterValue resultTarget = findOutputParameter(DatacenterApplicationDescription.RESULT);
128          TargetIndicator ti = null;
129          if (resultTarget.getIndirect()==true) {
130             //results will go to the URI given in the parameter
131             if ((resultTarget.getValue() == null) || (resultTarget.getValue().trim().length() == 0)) {
132                throw new CeaException("ResultTarget is indirect but value is empty");
133             }
134             ti = TargetMaker.makeIndicator(resultTarget.getValue());
135          } else {
136             //direct-to-cea target, so results must get written to a string to be inserted into the
137             //parametervalue when complete (see queryStatusChanged)
138             ti = TargetMaker.makeIndicator(new StringWriter(), true); //close stream when finished writing to it
139          }
140 
141          String resultsFormat = (String)findInputParameterAdapter(DatacenterApplicationDescription.FORMAT).process();
142          Query query = buildQuery(getApplicationInterface());
143          query.getResultsDef().setTarget(ti);
144          query.getResultsDef().setFormat(resultsFormat);
145          Querier querier = Querier.makeQuerier(acc,query, this);
146          querier.addListener(this);
147          
148          setStatus(Status.INITIALIZED);
149          querierID = serv.submitQuerier(querier);
150          logger.info("assigned QuerierID " + querierID);
151          return true;
152       }
153       catch (Throwable e) {
154          reportError(e+" executing "+this.getTool().getName(),e);
155          if (e instanceof CeaException) {
156             throw (CeaException) e;
157          }
158          else {
159             throw new CeaException(e+", executing application "+this.getTool().getName(),e);
160          }
161       }
162    }
163    
164    /*** Implemented by calling abot on the querier object - so if the underlyng database back end supports abort, the cec does too
165     * @see org.astrogrid.applications.Application#attemptAbort()
166     */
167    public boolean attemptAbort() {
168       try {
169          QuerierStatus stat = serv.abortQuery(acc,querierID);
170          if (stat.getState().equals(QueryState.ABORTED)) {
171             return true;
172          } else {
173             return false; // assume this means we couldn't abort.
174          }
175       } catch (IOException e) {
176          logger.warn("Attempted abort failed with exception",e);
177          return false;
178       }
179    }
180    
181    
182    /*** callback method for our associated querier
183     * <p />Through this method, this class receives notifications in changes of state of the running query.
184     * These state changes and messages are propagated onto the cea framework,
185     * and results are grabbed from the query at the appropriate time.
186     * @see org.astrogrid.datacenter.queriers.QuerierListener#queryStatusChanged(org.astrogrid.datacenter.queriers.Querier)
187     */
188    public void queryStatusChanged(final Querier querier) {
189       QuerierStatus qs = querier.getStatus();
190       QueryState state = qs.getState();
191       logger.debug("CEA seen DSA state="+state);
192       // would be nice to use a switch here, but can't.
193       if (state.equals(QueryState.CONSTRUCTED) || state.equals(QueryState.QUEUED)) {
194          this.setStatus(Status.INITIALIZED);
195          
196       } else if (state.equals(QueryState.STARTING) || state.equals(QueryState.RUNNING_QUERY)) {
197          this.setStatus(Status.RUNNING);
198          
199       } else if (state.equals(QueryState.QUERY_COMPLETE)) {
200          this.reportMessage(qs.toString());
201          
202       } else if (state.equals(QueryState.RUNNING_RESULTS)) {
203          this.setStatus(Status.WRITINGBACK);
204          /* this is all handled by the target indicators and the 'finish' below
205           final ParameterAdapter result = (ParameterAdapter)outputParameterAdapters().next();
206           //necessary to perform write-back in separate thread - as we don't know what thread is calling this callback
207           // and it mustn't be the same one as is going to write out the output - otherwise we'll deadlock on the pipe.
208           Runnable worker = new Runnable() {
209           public void run() {
210           try {
211           CEATargetIndicator ti = (CEATargetIndicator)querier.getQuery().getTarget();
212           result.writeBack(ti);
213           setStatus(Status.COMPLETED); // now the application has completed..
214           } catch (CeaException e) {
215           reportError("Failed to write back parameter values",e);
216           }
217           }
218           };
219           try {
220           exec.execute(worker);
221           }catch (InterruptedException e) {
222           reportMessage("couldn't start worker thread to read results");
223           }
224           */
225       } else if (state.equals(QueryState.ABORTED)) {
226          this.reportMessage(qs.toString());
227          this.setStatus(Status.ERROR); // this is the convention.
228          
229       } else if (state.equals(QueryState.ERROR)) {
230          this.reportError(qs.toString()); // sets us in error state.
231          
232       } else if (state.equals(QueryState.FINISHED)) {
233          ParameterValue result = findOutputParameter(DatacenterApplicationDescription.RESULT);
234          if (result.getIndirect() != true) {
235             //if the results were to be directed to CEA, they will be stored in a StringWriter in
236             //the WriterTarget
237             WriterTarget target = (WriterTarget) querier.getQuery().getTarget();
238             StringWriter sw = (StringWriter) target.resolveWriter(acc);
239             result.setValue(sw.toString() );
240          }
241 
242          this.setStatus(Status.COMPLETED);
243          this.reportMessage(qs.toString());
244          
245       } else if (state.equals(QueryState.UNKNOWN)) {
246          this.setStatus(Status.UNKNOWN);
247          this.reportMessage(qs.toString());
248          
249       } else {
250          logger.fatal("Programming error - unknown state encountered" + state.toString());
251          // would like to throw, but won't - as don't know who called me.
252          this.setStatus(Status.UNKNOWN);
253          this.reportMessage("Unkown state encountered " + state.toString());
254       }
255       
256    }
257    
258    
259    /*** overridden, to return instances of datacenter parameter adapter.
260     * @see org.astrogrid.applications.AbstractApplication#instantiateAdapter(org.astrogrid.applications.beans.v1.parameters.ParameterValue, org.astrogrid.applications.description.ParameterDescription, org.astrogrid.applications.parameter.indirect.IndirectParameterValue)
261     *
262    protected ParameterAdapter instantiateAdapter(ParameterValue arg0,
263                                                  ParameterDescription arg1, ExternalValue arg2) {
264       return new DatacenterParameterAdapter(arg0, arg1, arg2);
265    }
266     */
267 }
268 
269 
270 /*
271  $Log: DatacenterApplication.java,v $
272  Revision 1.12  2004/11/11 20:42:50  mch
273  Fixes to Vizier plugin, introduced SkyNode, started SssImagePlugin
274 
275  Revision 1.11  2004/11/10 22:01:50  mch
276  skynode starts and some fixes
277 
278  Revision 1.10  2004/11/09 17:42:22  mch
279  Fixes to tests after fixes for demos, incl adding closable to targetIndicators
280 
281  Revision 1.9  2004/11/08 02:58:44  mch
282  Various fixes and better error messages
283 
284  Revision 1.8  2004/11/03 00:17:56  mch
285  PAL_MCH Candidate 2 merge
286 
287  Revision 1.3.8.4  2004/11/02 21:51:54  mch
288  Replaced AgslTarget with UrlTarget and MySpaceTarget
289 
290  Revision 1.3.8.3  2004/11/02 19:48:43  mch
291  Split TargetIndicator to indicator and maker
292 
293  Revision 1.3.8.2  2004/11/02 19:41:26  mch
294  Split TargetIndicator to indicator and maker
295 
296  Revision 1.3.8.1  2004/10/20 18:12:45  mch
297  CEA fixes, resource tests and fixes, minor navigation changes
298 
299  Revision 1.4.2.1  2004/10/20 12:43:28  mch
300  Fixes to CEA interface to write directly to target
301 
302  Revision 1.4  2004/10/20 08:10:55  pah
303  taken account of new backend phase and tidied up some logging
304 
305  Revision 1.3  2004/10/07 10:34:44  mch
306  Fixes to Cone maker functions and reading/writing String comparisons from Query
307 
308  Revision 1.2  2004/10/06 21:12:17  mch
309  Big Lump of changes to pass Query OM around instead of Query subclasses, and TargetIndicator mixed into Slinger
310 
311  Revision 1.1  2004/09/28 15:02:13  mch
312  Merged PAL and server packages
313 
314  Revision 1.8  2004/09/17 01:27:21  nw
315  added thread management.
316 
317  Revision 1.7  2004/09/06 15:20:47  mch
318  Added logger message for execute()
319 
320  Revision 1.6  2004/08/25 23:38:34  mch
321  (Days changes) moved many query- and results- related classes, renamed packages, added tests, added CIRCLE to sql/adql parsers
322 
323  Revision 1.5  2004/08/17 20:19:36  mch
324  Moved TargetIndicator to client
325 
326  Revision 1.4  2004/07/27 13:48:33  nw
327  renamed indirect package to protocol,
328  renamed classes and methods within protocol package
329 
330  Revision 1.3  2004/07/22 16:31:22  nw
331  cleaned up application / parameter adapter interface.
332 
333  Revision 1.2  2004/07/20 02:14:48  nw
334  final implementation of itn06 Datacenter CEA interface
335 
336  Revision 1.1  2004/07/13 17:11:09  nw
337  first draft of an itn06 CEA implementation for datacenter
338  
339  */
340