1
2
3
4
5
6
7 package org.astrogrid.dataservice.queriers;
8
9 import org.astrogrid.dataservice.queriers.status.*;
10
11 import java.io.IOException;
12 import java.security.Principal;
13 import java.util.Date;
14 import java.util.Vector;
15 import org.apache.commons.logging.Log;
16 import org.astrogrid.dataservice.DatacenterException;
17 import org.astrogrid.query.Query;
18 import org.astrogrid.query.QueryException;
19 import org.astrogrid.slinger.Slinger;
20
21 /***
22 * Represents a single running query.
23 *
24 * <p>
25 * If two queries will be made on a database, two Querier instances
26 * will be required.
27 * <p>
28 * It is a very abstract class meant to help administer
29 * the queries - concrete subclasses take care of the details of performing the query.
30 * <p>
31 * @see package documentation
32 * <p>
33 * This class provides factory methods for both blocking and non-blocking
34 * queries, hopefully completely hiding the two-way translation process
35 *
36 * @see doQueryGetVotable()
37 * @see spawnQuery
38 * <p>
39 * @author M Hill
40 */
41
42 public class Querier implements Runnable, PluginListener {
43
44
45 protected static final Log log = org.apache.commons.logging.LogFactory.getLog(Querier.class);
46
47 /*** query to perform */
48 private final Query query;
49
50 /*** A handle is used to identify a particular query. It is also used as the
51 * basis for any temporary storage. */
52 private final String id;
53
54 /*** On whose behalf is this querier running */
55 private final Principal user;
56
57 /*** Plugin to run */
58 private QuerierPlugin plugin;
59
60 /*** List of listeners who will be updated when the status changes */
61 private Vector listeners = new Vector();
62
63 /*** status of query */
64 private QuerierStatus status;
65
66 /*** true if abort called */
67 private boolean aborted = false;
68
69 /*** Represents what created this querier */
70 private Object source = null;
71
72 /*** Represents size of results */
73 private long resultsSize = -1;
74
75 /*** For measuring how long the query took - calculated from status change times*/
76
77 /*** For measuring how long query took - calculated from status change times*/
78
79 /*** For measuring how long query took - calculated from status change times*/
80
81
82 /*** temporary used for generating unique handles - see generateHandle() */
83 private static java.util.Random random = new java.util.Random();
84
85 /*** Logging status information */
86 StatusLogger statusLog = new StatusLogger();
87
88
89 /*** This is private so that if the mechanism changes and we make the plugins
90 subclasses of queriers, for example, then we don't have to change the rest of the
91 * code. The query includes details about the results, and the 'aSource' is just
92 * used to indicate where the querier came from - eg a test class, or JSPs, or
93 * CEA, etc */
94 private Querier(Principal forUser, Query query, Object aSource) throws IOException {
95 this.id = generateQueryId();
96 this.user = forUser;
97 this.query = query;
98 this.source = aSource;
99
100
101 assertQueryLegal(query);
102
103
104 plugin = QuerierPluginFactory.createPlugin(this);
105
106 setStatus(new QuerierConstructed(this));
107 if (source != null) {
108 status.addDetail("Source: "+source.toString());
109 }
110 status.addDetail("Query: "+query);
111 status.addDetail("User: "+forUser.getName());
112
113
114
115
116 if (query.getResultsDef() != null)
117 {
118 Slinger.testConnection(query.getTarget());
119 }
120
121
122 }
123
124 /*** Backwards compatible to take the old account. @deprecated - use the makeQuerier(Principal etc) *
125 public static Querier makeQuerier(Account forUser, Query query, Object source) throws IOException {
126 return makeQuerier(new LoginAccount(forUser.getIndividual()+"@"+forUser.getCommunity(),""), query, source);
127 }
128 */
129
130 /*** Factory method. Query includes the results definition, and 'source' represents
131 * what clas was used to create this querier (optional) */
132 public static Querier makeQuerier(Principal forUser, Query query, Object source) throws IOException {
133
134 Querier querier = new Querier(forUser, query, source);
135
136 return querier;
137 }
138
139
140 /*** Returns this instances handle */
141 public String getId() { return id; }
142
143 /*** Returns the query for subclasses */
144 public Query getQuery() { return query; }
145
146 /*** Returns the Principal the querier is being run for */
147 public Principal getUser() { return user; }
148
149 /*** Returns the plugin - this is *only* for testing @todo a nicer way of doing this */
150 public QuerierPlugin getPlugin() { return plugin; }
151
152 /*** Returns the class involved in creating this querier instance. Used for
153 * analysis etc */
154 public Object getSource() { return source; }
155
156 /***
157 * Runnable implementation - this method is called when the thread to run
158 * this asynchronously is started. It should just do ask() and handle
159 * any error, because these won't go anywhere otherwise...
160 */
161 public void run() {
162 log.info("Starting Query ["+id+"] asynchronously...");
163
164 try {
165 ask();
166 }
167 catch (Throwable th) {
168 log.error("Exception during Asynchronous Run",th);
169 if (!(getStatus() instanceof QuerierError)) {
170 setStatus(new QuerierError(getStatus(), "", th));
171 }
172 }
173 log.info("...Ending asynchronous Query ["+id+"]");
174
175 }
176
177 /*** Carries out the query via the plugin to do the query.
178 * The plugin does the complete conversion, query and
179 * results processing
180 */
181 public void ask() throws IOException {
182
183
184 if ((query.getResultsDef() == null) || (query.getTarget() == null)) {
185 throw new DatacenterException("No target given for the results, query "+getId()+" from "+status.getSource()+" by "+status.getOwner());
186 }
187
188
189 if (!(getStatus() instanceof QuerierAborted)) {
190 try {
191 plugin.askQuery(user, query, this);
192 }
193 catch (QueryException e) {
194 log.error(
195 "Query execution failed for query "+getId()+
196 " from "+status.getSource()+" by "+status.getOwner() +
197 " : " + e.getMessage());
198 throw new DatacenterException(
199 "Query execution failed for query "+getId()+
200 " from "+status.getSource()+" by "+status.getOwner() +
201 " : " + e.getMessage(), e);
202 }
203 close();
204 }
205 }
206
207 /*** Asks the plugin for the count (ie number of matches) for
208 * the given query. These are asynchronous (blocking)
209 */
210 public long askCount() throws IOException {
211 try {
212 return plugin.getCount(user, query, this);
213 }
214 catch (QueryException e) {
215 log.error(
216 "askCount execution failed for query "+getId()+
217 " from "+status.getSource()+" by "+status.getOwner() +
218 " : " + e.getMessage());
219 throw new DatacenterException(
220 "askCount execution failed for query "+getId()+
221 " from "+status.getSource()+" by "+status.getOwner() +
222 " : " + e.getMessage(), e);
223 }
224 }
225
226 /*** Sets the number of results */
227 public void setResultsSize(long size) {
228 resultsSize = size;
229 }
230
231
232 /***
233 * Closes & tidies up
234 */
235 public void close() {
236 if (!getStatus().isFinished()) {
237 setStatus(new QuerierComplete(getStatus()));
238 }
239 getStatus().setProgressMax(resultsSize);
240 plugin = null;
241 }
242
243 /***
244 * Returns the time it took to complete the query in milliseconds, or the
245 * time since it started (if it's still running). -1 if the query has not
246 * yet started
247 */
248 public long getQueryTimeTaken() {
249
250
251 QuerierStatus status = getStatus();
252 QuerierStatus next = null;
253 while ((status != null) && !(status instanceof QuerierQuerying)) {
254 next = status;
255 status = (QuerierStatus) status.getPrevious();
256 }
257 if (status == null) {
258
259 return -1;
260 }
261 Date queryStarted = status.getTimestamp();
262 if (next != null) {
263
264 return next.getTimestamp().getTime() - queryStarted.getTime();
265 }
266 else {
267 return new Date().getTime() - queryStarted.getTime();
268 }
269 }
270
271 /***
272 * Abort - stops query (if poss) and tidies up
273 */
274 public QuerierStatus abort() {
275
276
277 if (plugin != null) {
278 plugin.abort();
279 setStatus(new QuerierAborted(getStatus()));
280 }
281
282 aborted = true;
283
284 return getStatus();
285
286 }
287
288 /*** For owned classes to test */
289 public boolean isAborted() {
290 return aborted;
291 }
292
293
294 /***
295 * Checks to see if the query is legal on this database - ie the tables
296 * specified are allowed to be queried, etc. Throws an exception if not.
297 */
298 public void assertQueryLegal(Query query) {
299
300 }
301
302 /***
303 * For debugging/display
304 */
305 public String toString() {
306 return "Querier ["+getId()+"] ";
307 }
308
309 /***
310 * Returns if the querier is closed and no more operations are possible
311 */
312 public boolean isClosed() {
313 return (status.isFinished());
314 }
315
316 /*** Called by the plugin to register a status change
317 */
318 public void pluginStatusChanged(QuerierStatus newStatus) {
319 setStatus(newStatus);
320 }
321
322
323
324 /***
325 * Sets the status. NB if the new status is ordered before the existing one,
326 * throws an exception (as each querier should only handle one query).
327 * Synchronised as the queriers may be running under a different thread
328 */
329 public synchronized void setStatus(QuerierStatus newStatus) {
330
331 if (status != null) {
332 if ((status instanceof QuerierError) || (newStatus.isBefore(status))) {
333 String msg = "Trying to start a step '"+newStatus+"' when status is already "+status;
334 log.error(msg);
335 throw new IllegalStateException(msg);
336 }
337 }
338
339 log.info("Query ["+id+"] for "+user+", now "+newStatus);
340
341 status = newStatus;
342
343 fireStatusChanged(status);
344
345 if (status.isFinished()) {
346 statusLog.log(status);
347 }
348 }
349
350 /***
351 * Returns the status - contains more info than the state
352 */
353 public QuerierStatus getStatus() {
354 return status;
355 }
356
357 /***
358 * Register a status listener. This will be informed of changes in status
359 * to the service - IF the service supports such info. Otherwise it will
360 * just get 'starting', 'working' and 'completed' messages based around the
361 * basic http exchange.
362 */
363 public void addListener(QuerierListener aListener) {
364 listeners.add(aListener);
365 }
366
367 /***
368 * Removes a listern
369 */
370 public void removeListener(QuerierListener aListener) {
371 listeners.remove(aListener);
372 }
373 /*** informs all listeners of the new status change. Not threadsafe... should
374 * call setStatus() rather than this directly
375 */
376 private void fireStatusChanged(QuerierStatus newStatus) {
377 for (int i=0;i<listeners.size();i++) {
378 try {
379 ((QuerierListener) listeners.get(i)).queryStatusChanged(this);
380 }
381 catch (RuntimeException e) {
382
383
384 log.error("Listener ("+listeners.get(i)+") failed to handle status change to "+newStatus, e);
385
386 }
387 }
388 }
389
390
391 /***
392 * Helper method to generates a handle for use by a particular instance; uses the current
393 * time to help us debug (ie we can look at the temporary directories and
394 * see which was the last run). Later we could add service/user information
395 * if available
396 * @todo not guaranteed to be unique...
397 */
398 protected static String generateQueryId() {
399 Date todayNow = new Date();
400 return
401 todayNow.getYear()
402 + "-"
403 + doubleDigits(todayNow.getMonth())
404 + "-"
405 + doubleDigits(todayNow.getDate())
406 + "_"
407 + doubleDigits(todayNow.getHours())
408 + "."
409 + doubleDigits(todayNow.getMinutes())
410 + "."
411 + doubleDigits(todayNow.getSeconds())
412 + "_"
413
414 + (random.nextInt(8999999) + 1000000);
415
416 }
417
418 /*** Returns number as two digits */
419 private static String doubleDigits(int i) {
420 if (i<10) {
421 return "0"+i;
422 }
423 else {
424 return ""+i;
425 }
426 }
427
428
429
430
431 }
432
433
434
435