1
2
3
4
5
6
7 package org.astrogrid.warehouse.ogsadai;
8
9 import org.astrogrid.warehouse.ogsadai.XSLTransform;
10
11 import org.w3c.dom.Document;
12 import org.w3c.dom.Element;
13 import org.w3c.dom.Node;
14 import org.w3c.dom.NodeList;
15 import org.w3c.dom.NamedNodeMap;
16 import org.apache.log4j.Logger;
17
18 import org.apache.xerces.parsers.DOMParser;
19 import org.xml.sax.InputSource;
20 import org.xml.sax.SAXException;
21
22 import java.io.IOException;
23 import java.io.ByteArrayOutputStream;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.io.FileOutputStream;
27 import java.io.StringReader;
28 import org.globus.ogsa.utils.AnyHelper;
29 import org.globus.ogsa.GridServiceException;
30 import org.gridforum.ogsi.ExtendedDateTimeType;
31 import org.gridforum.ogsi.TerminationTimeType;
32 import org.gridforum.ogsi.ExtensibilityType;
33 import org.apache.axis.AxisFault;
34
35 import uk.org.ogsadai.client.toolkit.ActivityRequest;
36 import uk.org.ogsadai.client.toolkit.ActivityOutput;
37 import uk.org.ogsadai.client.toolkit.GridDataService;
38 import uk.org.ogsadai.client.toolkit.GridDataServiceFactory;
39 import uk.org.ogsadai.client.toolkit.MessageLevelSecurityProperty;
40 import uk.org.ogsadai.client.toolkit.Response;
41 import uk.org.ogsadai.client.toolkit.ServiceFetcher;
42 import uk.org.ogsadai.client.toolkit.ServiceGroupRegistry;
43 import uk.org.ogsadai.client.toolkit.GridServiceMetaData;
44 import uk.org.ogsadai.client.toolkit.activity.sql.SQLQuery;
45 import uk.org.ogsadai.client.toolkit.activity.delivery.DeliverToGFTP;
46 import uk.org.ogsadai.client.toolkit.activity.delivery.DeliverToURL;
47 import uk.org.ogsadai.client.toolkit.activity.delivery.DeliverFromURL;
48 import uk.org.ogsadai.service.OGSADAIConstants;
49
50 import org.ietf.jgss.GSSCredential;
51 import org.globus.ogsa.utils.QueryHelper;
52 import org.globus.util.GlobusURL;
53
54 import java.util.Calendar;
55 import java.util.TimeZone;
56
57 /***
58 * A high-level delegate for invoking a query on an OGSA-DAI Grid Data Service.
59 *
60 * This class uses the OGDA-DAI client toolkit, introduced in OGSA-DAI 3.1,
61 * to interact with the OGSA-DAI GDS. The toolkit abstracts away most of
62 * the detail of the underlying service, which is good as the whole grid
63 * infrastructure is about to change out from under us.
64 *
65 * @author K Andrews
66 */
67 public class GdsQueryDelegate
68 {
69 static Logger logger = Logger.getLogger("GdsQueryLogger");
70
71 /***
72 * Default empty constructor.
73 *
74 * @throws IOException
75 * @throws SAXException
76 */
77 public GdsQueryDelegate() throws IOException, SAXException
78 {
79 super();
80 }
81
82 /***
83 * Uses an OGSA-DAI Grid Data Service to perform the supplied SQL query.
84 *
85 * @param sql String containing the SQL query to be performed
86 *
87 * @param registryUrlString String containing the URL of the DAI Registry
88 * to be used to find a GDS factory
89 *
90 * @param outputUrl String containing the URL to which results should be
91 * returned; can be a file:// or gsiftp:// URL.
92 *
93 * @throws Exception
94 */
95 public void doRealQuery(String sql, String registryUrlString,
96 String outputUrl) throws Exception
97 {
98 int timeout = 300;
99
100 ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
101 String xmlString ="";
102
103
104 if (sql == null) {
105 String errMess = "Input sql query cannot be null";
106 logger.error(errMess);
107 throw new Exception(errMess);
108 }
109 if (registryUrlString == null) {
110 String errMess = "Input registry URL cannot be null";
111 logger.error(errMess);
112 throw new Exception(errMess);
113 }
114 if (outputUrl == null) {
115 String errMess = "Output results URL cannot be null";
116 logger.error(errMess);
117 throw new Exception(errMess);
118 }
119
120
121
122 GlobusURL url = new GlobusURL(outputUrl);
123
124
125 boolean needSecure = false;
126 if (url.getProtocol().equalsIgnoreCase("gsiftp") ||
127 url.getProtocol().equalsIgnoreCase("gridftp")) {
128 needSecure = true;
129 }
130
131
132 try {
133
134
135 ServiceGroupRegistry registry =
136 ServiceFetcher.getRegistry(registryUrlString);
137 if (registry == null) {
138 String errMess =
139 "Couldn't get ServiceGroupRegistry from URL "
140 + registryUrlString;
141 logger.error(errMess);
142 throw new Exception(errMess);
143 }
144
145
146 String factoryHandle = getFactoryHandle(registry, needSecure);
147 if (factoryHandle == null) {
148 String errMess =
149 "Couldn't get handle for GDS factory from registry at "
150 + registryUrlString;
151 logger.error(errMess);
152 throw new Exception(errMess);
153 }
154
155
156 GridDataServiceFactory factory =
157 ServiceFetcher.getFactory(factoryHandle);
158 logger.info("Found GDSF at " + factoryHandle);
159
160
161 GridDataService gds = factory.createGridDataService();
162 logger.info("Created GDS");
163
164
165 refreshTermination(gds, 2);
166
167
168
169 SQLQuery query = new SQLQuery(sql);
170 ActivityRequest request = new ActivityRequest();
171 request.addActivity(query);
172 logger.info("Sending SQL Query \"" + sql + "\" to GDS");
173
174
175
176 DeliverFromURL xsltDelivery = new DeliverFromURL(
177 "http://astrogrid.ast.cam.ac.uk/xslt/ag-warehouse-first.xsl");
178 request.addActivity(xsltDelivery);
179
180
181 ActivityOutput queryOutput = query.getOutput();
182 ActivityOutput xslDelOutput = xsltDelivery.getOutput();
183 XSLTransform xslTransform = new XSLTransform(queryOutput,xslDelOutput);
184 request.addActivity(xslTransform);
185
186
187
188
189 ActivityOutput transformOutput = xslTransform.getOutput();
190
191 if (url.getProtocol().equalsIgnoreCase("gsiftp") ||
192 url.getProtocol().equalsIgnoreCase("gridftp")) {
193
194 DeliverToGFTP delivery = new DeliverToGFTP(
195 url.getHost(), url.getPort(), url.getPath());
196 delivery.setInput(transformOutput);
197 request.addActivity(delivery);
198 logger.info("Adding delivery address 'gsiftp://"
199 + url.getHost() + ":"
200 + Integer.toString(url.getPort()) + url.getPath());
201
202 }
203 else if (url.getProtocol().equalsIgnoreCase("file")) {
204
205 DeliverToURL delivery = new DeliverToURL(outputUrl);
206 delivery.setInput(transformOutput);
207 request.addActivity(delivery);
208 }
209 else {
210 String errMess =
211 "Unknown or unsupported protocol for results delivery: " +
212 url.getProtocol() +
213 "Expected 'file://'' or 'gsiftp://' or 'gridftp://'";
214 logger.error(errMess);
215 throw new Exception(errMess);
216 }
217
218 if (url.getProtocol().equalsIgnoreCase("gsiftp") ||
219 url.getProtocol().equalsIgnoreCase("gridftp")) {
220
221 MessageLevelSecurityProperty security =
222 new MessageLevelSecurityProperty();
223 CredentialHolder credentialHolder = null;
224
225 try {
226 credentialHolder = new CredentialHolder(
227 "/etc/grid-security/hostcert.pem",
228 "/etc/grid-security/hostkey.pem");
229 }
230 catch (Exception e) {
231 throw new Exception("No identity credentials could be obtained "
232 + e.getMessage()
233 +").");
234 }
235 security.setCredential(credentialHolder.getCredential());
236 gds.configure(security);
237 }
238
239
240 logger.info("Performing request.");
241 Response response = null;
242 try {
243 response = gds.perform(request);
244 }
245 catch (Exception e) {
246
247 logger.error("Perform crashed: " + e.getMessage());
248 }
249
250
251
252 int multFac = 1;
253 logger.debug("Waiting for results delivery to finish...");
254 while (true) {
255 ExtensibilityType result2 = gds.findServiceData(
256 QueryHelper.getNamesQuery(OGSADAIConstants.GDS_SDE_REQUEST_STATUS));
257 String status = AnyHelper.getAsString(result2);
258 if ((status.indexOf(OGSADAI_STATUS_COMPLETE) != -1) ||
259 (status.indexOf("COMPLETED") != -1)) {
260 break;
261 }
262 else if (status.indexOf("ERROR") != -1) {
263 String errMess = "Got error status : " + status +
264 ", Response was " + response.getAsString();
265 logger.error(errMess);
266 throw new Exception(errMess);
267 }
268 else if ((status.indexOf("PROCESSING") != -1) ||
269 (status.indexOf(OGSADAI_STATUS_INCOMPLETE) != -1)) {
270
271
272 refreshTermination(gds,2);
273
274
275 Thread.currentThread().sleep(1000*multFac);
276
277 if (multFac < 300) {
278
279 }
280 }
281 else {
282
283 String errMess = "Didn't understand status: " + status +
284 ", response is " + response.getAsString();
285 logger.error(errMess);
286 throw new Exception(errMess);
287 }
288 }
289
290 logger.debug("Results delivery has finished.");
291
292 }
293 catch (AxisFault e) {
294 String errorMessage = "Problem with Axis: " + e.getMessage();
295 logger.error(errorMessage);
296 throw new Exception(errorMessage);
297 }
298 catch (Exception e) {
299 String errorMessage = "Unspecified exception: " + e.getMessage();
300 logger.error(errorMessage);
301 throw new Exception(errorMessage);
302 }
303 }
304
305 /***
306 * Utility function to set or refresh the termination time of a
307 * Grid Data Service. Sets the termination time to be after
308 * offsetHours hours, and before (offsetHours+1) hours.
309 *
310 * @param gds GridDataService whose termination time is to be adjusted
311 *
312 * @param offsetHours Minimum hours from now for termination
313 *
314 * @throws Exception
315 */
316 protected void refreshTermination( GridDataService gds, int offsetHours)
317 throws Exception
318 {
319 try {
320
321 Calendar term = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
322 term.add(Calendar.HOUR, offsetHours);
323 gds.requestTerminationAfter(new ExtendedDateTimeType(term));
324 term.add(Calendar.HOUR, 1);
325 gds.requestTerminationBefore(new ExtendedDateTimeType(term));
326 }
327 catch (Exception e) {
328 throw new Exception(e.getMessage());
329 }
330 }
331
332 /***
333 * Utility function to get a factory handle string from a DAI registry.
334 * If needSecure is true, looks for a secure service factory handle.
335 * If not, looks for an insecure (normal) service factory handle first,
336 * and returns a secure one if an insecure one can't be found.
337 *
338 * @param registry The ServiceGroupRegistry to search for a factory handle
339 *
340 * @param needsSecure If true, look for a secure service
341 *
342 * @return String holding factory handle string
343 *
344 * @throws Exception
345 */
346 protected String getFactoryHandle(
347 ServiceGroupRegistry registry, boolean needSecure) throws Exception
348 {
349
350
351
352
353 GridServiceMetaData gsmd[] =
354 registry.listServices(OGSADAIConstants.GDSF_PORT_TYPE);
355
356 if (needSecure) {
357 for (int i = 0; i < gsmd.length; i++) {
358 String handle = gsmd[i].getHandle();
359 if (handle.toUpperCase().indexOf("SECURE") != -1) {
360 return handle;
361 }
362 }
363
364 String errMess =
365 "Couldn't find secure Grid Data Service Factory to use; " +
366 "please choose a different results delivery method";
367 logger.error(errMess);
368 throw new Exception(errMess);
369 }
370 else {
371 String secureHandle = null;
372 for (int i = 0; i < gsmd.length; i++) {
373 String handle = gsmd[i].getHandle();
374 if (handle.toUpperCase().indexOf("SECURE") != -1) {
375 secureHandle = handle;
376 }
377 else {
378
379 return handle;
380 }
381 }
382 if (secureHandle == null) {
383 String errMess =
384 "Couldn't find any Grid Data Service Factory to use; " +
385 "please check you are using the correct DAI registry";
386 logger.error(errMess);
387 throw new Exception(errMess);
388 }
389 else {
390
391 logger.warn(
392 "Couldn't find standard Grid Data Service Factory to use; " +
393 "using Secure GDSF instead.");
394 return secureHandle;
395 }
396 }
397
398 }
399
400 /***
401 * An entry point so that a GDS query can be run from the command line.
402 * Expects the following parameters in the following order:
403 * - the SQL query string to be run
404 * - the URL of the OGSA-DAI registry from which to find the GDS
405 * - the URL of the results destination (file:// or gsiftp://).l
406 *
407 * So, for example:
408 * java org.astrogrid.warehouse.ogsadai.GdsQueryDelegate <br>
409 * "SELECT * from first * LIMIT 5000" <br>
410 * http://localhost:8080/ogsa/services/ogsadai/DAIServiceGroupRegistry <br>
411 * file:///tmp/TEMPFILE
412 */
413 public static void main(String args[]) throws Exception {
414
415 GdsQueryDelegate gdsQueryDelegate = new GdsQueryDelegate();
416
417 String sql;
418 String registryUrlString;
419 String outputFileUrl = null;
420
421 try {
422 int len = args.length;
423
424
425 if ((len == 0) || (args[0] == null)) {
426 String errorMessage =
427 "No SQL (first parameter) supplied to GdsQueryDelegate";
428 logger.error(errorMessage);
429 throw new Exception(errorMessage);
430 }
431 sql = args[0];
432
433
434 if (len < 2) {
435 String errorMessage =
436 "No Registry URL (second parameter) supplied to GdsQueryDelegate";
437 logger.error(errorMessage);
438 throw new Exception(errorMessage);
439 }
440 registryUrlString = args[1];
441
442
443 if (len < 3) {
444 String errorMessage =
445 "No results destination URL (third parameter) supplied "
446 + "to GdsQueryDelegate";
447 logger.error(errorMessage);
448 throw new Exception(errorMessage);
449 }
450 outputFileUrl = args[2];
451
452 if (len > 3) {
453 String errorMessage =
454 "Too many parameters (" + Integer.toString(len) +
455 ") supplied to GdsQueryDelegate";
456 logger.error(errorMessage);
457 throw new Exception(errorMessage);
458 }
459 }
460 catch (ArrayIndexOutOfBoundsException e) {
461 String errorMessage =
462 "Unexpected number of parameters supplied to GdsQueryDelegate";
463 logger.error(errorMessage + ": " + e.getMessage());
464 throw new Exception(errorMessage);
465 }
466
467 gdsQueryDelegate.doRealQuery(sql, registryUrlString, outputFileUrl);
468 }
469
470
471
472
473
474 private final String DEFAULT_HOST_STRING =
475 "http://astrogrid.ast.cam.ac.uk:4040";
476 private final String DEFAULT_REGISTRY_STRING =
477 "/gdw/services/ogsadai/DAIServiceGroupRegistry";
478
479
480 private final String TEMP_RESULTS_FILENAME = "ws_output.xml";
481 private final String WAREHOUSE_RESULT_START = "WAREHOUSE_RESULT_START";
482 private final String WAREHOUSE_RESULT_END = "WAREHOUSE_RESULT_END";
483
484 private final String OGSADAI_STATUS_INCOMPLETE =
485 "Request is being processed asynchronously";
486 private final String OGSADAI_STATUS_COMPLETE =
487 "Available for processing";
488
489
490 }
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554