1
2
3
4
5
6
7 package org.astrogrid.dataservice.queriers;
8
9 import org.astrogrid.dataservice.queriers.status.*;
10
11 import java.io.IOException;
12 import java.security.Principal;
13 import java.util.Date;
14 import java.util.Vector;
15 import org.apache.commons.logging.Log;
16 import org.astrogrid.dataservice.DatacenterException;
17 import org.astrogrid.query.Query;
18 import org.astrogrid.slinger.Slinger;
19 import org.astrogrid.slinger.StoreException;
20
21 /***
22 * Represents a single running query.
23 *
24 * <p>
25 * If two queries will be made on a database, two Querier instances
26 * will be required.
27 * <p>
28 * It is a very abstract class meant to help administer
29 * the queries - concrete subclasses take care of the details of performing the query.
30 * <p>
31 * @see package documentation
32 * <p>
33 * This class provides factory methods for both blocking and non-blocking
34 * queries, hopefully completely hiding the two-way translation process
35 *
36 * @see doQueryGetVotable()
37 * @see spawnQuery
38 * <p>
39 * @author M Hill
40 */
41
42 public class Querier implements Runnable, PluginListener {
43
44
45 protected static final Log log = org.apache.commons.logging.LogFactory.getLog(Querier.class);
46
47 /*** query to perform */
48 private final Query query;
49
50 /*** A handle is used to identify a particular query. It is also used as the
51 * basis for any temporary storage. */
52 private final String id;
53
54 /*** On whose behalf is this querier running */
55 private final Principal user;
56
57 /*** Plugin to run */
58 private QuerierPlugin plugin;
59
60 /*** List of listeners who will be updated when the status changes */
61 private Vector listeners = new Vector();
62
63 /*** status of query */
64 private QuerierStatus status;
65
66 /*** true if abort called */
67 private boolean aborted = false;
68
69 /*** Represents what created this querier */
70 private Object source = null;
71
72 /*** Represents size of results */
73 private long resultsSize = -1;
74
75 /*** For measuring how long the query took - calculated from status change times*/
76
77 /*** For measuring how long query took - calculated from status change times*/
78
79 /*** For measuring how long query took - calculated from status change times*/
80
81
82 /*** temporary used for generating unique handles - see generateHandle() */
83 private static java.util.Random random = new java.util.Random();
84
85 /*** Logging status information */
86 StatusLogger statusLog = new StatusLogger();
87
88
89 /*** This is private so that if the mechanism changes and we make the plugins
90 subclasses of queriers, for example, then we don't have to change the rest of the
91 * code. The query includes details about the results, and the 'aSource' is just
92 * used to indicate where the querier came from - eg a test class, or JSPs, or
93 * CEA, etc */
94 private Querier(Principal forUser, Query query, Object aSource) throws IOException {
95 this.id = generateQueryId();
96 this.user = forUser;
97 this.query = query;
98 this.source = aSource;
99
100
101 assertQueryLegal(query);
102
103
104 plugin = QuerierPluginFactory.createPlugin(this);
105
106 setStatus(new QuerierConstructed(this));
107 if (source != null) {
108 status.addDetail("Source: "+source.toString());
109 }
110 status.addDetail("Query: "+query);
111 status.addDetail("User: "+forUser.getName());
112
113
114
115
116 if (query.getResultsDef() != null)
117 {
118 try {
119 Slinger.testConnection(query.getTarget(), user);
120 }
121 catch (IOException ioe) {
122 throw new StoreException("Unreachable Target '"+query.getTarget()+"': "+ioe.getMessage(),ioe);
123 }
124 }
125
126
127 }
128
129 /*** Backwards compatible to take the old account. @deprecated - use the makeQuerier(Principal etc) *
130 public static Querier makeQuerier(Account forUser, Query query, Object source) throws IOException {
131 return makeQuerier(new LoginAccount(forUser.getIndividual()+"@"+forUser.getCommunity(),""), query, source);
132 }
133 */
134
135 /*** Factory method. Query includes the results definition, and 'source' represents
136 * what clas was used to create this querier (optional) */
137 public static Querier makeQuerier(Principal forUser, Query query, Object source) throws IOException {
138
139 Querier querier = new Querier(forUser, query, source);
140
141 return querier;
142 }
143
144
145 /*** Returns this instances handle */
146 public String getId() { return id; }
147
148 /*** Returns the query for subclasses */
149 public Query getQuery() { return query; }
150
151 /*** Returns the Principal the querier is being run for */
152 public Principal getUser() { return user; }
153
154 /*** Returns the plugin - this is *only* for testing @todo a nicer way of doing this */
155 public QuerierPlugin getPlugin() { return plugin; }
156
157 /*** Returns the class involved in creating this querier instance. Used for
158 * analysis etc */
159 public Object getSource() { return source; }
160
161 /***
162 * Runnable implementation - this method is called when the thread to run
163 * this asynchronously is started. It should just do ask() and handle
164 * any error, because these won't go anywhere otherwise...
165 */
166 public void run() {
167 log.info("Starting Query ["+id+"] asynchronously...");
168
169 try {
170 ask();
171 }
172 catch (Throwable th) {
173 log.error("Exception during Asynchronous Run",th);
174 if (!(getStatus() instanceof QuerierError)) {
175 setStatus(new QuerierError(getStatus(), "", th));
176 }
177 }
178 log.info("...Ending asynchronous Query ["+id+"]");
179
180 }
181
182 /*** Carries out the query via the plugin to do the query.
183 * The plugin does the complete conversion, query and
184 * results processing
185 */
186 public void ask() throws IOException {
187
188
189 if ((query.getResultsDef() == null) || (query.getTarget() == null)) {
190 throw new DatacenterException("No target given for the results, query "+getId()+" from "+status.getSource()+" by "+status.getOwner());
191 }
192
193
194 if (!(getStatus() instanceof QuerierAborted)) {
195 plugin.askQuery(user, query, this);
196
197 close();
198 }
199 }
200
201 /*** Asks the plugin for the count (ie number of matches) for
202 * the given query. These are asynchronous (blocking)
203 */
204 public long askCount() throws IOException {
205 return plugin.getCount(user, query, this);
206 }
207
208 /*** Sets the number of results */
209 public void setResultsSize(long size) {
210 resultsSize = size;
211 }
212
213
214 /***
215 * Closes & tidies up
216 */
217 public void close() {
218 if (!getStatus().isFinished()) {
219 setStatus(new QuerierComplete(getStatus()));
220 }
221 getStatus().setProgressMax(resultsSize);
222 plugin = null;
223 }
224
225 /***
226 * Returns the time it took to complete the query in milliseconds, or the
227 * time since it started (if it's still running). -1 if the query has not
228 * yet started
229 */
230 public long getQueryTimeTaken() {
231
232
233 QuerierStatus status = getStatus();
234 QuerierStatus next = null;
235 while ((status != null) && !(status instanceof QuerierQuerying)) {
236 next = status;
237 status = (QuerierStatus) status.getPrevious();
238 }
239 if (status == null) {
240
241 return -1;
242 }
243 Date queryStarted = status.getTimestamp();
244 if (next != null) {
245
246 return next.getTimestamp().getTime() - queryStarted.getTime();
247 }
248 else {
249 return new Date().getTime() - queryStarted.getTime();
250 }
251 }
252
253 /***
254 * Abort - stops query (if poss) and tidies up
255 */
256 public QuerierStatus abort() {
257
258
259 if (plugin != null) {
260 plugin.abort();
261 setStatus(new QuerierAborted(getStatus()));
262 }
263
264 aborted = true;
265
266 return getStatus();
267
268 }
269
270 /*** For owned classes to test */
271 public boolean isAborted() {
272 return aborted;
273 }
274
275
276 /***
277 * Checks to see if the query is legal on this database - ie the tables
278 * specified are allowed to be queried, etc. Throws an exception if not.
279 */
280 public void assertQueryLegal(Query query) {
281
282 }
283
284 /***
285 * For debugging/display
286 */
287 public String toString() {
288 return "Querier ["+getId()+"] ";
289 }
290
291 /***
292 * Returns if the querier is closed and no more operations are possible
293 */
294 public boolean isClosed() {
295 return (status.isFinished());
296 }
297
298 /*** Called by the plugin to register a status change
299 */
300 public void pluginStatusChanged(QuerierStatus newStatus) {
301 setStatus(newStatus);
302 }
303
304
305
306 /***
307 * Sets the status. NB if the new status is ordered before the existing one,
308 * throws an exception (as each querier should only handle one query).
309 * Synchronised as the queriers may be running under a different thread
310 */
311 public synchronized void setStatus(QuerierStatus newStatus) {
312
313 if (status != null) {
314 if ((status instanceof QuerierError) || (newStatus.isBefore(status))) {
315 String msg = "Trying to start a step '"+newStatus+"' when status is already "+status;
316 log.error(msg);
317 throw new IllegalStateException(msg);
318 }
319 }
320
321 log.info("Query ["+id+"] for "+user+", now "+newStatus);
322
323 status = newStatus;
324
325 fireStatusChanged(status);
326
327 if (status.isFinished()) {
328 statusLog.log(status);
329 }
330 }
331
332 /***
333 * Returns the status - contains more info than the state
334 */
335 public QuerierStatus getStatus() {
336 return status;
337 }
338
339 /***
340 * Register a status listener. This will be informed of changes in status
341 * to the service - IF the service supports such info. Otherwise it will
342 * just get 'starting', 'working' and 'completed' messages based around the
343 * basic http exchange.
344 */
345 public void addListener(QuerierListener aListener) {
346 listeners.add(aListener);
347 }
348
349 /***
350 * Removes a listern
351 */
352 public void removeListener(QuerierListener aListener) {
353 listeners.remove(aListener);
354 }
355 /*** informs all listeners of the new status change. Not threadsafe... should
356 * call setStatus() rather than this directly
357 */
358 private void fireStatusChanged(QuerierStatus newStatus) {
359 for (int i=0;i<listeners.size();i++) {
360 try {
361 ((QuerierListener) listeners.get(i)).queryStatusChanged(this);
362 }
363 catch (RuntimeException e) {
364
365
366 log.error("Listener ("+listeners.get(i)+") failed to handle status change to "+newStatus, e);
367
368 }
369 }
370 }
371
372
373 /***
374 * Helper method to generates a handle for use by a particular instance; uses the current
375 * time to help us debug (ie we can look at the temporary directories and
376 * see which was the last run). Later we could add service/user information
377 * if available
378 * @todo not guaranteed to be unique...
379 */
380 protected static String generateQueryId() {
381 Date todayNow = new Date();
382 return
383 todayNow.getYear()
384 + "-"
385 + doubleDigits(todayNow.getMonth())
386 + "-"
387 + doubleDigits(todayNow.getDate())
388 + "_"
389 + doubleDigits(todayNow.getHours())
390 + "."
391 + doubleDigits(todayNow.getMinutes())
392 + "."
393 + doubleDigits(todayNow.getSeconds())
394 + "_"
395
396 + (random.nextInt(8999999) + 1000000);
397
398 }
399
400 /*** Returns number as two digits */
401 private static String doubleDigits(int i) {
402 if (i<10) {
403 return "0"+i;
404 }
405 else {
406 return ""+i;
407 }
408 }
409
410
411
412
413 }
414
415
416
417