1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.dataservice.queriers;
12
13 import java.io.IOException;
14 import java.util.Comparator;
15 import java.util.Hashtable;
16 import java.util.TreeSet;
17 import org.apache.commons.logging.Log;
18 import org.apache.commons.logging.LogFactory;
19 import org.astrogrid.cfg.ConfigFactory;
20 import org.astrogrid.dataservice.queriers.status.QuerierQueued;
21 import org.astrogrid.dataservice.queriers.status.QuerierStatus;
22
23 /*** Manages the construction and initialization of Queriers, and maintains a
24 * collection of current Queriers. It might run queues or something... later...
25 *
26 * <p>
27 * @author Noel Winstanley nw@jb.man.ac.uk 24-Sep-2003
28 * @author mch
29 */
30 public class QuerierManager implements QuerierListener {
31
32
33 private static final Log log = LogFactory.getLog(QuerierManager.class);
34
35 /*** Identifier for this manager/querier container */
36 private String managerId;
37
38 /*** List of managers */
39 private static Hashtable managers = new Hashtable();
40
41 /*** Lookup table of initialised queriers. These are queriers that have been
42 * created but are yet 'complete', or have not yet been told to run. For
43 * example, the CEA architecture has an 'init' which creates a query, and a
44 * 'execute' which will kick it to the queue
45 */
46 private Hashtable heldQueriers = new Hashtable();
47
48 /*** lookup table of queued queriers. These are queriers that are waiting on
49 a 'free' spot on the running queriers - ie when the running queriers have hit
50 * the maximum limit and the */
51 private Hashtable queuedQueriers = new Hashtable();
52
53 /*** priority index of queued queriers */
54 private TreeSet queuedPriorities = new TreeSet(new QuerierStartTimeComparator());
55
56 /*** lookup table of all the current queriers indexed by their handle*/
57 private Hashtable runningQueriers = new Hashtable();
58
59 /*** lookup table of old queriers */
60 private Hashtable closedQueriers = new Hashtable();
61
62 /*** Maximum number of simultaneous queriers allowed */
63 private int maxQueriers = 5;
64
65 /*** Special ID used to create a test querier for testing getStatus,. etc */
66 public final static String TEST_QUERIER_ID = "TestQuerier:";
67
68 /*** PriorityComparitor for the queue */
69 protected class QuerierStartTimeComparator implements Comparator {
70
71 /***
72 * Compares its two arguments for order. Returns a negative integer,
73 * zero, or a positive integer as the first argument is less than, equal
74 * to, or greater than the second.<p>
75 *
76 * @param o1 the first object to be compared.
77 * @param o2 the second object to be compared.
78 * @return a negative integer, zero, or a positive integer as the
79 * first argument is less than, equal to, or greater than the
80 * second.
81 * @throws ClassCastException if the arguments' types prevent them from
82 * being compared by this Comparator.
83 */
84 public int compare(Object o1, Object o2) {
85 Querier q1 = (Querier) o1;
86 Querier q2 = (Querier) o2;
87 if (q1.getStatus().getStartTime().getTime()<q2.getStatus().getStartTime().getTime()) {
88 return -1;
89 }
90 else if (q1.getStatus().getStartTime().getTime()>q2.getStatus().getStartTime().getTime()) {
91 return 1;
92 }
93 else {
94 return 0;
95 }
96 }
97
98 }
99
100 /*** Status Comparitor for displays */
101 protected class StatusStartTimeComparator implements Comparator {
102
103 /***
104 * Compares its two arguments for order. Returns a negative integer,
105 * zero, or a positive integer as the first argument is less than, equal
106 * to, or greater than the second.<p>
107 *
108 * @param o1 the first object to be compared.
109 * @param o2 the second object to be compared.
110 * @return a negative integer, zero, or a positive integer as the
111 * first argument is less than, equal to, or greater than the
112 * second.
113 * @throws ClassCastException if the arguments' types prevent them from
114 * being compared by this Comparator.
115 */
116 public int compare(Object o1, Object o2) {
117 QuerierStatus q1 = (QuerierStatus) o1;
118 QuerierStatus q2 = (QuerierStatus) o2;
119 if (q1.getStartTime().getTime()<q2.getStartTime().getTime()) {
120 return 1;
121 }
122 else if (q1.getStartTime().getTime()>q2.getStartTime().getTime()) {
123 return -1;
124 }
125 else {
126 return 0;
127 }
128 }
129
130 }
131
132 /*** Constructor. Protected because we want to force people to use the factory method */
133 protected QuerierManager(String givenId) {
134 this.managerId = givenId;
135
136 maxQueriers = ConfigFactory.getCommonConfig().getInt("datacenter.max.queries",4);
137
138 }
139
140 /*** Factory method - checks to see if the givenId already exists and returns that if so */
141 public synchronized static QuerierManager getManager(String givenId) {
142 if (managers.get(givenId) != null) {
143 return (QuerierManager) managers.get(givenId);
144 }
145 else {
146 QuerierManager manager = new QuerierManager(givenId);
147 managers.put(givenId, manager);
148 return manager;
149 }
150 }
151
152 /*** Return the querier with the given id */
153 public Querier getQuerier(String qid) {
154
155 Querier q = (Querier) runningQueriers.get(qid);
156 if (q != null) return q;
157
158 q = (Querier) queuedQueriers.get(qid);
159 if (q != null) return q;
160
161 q = (Querier) closedQueriers.get(qid);
162 return q;
163 }
164
165 /*** Returns a list of all the queued (initialised but not started) queriers - including
166 * those that are on the active queue and those just initialised waiting on some external push
167 */
168 public QuerierStatus[] getQueued() {
169 Querier[] queued = (Querier[]) queuedQueriers.values().toArray(new Querier[] {} );
170 Querier[] initialised = (Querier[]) heldQueriers.values().toArray(new Querier[] {} );
171 QuerierStatus[] statuses = new QuerierStatus[queued.length+initialised.length];
172 for (int i = 0; i < queued.length; i++) {
173 statuses[i] = queued[i].getStatus();
174 }
175 for (int j = 0; j < initialised.length; j++) {
176 statuses[j+queued.length] = initialised[j].getStatus();
177 }
178 return statuses;
179 }
180
181 /*** Returns a list of all the currently running queriers
182 */
183 public QuerierStatus[] getRunning() {
184 Querier[] running = (Querier[]) runningQueriers.values().toArray(new Querier[] {} );
185 QuerierStatus[] statuses = new QuerierStatus[running.length];
186 for (int i = 0; i < running.length; i++) {
187 statuses[i] = running[i].getStatus();
188 }
189 return statuses;
190 }
191
192 /*** Returns a list of all the ids of the currently running queriers
193 */
194 public QuerierStatus[] getClosed() {
195 Querier[] closed = (Querier[]) closedQueriers.values().toArray(new Querier[] {} );
196 QuerierStatus[] statuses = new QuerierStatus[closed.length];
197 for (int i = 0; i < closed.length; i++) {
198 statuses[i] = closed[i].getStatus();
199 }
200 return statuses;
201 }
202
203 /*** Returns the status's of all the queriers in date/time order */
204 public QuerierStatus[] getAllStatus() {
205 Querier[] queued = (Querier[]) queuedQueriers.values().toArray(new Querier[] {} );
206 Querier[] running = (Querier[]) runningQueriers.values().toArray(new Querier[] {} );
207 Querier[] ran = (Querier[]) closedQueriers.values().toArray(new Querier[] {} );
208
209 TreeSet statuses = new TreeSet(new StatusStartTimeComparator());
210 for (int i = 0; i < queued.length; i++) {
211 statuses.add(queued[i].getStatus());
212 }
213 for (int i = 0; i < running.length; i++) {
214 statuses.add(running[i].getStatus());
215 }
216 for (int i = 0; i < ran.length; i++) {
217 statuses.add(ran[i].getStatus());
218 }
219 return (QuerierStatus[]) statuses.toArray(new QuerierStatus[] {} );
220 }
221
222 /*** Adds the given querier to this manager, but leaves it in the initialised passive queue */
223 public void holdQuerier(Querier querier) {
224 heldQueriers.put(querier.getId(), querier);
225 }
226
227 /***
228 * Adds the given querier to this manager, and starts it off on a new
229 * thread. asynchronous.
230 */
231 public void submitQuerier(Querier querier) {
232
233
234 if (heldQueriers.get(querier.getId()) != null) {
235 heldQueriers.remove(querier.getId());
236 }
237
238
239 if (runningQueriers.get(querier.getId()) != null) {
240 log.error( "Handle '" + querier.getId() + "' already in use");
241 throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
242 }
243 querier.setStatus(new QuerierQueued(querier.getStatus()));
244 queuedQueriers.put(querier.getId(), querier);
245 queuedPriorities.add(querier);
246 querier.addListener(this);
247
248 checkQueue();
249 }
250
251 /***
252 * Adds the given querier to this manager, runs it, and returns the status;
253 * synchronous (blocking); not queued
254 */
255 public QuerierStatus askQuerier(Querier querier) throws IOException {
256
257
258 if (runningQueriers.get(querier.getId()) != null) {
259 log.error( "Handle '" + querier.getId() + "' already in use");
260 throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
261 }
262 runningQueriers.put(querier.getId(), querier);
263 querier.addListener(this);
264 querier.ask();
265 return querier.getStatus();
266 }
267
268 /*** Adds the given querier to this manager, and asks the querier for the
269 * count (number of matches). Synchronous, returning the number of matches
270 */
271 public long askCount(Querier querier) throws IOException {
272
273
274 if (runningQueriers.get(querier.getId()) != null) {
275 log.error( "Handle '" + querier.getId() + "' already in use");
276 throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
277 }
278 runningQueriers.put(querier.getId(), querier);
279 querier.addListener(this);
280 return querier.askCount();
281 }
282
283
284 /*** A Querier manager must listen to it's queriers
285 */
286 public void queryStatusChanged(Querier querier) {
287
288
289 if (querier.getStatus().isFinished()) {
290 runningQueriers.remove(querier.getId());
291 queuedQueriers.remove(querier.getId());
292 closedQueriers.put(querier.getId(), querier);
293 checkQueue();
294 }
295 }
296
297 /*** Checks the queue - if there are queued queriers and not too many
298 * running, moves a queued one and starts it
299 */
300 protected synchronized void checkQueue() {
301
302 System.gc();
303
304
305
306
307
308
309
310 while ((queuedQueriers.size()>0) &&
311 ( (maxQueriers==-1) || (runningQueriers.size()<=maxQueriers))) {
312
313 Querier first = (Querier) queuedPriorities.first();
314 queuedPriorities.remove(first);
315 queuedQueriers.remove(first.getId());
316 runningQueriers.put(first.getId(), first);
317
318 Thread qth = new Thread(first);
319 qth.start();
320 }
321 }
322
323
324 /*** Shut down - abort all running queries */
325 public void shutDown() {
326
327 queuedQueriers.clear();
328 queuedPriorities.clear();
329
330 QuerierStatus[] running = getRunning();
331 for (int i = 0; i < running.length; i++) {
332 Querier q = getQuerier(running[i].getId());
333 try {
334 q.abort();
335 }
336 catch (Throwable th) {
337
338 }
339 }
340 }
341
342
343
344
345 }
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532