1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.dataservice.queriers;
12
13 import java.io.IOException;
14 import java.util.Comparator;
15 import java.util.Hashtable;
16 import java.util.TreeSet;
17 import org.apache.commons.logging.Log;
18 import org.apache.commons.logging.LogFactory;
19 import org.astrogrid.dataservice.queriers.status.QuerierQueued;
20 import org.astrogrid.dataservice.queriers.status.QuerierStatus;
21 import org.astrogrid.dataservice.queriers.status.StatusLogger;
22 import org.astrogrid.status.TaskStatus;
23
24 /*** Manages the construction and initialization of Queriers, and maintains a
25 * collection of current Queriers. It might run queues or something... later...
26 *
27 * <p>
28 * @author Noel Winstanley nw@jb.man.ac.uk 24-Sep-2003
29 * @author mch
30 */
31 public class QuerierManager implements QuerierListener {
32
33
34 private static final Log log = LogFactory.getLog(QuerierManager.class);
35
36 /*** Identifier for this manager/querier container */
37 private String managerId;
38
39 /*** List of managers */
40 private static Hashtable managers = new Hashtable();
41
42 /*** Lookup table of initialised queriers. These are queriers that have been
43 * created but are yet 'complete', or have not yet been told to run. For
44 * example, the CEA architecture has an 'init' which creates a query, and a
45 * 'execute' which will kick it to the queue
46 */
47 private Hashtable heldQueriers = new Hashtable();
48
49 /*** lookup table of queued queriers. These are queriers that are waiting on
50 a 'free' spot on the running queriers - ie when the running queriers have hit
51 * the maximum limit and the */
52 private Hashtable queuedQueriers = new Hashtable();
53
54 /*** priority index of queued queriers */
55 private TreeSet queuedPriorities = new TreeSet(new QuerierStartTimeComparator());
56
57 /*** lookup table of all the current queriers indexed by their handle*/
58 private Hashtable runningQueriers = new Hashtable();
59
60 /*** lookup table of old queriers */
61 private Hashtable closedQueriers = new Hashtable();
62
63 /*** Maximum number of simultaneous queriers allowed */
64 private int maxQueriers = 20;
65
66 /*** Special ID used to create a test querier for testing getStatus,. etc */
67 public final static String TEST_QUERIER_ID = "TestQuerier:";
68
69 /*** PriorityComparitor for the queue */
70 protected class QuerierStartTimeComparator implements Comparator {
71
72 /***
73 * Compares its two arguments for order. Returns a negative integer,
74 * zero, or a positive integer as the first argument is less than, equal
75 * to, or greater than the second.<p>
76 *
77 * @param o1 the first object to be compared.
78 * @param o2 the second object to be compared.
79 * @return a negative integer, zero, or a positive integer as the
80 * first argument is less than, equal to, or greater than the
81 * second.
82 * @throws ClassCastException if the arguments' types prevent them from
83 * being compared by this Comparator.
84 */
85 public int compare(Object o1, Object o2) {
86 Querier q1 = (Querier) o1;
87 Querier q2 = (Querier) o2;
88 if (q1.getStatus().getStartTime().getTime()<q2.getStatus().getStartTime().getTime()) {
89 return -1;
90 }
91 else if (q1.getStatus().getStartTime().getTime()>q2.getStatus().getStartTime().getTime()) {
92 return 1;
93 }
94 else {
95 return 0;
96 }
97 }
98
99 }
100
101 /*** Status Comparitor for displays */
102 protected class StatusStartTimeComparator implements Comparator {
103
104 /***
105 * Compares its two arguments for order. Returns a negative integer,
106 * zero, or a positive integer as the first argument is less than, equal
107 * to, or greater than the second.<p>
108 *
109 * @param o1 the first object to be compared.
110 * @param o2 the second object to be compared.
111 * @return a negative integer, zero, or a positive integer as the
112 * first argument is less than, equal to, or greater than the
113 * second.
114 * @throws ClassCastException if the arguments' types prevent them from
115 * being compared by this Comparator.
116 */
117 public int compare(Object o1, Object o2) {
118 QuerierStatus q1 = (QuerierStatus) o1;
119 QuerierStatus q2 = (QuerierStatus) o2;
120 if (q1.getStartTime().getTime()<q2.getStartTime().getTime()) {
121 return 1;
122 }
123 else if (q1.getStartTime().getTime()>q2.getStartTime().getTime()) {
124 return -1;
125 }
126 else {
127 return 0;
128 }
129 }
130
131 }
132
133 /*** Constructor. Protected because we want to force people to use the factory method */
134 protected QuerierManager(String givenId) {
135 this.managerId = givenId;
136 }
137
138 /*** Factory method - checks to see if the givenId already exists and returns that if so */
139 public synchronized static QuerierManager getManager(String givenId) {
140 if (managers.get(givenId) != null) {
141 return (QuerierManager) managers.get(givenId);
142 }
143 else {
144 QuerierManager manager = new QuerierManager(givenId);
145 managers.put(givenId, manager);
146 return manager;
147 }
148 }
149
150 /*** Return the querier with the given id */
151 public Querier getQuerier(String qid) {
152
153 Querier q = (Querier) runningQueriers.get(qid);
154 if (q != null) return q;
155
156 q = (Querier) queuedQueriers.get(qid);
157 if (q != null) return q;
158
159 q = (Querier) closedQueriers.get(qid);
160 return q;
161 }
162
163 /*** Returns a list of all the queued (initialised but not started) queriers - including
164 * those that are on the active queue and those just initialised waiting on some external push
165 */
166 public QuerierStatus[] getQueued() {
167 Querier[] queued = (Querier[]) queuedQueriers.values().toArray(new Querier[] {} );
168 Querier[] initialised = (Querier[]) heldQueriers.values().toArray(new Querier[] {} );
169 QuerierStatus[] statuses = new QuerierStatus[queued.length+initialised.length];
170 for (int i = 0; i < queued.length; i++) {
171 statuses[i] = queued[i].getStatus();
172 }
173 for (int j = 0; j < initialised.length; j++) {
174 statuses[j+queued.length] = initialised[j].getStatus();
175 }
176 return statuses;
177 }
178
179 /*** Returns a list of all the currently running queriers
180 */
181 public QuerierStatus[] getRunning() {
182 Querier[] running = (Querier[]) runningQueriers.values().toArray(new Querier[] {} );
183 QuerierStatus[] statuses = new QuerierStatus[running.length];
184 for (int i = 0; i < running.length; i++) {
185 statuses[i] = running[i].getStatus();
186 }
187 return statuses;
188 }
189
190 /*** Returns a list of all the ids of the currently running queriers
191 */
192 public QuerierStatus[] getClosed() {
193 Querier[] closed = (Querier[]) closedQueriers.values().toArray(new Querier[] {} );
194 QuerierStatus[] statuses = new QuerierStatus[closed.length];
195 for (int i = 0; i < closed.length; i++) {
196 statuses[i] = closed[i].getStatus();
197 }
198 return statuses;
199 }
200
201 /*** Returns the status's of all the queriers in date/time order */
202 public QuerierStatus[] getAllStatus() {
203 Querier[] queued = (Querier[]) queuedQueriers.values().toArray(new Querier[] {} );
204 Querier[] running = (Querier[]) runningQueriers.values().toArray(new Querier[] {} );
205 Querier[] ran = (Querier[]) closedQueriers.values().toArray(new Querier[] {} );
206
207 TreeSet statuses = new TreeSet(new StatusStartTimeComparator());
208 for (int i = 0; i < queued.length; i++) {
209 statuses.add(queued[i].getStatus());
210 }
211 for (int i = 0; i < running.length; i++) {
212 statuses.add(running[i].getStatus());
213 }
214 for (int i = 0; i < ran.length; i++) {
215 statuses.add(ran[i].getStatus());
216 }
217 return (QuerierStatus[]) statuses.toArray(new QuerierStatus[] {} );
218 }
219
220 /*** Adds the given querier to this manager, but leaves it in the initialised passive queue */
221 public void holdQuerier(Querier querier) {
222 heldQueriers.put(querier.getId(), querier);
223 }
224
225 /***
226 * Adds the given querier to this manager, and starts it off on a new
227 * thread. asynchronous.
228 */
229 public void submitQuerier(Querier querier) {
230
231
232 if (heldQueriers.get(querier.getId()) != null) {
233 heldQueriers.remove(querier.getId());
234 }
235
236
237 if (runningQueriers.get(querier.getId()) != null) {
238 log.error( "Handle '" + querier.getId() + "' already in use");
239 throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
240 }
241 querier.setStatus(new QuerierQueued(querier.getStatus()));
242 queuedQueriers.put(querier.getId(), querier);
243 queuedPriorities.add(querier);
244 querier.addListener(this);
245
246 checkQueue();
247 }
248
249 /***
250 * Adds the given querier to this manager, runs it, and returns the status;
251 * synchronous (blocking); not queued
252 */
253 public QuerierStatus askQuerier(Querier querier) throws IOException {
254
255
256 if (runningQueriers.get(querier.getId()) != null) {
257 log.error( "Handle '" + querier.getId() + "' already in use");
258 throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
259 }
260 runningQueriers.put(querier.getId(), querier);
261 querier.addListener(this);
262 querier.ask();
263 return querier.getStatus();
264 }
265
266 /*** Adds the given querier to this manager, and asks the querier for the
267 * count (number of matches). Synchronous, returning the number of matches
268 */
269 public long askCount(Querier querier) throws IOException {
270
271
272 if (runningQueriers.get(querier.getId()) != null) {
273 log.error( "Handle '" + querier.getId() + "' already in use");
274 throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
275 }
276 runningQueriers.put(querier.getId(), querier);
277 querier.addListener(this);
278 return querier.askCount();
279 }
280
281
282 /*** A Querier manager must listen to it's queriers
283 */
284 public void queryStatusChanged(Querier querier) {
285
286
287 if (querier.getStatus().isFinished()) {
288 runningQueriers.remove(querier.getId());
289 queuedQueriers.remove(querier.getId());
290 closedQueriers.put(querier.getId(), querier);
291 checkQueue();
292 }
293 }
294
295 /*** Checks the queue - if there are queued queriers and not too many
296 * running, moves a queued one and starts it
297 */
298 protected synchronized void checkQueue() {
299
300 System.gc();
301
302
303
304
305
306
307
308 while ((queuedQueriers.size()>0) &&
309 ( (maxQueriers==-1) || (runningQueriers.size()<=maxQueriers))) {
310
311 Querier first = (Querier) queuedPriorities.first();
312 queuedPriorities.remove(first);
313 queuedQueriers.remove(first.getId());
314 runningQueriers.put(first.getId(), first);
315
316 Thread qth = new Thread(first);
317 qth.start();
318 }
319 }
320
321
322 /*** Shut down - abort all running queries */
323 public void shutDown() {
324
325 queuedQueriers.clear();
326 queuedPriorities.clear();
327
328 QuerierStatus[] running = getRunning();
329 for (int i = 0; i < running.length; i++) {
330 Querier q = getQuerier(running[i].getId());
331 try {
332 q.abort();
333 }
334 catch (Throwable th) {
335
336 }
337 }
338 }
339
340
341
342
343 }
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524