View Javadoc

1   /*
2    * $Id: GdsQueryDelegate.java,v 1.17 2004/03/29 07:36:03 kea Exp $
3    *
4    * (C) Copyright Astrogrid...
5    */
6   
7   package org.astrogrid.warehouse.ogsadai;
8   
9   import org.astrogrid.warehouse.ogsadai.XSLTransform;
10  
11  import org.w3c.dom.Document;
12  import org.w3c.dom.Element;
13  import org.w3c.dom.Node;
14  import org.w3c.dom.NodeList;
15  import org.w3c.dom.NamedNodeMap;
16  import org.apache.log4j.Logger;
17  
18  import org.apache.xerces.parsers.DOMParser;
19  import org.xml.sax.InputSource;
20  import org.xml.sax.SAXException;
21  
22  import java.io.IOException;
23  import java.io.ByteArrayOutputStream;
24  import java.io.InputStream;
25  import java.io.OutputStream;
26  import java.io.FileOutputStream;
27  import java.io.StringReader;
28  import org.globus.ogsa.utils.AnyHelper;
29  import org.globus.ogsa.GridServiceException;
30  import org.gridforum.ogsi.ExtendedDateTimeType;
31  import org.gridforum.ogsi.TerminationTimeType;
32  import org.gridforum.ogsi.ExtensibilityType;
33  import org.apache.axis.AxisFault;
34  
35  import uk.org.ogsadai.client.toolkit.ActivityRequest;
36  import uk.org.ogsadai.client.toolkit.ActivityOutput;
37  import uk.org.ogsadai.client.toolkit.GridDataService;
38  import uk.org.ogsadai.client.toolkit.GridDataServiceFactory;
39  import uk.org.ogsadai.client.toolkit.MessageLevelSecurityProperty;
40  import uk.org.ogsadai.client.toolkit.Response;
41  import uk.org.ogsadai.client.toolkit.ServiceFetcher;
42  import uk.org.ogsadai.client.toolkit.ServiceGroupRegistry;
43  import uk.org.ogsadai.client.toolkit.GridServiceMetaData;
44  import uk.org.ogsadai.client.toolkit.activity.sql.SQLQuery;
45  import uk.org.ogsadai.client.toolkit.activity.delivery.DeliverToGFTP;
46  import uk.org.ogsadai.client.toolkit.activity.delivery.DeliverToURL;
47  import uk.org.ogsadai.client.toolkit.activity.delivery.DeliverFromURL;
48  import uk.org.ogsadai.service.OGSADAIConstants;
49  
50  import org.ietf.jgss.GSSCredential;
51  import org.globus.ogsa.utils.QueryHelper;
52  import org.globus.util.GlobusURL;
53  
54  import java.util.Calendar;
55  import java.util.TimeZone;
56                                                                                  
57  /***
58   * A high-level delegate for invoking a query on an OGSA-DAI Grid Data Service.
59   * 
60   * This class uses the OGDA-DAI client toolkit, introduced in OGSA-DAI 3.1,
61   * to interact with the OGSA-DAI GDS.  The toolkit abstracts away most of
62   * the detail of the underlying service, which is good as the whole grid 
63   * infrastructure is about to change out from under us.
64   *
65   * @author K Andrews
66   */
67  public class GdsQueryDelegate 
68  {
69    static Logger logger = Logger.getLogger("GdsQueryLogger");
70  
71    /***
72     * Default empty constructor.
73     * 
74     * @throws IOException
75     * @throws SAXException
76     */
77    public GdsQueryDelegate() throws IOException, SAXException 
78    {
79      super();    //Can throw IOException and SAXException
80    }
81  
82    /***
83     * Uses an OGSA-DAI Grid Data Service to perform the supplied SQL query.
84     *
85     * @param sql String containing the SQL query to be performed
86     *
87     * @param registryUrlString String containing the URL of the DAI Registry
88     *  to be used to find a GDS factory
89     *
90     * @param outputUrl String containing the URL to which results should be
91     * returned;  can be a file:// or gsiftp:// URL.
92     *
93     * @throws Exception
94     */
95    public void doRealQuery(String sql, String registryUrlString, 
96          String outputUrl) throws Exception 
97    {
98      int timeout = 300;  // TOFIX configurable?
99  
100     ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
101     String xmlString ="";
102 
103     // Check for null parameters
104     if (sql == null) {
105       String errMess = "Input sql query cannot be null";
106       logger.error(errMess);
107       throw new Exception(errMess);
108     }
109     if (registryUrlString == null) {
110       String errMess = "Input registry URL cannot be null";
111       logger.error(errMess);
112       throw new Exception(errMess);
113     }
114     if (outputUrl == null) {
115       String errMess = "Output results URL cannot be null";
116       logger.error(errMess);
117       throw new Exception(errMess);
118     }
119 
120     // Parse the delivery URL.  GlobusURL is used insted of java.net.URL
121     // in order to include gsiftp URLs.
122     GlobusURL url = new GlobusURL(outputUrl);
123 
124     // Do we need a secure service?
125     boolean needSecure = false;
126     if (url.getProtocol().equalsIgnoreCase("gsiftp") ||
127           url.getProtocol().equalsIgnoreCase("gridftp")) {
128       needSecure = true;  
129     }
130 
131     // Do a synchronous query using the GDS.
132     try {
133 
134       // Get the registry
135       ServiceGroupRegistry registry = 
136             ServiceFetcher.getRegistry(registryUrlString);
137       if (registry == null) {
138         String errMess = 
139             "Couldn't get ServiceGroupRegistry from URL "
140             + registryUrlString;
141         logger.error(errMess);
142         throw new Exception(errMess);
143       }
144 
145       // Get the factory from the registry
146       String factoryHandle = getFactoryHandle(registry, needSecure);
147       if (factoryHandle == null) {
148         String errMess = 
149             "Couldn't get handle for GDS factory from registry at "
150             + registryUrlString;
151         logger.error(errMess);
152         throw new Exception(errMess);
153       }
154 
155       // Locate the Factory
156       GridDataServiceFactory factory = 
157               ServiceFetcher.getFactory(factoryHandle);      
158       logger.info("Found GDSF at " + factoryHandle); 
159 
160       // Create a GridDataService
161       GridDataService gds = factory.createGridDataService();
162       logger.info("Created GDS");
163 
164       // Set initial termination time min 2Hrs, max 3Hrs after now
165       refreshTermination(gds, 2);
166 
167       // Construct an Activity request
168       // Set the query
169       SQLQuery query = new SQLQuery(sql);
170       ActivityRequest request = new ActivityRequest();
171       request.addActivity(query);
172       logger.info("Sending SQL Query \"" + sql + "\" to GDS");                                                                                
173       // Set up the XSLT transform at the server end
174       // First, delivery of the stylesheet
175       // KLUDGE - hardwired transform address!!
176       DeliverFromURL xsltDelivery = new DeliverFromURL(
177             "http://astrogrid.ast.cam.ac.uk/xslt/ag-warehouse-first.xsl");
178       request.addActivity(xsltDelivery);
179 
180       // Second, the actual transformation activity
181       ActivityOutput queryOutput = query.getOutput();
182       ActivityOutput xslDelOutput = xsltDelivery.getOutput();
183       XSLTransform xslTransform = new XSLTransform(queryOutput,xslDelOutput);
184       request.addActivity(xslTransform);
185       
186       // Now set the delivery URL
187       // This could be a file:// or a gsiftp:// url.
188   
189       ActivityOutput transformOutput = xslTransform.getOutput();
190 
191       if (url.getProtocol().equalsIgnoreCase("gsiftp") ||
192           url.getProtocol().equalsIgnoreCase("gridftp")) {
193          // Got a GridFTP URL
194         DeliverToGFTP delivery = new DeliverToGFTP(
195               url.getHost(), url.getPort(), url.getPath());
196         delivery.setInput(transformOutput);
197         request.addActivity(delivery);
198         logger.info("Adding delivery address 'gsiftp://"
199             +  url.getHost() + ":" 
200             + Integer.toString(url.getPort()) + url.getPath());   
201 
202       }
203       else if (url.getProtocol().equalsIgnoreCase("file")) {
204         // Got a local file URL
205         DeliverToURL delivery = new DeliverToURL(outputUrl);
206         delivery.setInput(transformOutput);
207         request.addActivity(delivery);
208       }
209       else {
210         String errMess =
211            "Unknown or unsupported protocol for results delivery: " +
212             url.getProtocol() +
213            "Expected 'file://'' or 'gsiftp://' or 'gridftp://'";
214         logger.error(errMess);
215         throw new Exception(errMess);
216       }
217 
218       if (url.getProtocol().equalsIgnoreCase("gsiftp") ||
219           url.getProtocol().equalsIgnoreCase("gridftp")) {
220         // Set up the authorization if using GridFTP 
221         MessageLevelSecurityProperty security = 
222               new MessageLevelSecurityProperty();
223         CredentialHolder credentialHolder = null;
224         // KLUDGE- hardwired locations
225         try {
226           credentialHolder = new CredentialHolder(
227               "/etc/grid-security/hostcert.pem",
228               "/etc/grid-security/hostkey.pem");
229         }
230         catch (Exception e) {
231           throw new Exception("No identity credentials could be obtained "
232                 + e.getMessage()
233                 +").");
234         }
235         security.setCredential(credentialHolder.getCredential());
236         gds.configure(security);
237       }
238 
239       // Perform the request
240       logger.info("Performing request.");
241       Response response = null;
242       try {
243         response = gds.perform(request);
244       }
245       catch (Exception e) {
246         //TOFIX MSG
247         logger.error("Perform crashed: " + e.getMessage());
248       }
249 
250       // Poll for completion (horrible hack, should use notifications
251       // really, but they'll be superseded in the new Grid specs :-(
252       int multFac = 1;
253       logger.debug("Waiting for results delivery to finish...");
254       while (true) {
255         ExtensibilityType result2 = gds.findServiceData(
256            QueryHelper.getNamesQuery(OGSADAIConstants.GDS_SDE_REQUEST_STATUS));
257         String status = AnyHelper.getAsString(result2);
258         if ((status.indexOf(OGSADAI_STATUS_COMPLETE) != -1) ||
259             (status.indexOf("COMPLETED") != -1)) {  //Completed
260           break;  // Out of while loop
261         }
262         else if (status.indexOf("ERROR") != -1) { //Error 
263           String errMess = "Got error status : " + status +
264             ", Response was " + response.getAsString();
265           logger.error(errMess);
266           throw new Exception(errMess);
267         }
268         else if ((status.indexOf("PROCESSING") != -1) ||
269                  (status.indexOf(OGSADAI_STATUS_INCOMPLETE) != -1)) { 
270           //Still going
271           //Min termination 2 hours from now
272           refreshTermination(gds,2);  
273 
274           //Wait for a while
275           Thread.currentThread().sleep(1000*multFac);
276 
277           if (multFac < 300) { //Arbitrary max wait of 5 minutes 
278             //multFac = multFac + 1;  //Wait longer next time
279           }
280         }
281         else {
282           //Unknown status, 
283           String errMess = "Didn't understand status: " + status +
284           ", response is " + response.getAsString();
285           logger.error(errMess);
286           throw new Exception(errMess);
287         }
288       }
289       //Got here, so request has completed successfully and can return
290       logger.debug("Results delivery has finished.");
291 
292     }
293     catch (AxisFault e) {
294       String errorMessage = "Problem with Axis: " + e.getMessage();
295       logger.error(errorMessage);
296       throw new Exception(errorMessage);
297     }
298     catch (Exception e) {
299       String errorMessage = "Unspecified exception: " + e.getMessage();
300       logger.error(errorMessage);
301       throw new Exception(errorMessage);
302     }
303   }
304 
305   /***
306    * Utility function to set or refresh the termination time of a
307    * Grid Data Service.  Sets the termination time to be after 
308    * offsetHours hours, and before (offsetHours+1) hours.
309    *
310    * @param gds GridDataService whose termination time is to be adjusted
311    *
312    * @param offsetHours Minimum hours from now for termination 
313    *
314    * @throws Exception
315    */
316   protected void refreshTermination( GridDataService gds, int offsetHours) 
317       throws Exception
318   {
319     try {
320       // Set initial termination time min 2Hrs, max 3Hrs after now
321       Calendar term = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
322       term.add(Calendar.HOUR, offsetHours);
323       gds.requestTerminationAfter(new ExtendedDateTimeType(term));
324       term.add(Calendar.HOUR, 1);
325       gds.requestTerminationBefore(new ExtendedDateTimeType(term));
326     }
327     catch (Exception e) {
328       throw new Exception(e.getMessage());
329     }
330   }
331 
332   /***
333    * Utility function to get a factory handle string from a DAI registry.
334    * If needSecure is true, looks for a secure service factory handle.  
335    * If not, looks for an insecure (normal) service factory handle first,
336    * and returns a secure one if an insecure one can't be found.
337    *
338    * @param registry The ServiceGroupRegistry to search for a factory handle
339    *
340    * @param needsSecure If true, look for a secure service
341    *
342    * @return String holding factory handle string
343    *
344    * @throws Exception
345    */
346   protected String getFactoryHandle(
347       ServiceGroupRegistry registry, boolean needSecure) throws Exception
348   {
349     // Get the factory from the registry
350     // If we want to use GridFTP delivery, look for a secure factory.
351     // Otherwise, look for an ordinary factory
352     //
353     GridServiceMetaData gsmd[] = 
354         registry.listServices(OGSADAIConstants.GDSF_PORT_TYPE);
355 
356     if (needSecure) {
357       for (int i = 0; i < gsmd.length; i++) {
358         String handle = gsmd[i].getHandle();
359         if (handle.toUpperCase().indexOf("SECURE") != -1) { //Secure found
360           return handle;
361         }
362       }
363       // If got here, no secure handle
364       String errMess = 
365         "Couldn't find secure Grid Data Service Factory to use; " +
366         "please choose a different results delivery method";
367       logger.error(errMess);
368       throw new Exception(errMess);
369     }
370     else {
371       String secureHandle = null;
372       for (int i = 0; i < gsmd.length; i++) {
373         String handle = gsmd[i].getHandle();
374         if (handle.toUpperCase().indexOf("SECURE") != -1) { //Secure found
375           secureHandle = handle;
376         }
377         else {
378           // Non-secure found, return this
379           return handle;
380         }
381       }
382       if (secureHandle == null) {
383          String errMess = 
384            "Couldn't find any Grid Data Service Factory to use; " +
385            "please check you are using the correct DAI registry";
386          logger.error(errMess);
387          throw new Exception(errMess);
388        }
389        else {
390          // Use secure handle if no insecure available
391          logger.warn(
392            "Couldn't find standard Grid Data Service Factory to use; " +
393            "using Secure GDSF instead.");
394          return secureHandle;
395       }
396     }
397     //Shouldn't get here
398   }
399 
400   /***
401    * An entry point so that a GDS query can be run from the command line.
402    * Expects the following parameters in the following order:
403    * - the SQL query string to be run
404    * - the URL of the OGSA-DAI registry from which to find the GDS
405    * - the URL of the results destination (file:// or gsiftp://).l
406    *
407    * So, for example:
408    * java org.astrogrid.warehouse.ogsadai.GdsQueryDelegate <br>
409    *   "SELECT * from first * LIMIT 5000" <br>
410    *   http://localhost:8080/ogsa/services/ogsadai/DAIServiceGroupRegistry <br>
411    *   file:///tmp/TEMPFILE
412    */
413   public static void main(String args[]) throws Exception {
414 
415     GdsQueryDelegate gdsQueryDelegate = new GdsQueryDelegate();
416 
417     String sql;
418     String registryUrlString;
419     String outputFileUrl = null;
420 
421     try {
422       int len = args.length;
423 
424       // Get SQL query (first parameter)
425       if ((len == 0) || (args[0] == null)) {
426 		    String errorMessage = 
427           "No SQL (first parameter) supplied to GdsQueryDelegate";
428         logger.error(errorMessage);
429         throw new Exception(errorMessage);
430       }
431       sql = args[0];
432 
433       // Get URL for ogsa-dai registry (second parameter)
434       if (len < 2) {
435 		    String errorMessage = 
436           "No Registry URL (second parameter) supplied to GdsQueryDelegate";
437         logger.error(errorMessage);
438         throw new Exception(errorMessage);
439       }
440       registryUrlString = args[1];
441 
442       // Get results filename if supplied (third parameter)
443       if (len < 3) {
444 		    String errorMessage = 
445           "No results destination URL (third parameter) supplied "
446           + "to GdsQueryDelegate";
447         logger.error(errorMessage);
448         throw new Exception(errorMessage);
449       }
450       outputFileUrl = args[2];
451 
452       if (len > 3) {
453 		    String errorMessage = 
454            "Too many parameters (" + Integer.toString(len) + 
455            ") supplied to GdsQueryDelegate";
456         logger.error(errorMessage);
457         throw new Exception(errorMessage);
458       }
459     }
460     catch (ArrayIndexOutOfBoundsException e) {
461 		  String errorMessage = 
462            "Unexpected number of parameters supplied to GdsQueryDelegate";
463         logger.error(errorMessage + ": " + e.getMessage());
464         throw new Exception(errorMessage);
465     }
466     //Do actual query 
467     gdsQueryDelegate.doRealQuery(sql, registryUrlString, outputFileUrl);
468   }
469 
470   // ----------------------------------------------------------
471   // Fallback defaults for values that should be configured on a
472   // per-installation basis in the WarehouseServiceImpl.properties 
473 
474   private final String DEFAULT_HOST_STRING = 
475         "http://astrogrid.ast.cam.ac.uk:4040";
476   private final String DEFAULT_REGISTRY_STRING = 
477         "/gdw/services/ogsadai/DAIServiceGroupRegistry";
478 
479   // Other utility strings
480   private final String TEMP_RESULTS_FILENAME = "ws_output.xml";
481   private final String WAREHOUSE_RESULT_START = "WAREHOUSE_RESULT_START";
482   private final String WAREHOUSE_RESULT_END = "WAREHOUSE_RESULT_END";
483 
484   private final String OGSADAI_STATUS_INCOMPLETE = 
485       "Request is being processed asynchronously";
486   private final String OGSADAI_STATUS_COMPLETE = 
487       "Available for processing";
488 //================================================================
489 
490 }
491 /*
492 $Log: GdsQueryDelegate.java,v $
493 Revision 1.17  2004/03/29 07:36:03  kea
494 Updating javadocs.
495 
496 Revision 1.16  2004/03/25 20:33:39  kea
497 Error reporting.
498 Change of default service location in test.
499 
500 Revision 1.15  2004/03/25 17:22:43  kea
501 Tidying javadocs, deprecating old classes, improved error handling etc.
502 
503 Revision 1.14  2004/03/24 12:35:18  kea
504 Proper lifetime management; selective use of secure services (only for
505 GridFTP delivery, not for file delivery).
506 
507 Revision 1.13  2004/03/16 14:03:36  kea
508 Temporary hack to unit tests to avoid build break - to fix later.
509 
510 Revision 1.12  2004/03/15 12:31:32  kea
511 Changed QueryDelegate to use the new OGSA-DAI toolkit, rather than our
512 own Grid delegates.
513 Added support for delivering to either GridFTP url or File url.
514 
515 Revision 1.11  2004/03/04 18:23:09  kea
516 Made member public to help test datacenter direct access.
517 
518 Revision 1.10  2004/03/04 15:33:47  kea
519 Start of integration with ogsa-dai 3.1, including using new ogsa-dai
520 client toolkit rather than our GdsDelegate.  (These changes initially
521 made on branch GDW_KEA_144, which has been abandoned cos I'm too
522 chicken to merge it).
523 
524 Revision 1.8.6.1  2004/03/04 15:20:17  kea
525 Changed to use new OGSA-DAI client interface toolkit.
526 Our GdsDelegate is no longer used.
527 
528 Revision 1.8  2003/12/15 15:36:09  kea
529 Tidying up GdsQueryDelegate to remove some old cruft.
530 
531 Revision 1.7  2003/12/15 14:39:20  kea
532 Fixed parameter count checks.
533 
534 Revision 1.6  2003/12/15 14:23:09  kea
535 Restoring changes buggered up by CVS, bastard bastard bastard.
536 No properties file for GdsQueryDelegate anymore, registry URL for
537 ogsa-dai passed in as parameter.
538 
539 Revision 1.4  2003/12/11 16:17:54  eca
540 Logging comments for System.out.println and in conjunction with 
541 thrown exceptions now in GdsDelegate, GdsQueryDelegate, and 
542 GridServiceDelegate 
543 
544 Added setFactoryHandleFromRegistry method; factory handle 
545 result returned as ExtensibilityType.
546 
547 Elizabeth Auden, 11 Dec 2003
548 
549 Revision 1.1  2003/12/02 13:56:51  kea
550 Delegate for performing OGSA-DAI queries, in separate package space from
551 Datacenter Querier module to isolate incompatible OGSA-DAI axis from
552 datacenter.
553 
554 */