1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.datacenter.queriers;
12
import java.io.IOException;
import java.util.Comparator;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.astrogrid.datacenter.queriers.status.QuerierClosed;
import org.astrogrid.datacenter.queriers.status.QuerierQueued;
import org.astrogrid.datacenter.queriers.status.QuerierStatus;
22
23 /*** Manages the construction and initialization of Queriers, and maintains a
24 * collection of current Queriers. It might run queues or something... later...
25 *
26 * <p>
27 * @author Noel Winstanley nw@jb.man.ac.uk 24-Sep-2003
28 * @author mch
29 */
30 public class QuerierManager implements QuerierListener {
31
32
33 private static final Log log = LogFactory.getLog(QuerierManager.class);
34
35 /*** Identifier for this manager/querier container */
36 private String managerId;
37
38 /*** List of managers */
39 private static Hashtable managers = new Hashtable();
40
41 /*** Lookup table of initialised queriers. These are queriers that have been
42 * created but are yet 'complete', or have not yet been told to run. For
43 * example, the CEA architecture has an 'init' which creates a query, and a
44 * 'execute' which will kick it to the queue
45 */
46 private Hashtable heldQueriers = new Hashtable();
47
48 /*** lookup table of queued queriers. These are queriers that are waiting on
49 a 'free' spot on the running queriers - ie when the running queriers have hit
50 * the maximum limit and the */
51 private Hashtable queuedQueriers = new Hashtable();
52
53 /*** priority index of queued queriers */
54 private TreeSet queuedPriorities = new TreeSet(new QuerierStartTimeComparator());
55
56 /*** lookup table of all the current queriers indexed by their handle*/
57 private Hashtable runningQueriers = new Hashtable();
58
59 /*** lookup table of old queriers */
60 private Hashtable closedQueriers = new Hashtable();
61
62 /*** Maximum number of simultaneous queriers allowed */
63 private int maxQueriers = 20;
64
65 /*** Special ID used to create a test querier for testing getStatus,. etc */
66 public final static String TEST_QUERIER_ID = "TestQuerier:";
67
68 /*** PriorityComparitor for the queue */
69 protected class QuerierStartTimeComparator implements Comparator {
70
71 /***
72 * Compares its two arguments for order. Returns a negative integer,
73 * zero, or a positive integer as the first argument is less than, equal
74 * to, or greater than the second.<p>
75 *
76 * @param o1 the first object to be compared.
77 * @param o2 the second object to be compared.
78 * @return a negative integer, zero, or a positive integer as the
79 * first argument is less than, equal to, or greater than the
80 * second.
81 * @throws ClassCastException if the arguments' types prevent them from
82 * being compared by this Comparator.
83 */
84 public int compare(Object o1, Object o2) {
85 Querier q1 = (Querier) o1;
86 Querier q2 = (Querier) o2;
87 if (q1.getStatus().getStartTime().getTime()<q2.getStatus().getStartTime().getTime()) {
88 return -1;
89 }
90 else if (q1.getStatus().getStartTime().getTime()>q2.getStatus().getStartTime().getTime()) {
91 return 1;
92 }
93 else {
94 return 0;
95 }
96 }
97
98 }
99
100 /*** Status Comparitor for displays */
101 protected class StatusStartTimeComparator implements Comparator {
102
103 /***
104 * Compares its two arguments for order. Returns a negative integer,
105 * zero, or a positive integer as the first argument is less than, equal
106 * to, or greater than the second.<p>
107 *
108 * @param o1 the first object to be compared.
109 * @param o2 the second object to be compared.
110 * @return a negative integer, zero, or a positive integer as the
111 * first argument is less than, equal to, or greater than the
112 * second.
113 * @throws ClassCastException if the arguments' types prevent them from
114 * being compared by this Comparator.
115 */
116 public int compare(Object o1, Object o2) {
117 QuerierStatus q1 = (QuerierStatus) o1;
118 QuerierStatus q2 = (QuerierStatus) o2;
119 if (q1.getStartTime().getTime()<q2.getStartTime().getTime()) {
120 return 1;
121 }
122 else if (q1.getStartTime().getTime()>q2.getStartTime().getTime()) {
123 return -1;
124 }
125 else {
126 return 0;
127 }
128 }
129
130 }
131
132 /*** Constructor. Protected because we want to force people to use the factory method */
133 protected QuerierManager(String givenId) {
134 this.managerId = givenId;
135 }
136
137 /*** Factory method - checks to see if the givenId already exists and returns that if so */
138 public synchronized static QuerierManager getManager(String givenId) {
139 if (managers.get(givenId) != null) {
140 return (QuerierManager) managers.get(givenId);
141 }
142 else {
143 QuerierManager manager = new QuerierManager(givenId);
144 managers.put(givenId, manager);
145 return manager;
146 }
147 }
148
149 /*** Return the querier with the given id */
150 public Querier getQuerier(String qid) {
151
152 Querier q = (Querier) runningQueriers.get(qid);
153 if (q != null) return q;
154
155 q = (Querier) queuedQueriers.get(qid);
156 if (q != null) return q;
157
158 q = (Querier) closedQueriers.get(qid);
159 return q;
160 }
161
162 /*** Returns a list of all the queued (initialised but not started) queriers - including
163 * those that are on the active queue and those just initialised waiting on some external push
164 */
165 public QuerierStatus[] getQueued() {
166 Querier[] queued = (Querier[]) queuedQueriers.values().toArray(new Querier[] {} );
167 Querier[] initialised = (Querier[]) heldQueriers.values().toArray(new Querier[] {} );
168 QuerierStatus[] statuses = new QuerierStatus[queued.length+initialised.length];
169 for (int i = 0; i < queued.length; i++) {
170 statuses[i] = queued[i].getStatus();
171 }
172 for (int j = 0; j < initialised.length; j++) {
173 statuses[j+queued.length] = initialised[j].getStatus();
174 }
175 return statuses;
176 }
177
178 /*** Returns a list of all the currently running queriers
179 */
180 public QuerierStatus[] getRunning() {
181 Querier[] running = (Querier[]) runningQueriers.values().toArray(new Querier[] {} );
182 QuerierStatus[] statuses = new QuerierStatus[running.length];
183 for (int i = 0; i < running.length; i++) {
184 statuses[i] = running[i].getStatus();
185 }
186 return statuses;
187 }
188
189 /*** Returns a list of all the ids of the currently running queriers
190 */
191 public QuerierStatus[] getClosed() {
192 Querier[] closed = (Querier[]) closedQueriers.values().toArray(new Querier[] {} );
193 QuerierStatus[] statuses = new QuerierStatus[closed.length];
194 for (int i = 0; i < closed.length; i++) {
195 statuses[i] = closed[i].getStatus();
196 }
197 return statuses;
198 }
199
200 /*** Returns the status's of all the queriers in date/time order */
201 public QuerierStatus[] getAllStatus() {
202 Querier[] queued = (Querier[]) queuedQueriers.values().toArray(new Querier[] {} );
203 Querier[] running = (Querier[]) runningQueriers.values().toArray(new Querier[] {} );
204 Querier[] ran = (Querier[]) closedQueriers.values().toArray(new Querier[] {} );
205
206 TreeSet statuses = new TreeSet(new StatusStartTimeComparator());
207 for (int i = 0; i < queued.length; i++) {
208 statuses.add(queued[i].getStatus());
209 }
210 for (int i = 0; i < running.length; i++) {
211 statuses.add(running[i].getStatus());
212 }
213 for (int i = 0; i < ran.length; i++) {
214 statuses.add(ran[i].getStatus());
215 }
216 return (QuerierStatus[]) statuses.toArray(new QuerierStatus[] {} );
217 }
218
219 /*** Adds the given querier to this manager, but leaves it in the initialised passive queue */
220 public void holdQuerier(Querier querier) {
221 heldQueriers.put(querier.getId(), querier);
222 }
223
224 /***
225 * Adds the given querier to this manager, and starts it off on a new
226 * thread. asynchronous.
227 */
228 public void submitQuerier(Querier querier) {
229
230
231 if (heldQueriers.get(querier.getId()) != null) {
232 heldQueriers.remove(querier.getId());
233 }
234
235
236 if (runningQueriers.get(querier.getId()) != null) {
237 log.error( "Handle '" + querier.getId() + "' already in use");
238 throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
239 }
240 querier.setStatus(new QuerierQueued(querier.getStatus()));
241 queuedQueriers.put(querier.getId(), querier);
242 queuedPriorities.add(querier);
243 querier.addListener(this);
244
245 checkQueue();
246 }
247
248 /***
249 * Adds the given querier to this manager, runs it, and returns the status;
250 * synchronous (blocking); not queued
251 */
252 public QuerierStatus askQuerier(Querier querier) throws IOException {
253
254
255 if (runningQueriers.get(querier.getId()) != null) {
256 log.error( "Handle '" + querier.getId() + "' already in use");
257 throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
258 }
259 runningQueriers.put(querier.getId(), querier);
260 querier.addListener(this);
261 querier.ask();
262 return querier.getStatus();
263 }
264
265 /*** Adds the given querier to this manager, and asks the querier for the
266 * count (number of matches). Synchronous, returning the number of matches
267 */
268 public long askCount(Querier querier) throws IOException {
269
270
271 if (runningQueriers.get(querier.getId()) != null) {
272 log.error( "Handle '" + querier.getId() + "' already in use");
273 throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
274 }
275 runningQueriers.put(querier.getId(), querier);
276 querier.addListener(this);
277 return querier.askCount();
278 }
279
280
281 /*** A Querier manager must listen to it's queriers
282 */
283 public void queryStatusChanged(Querier querier) {
284
285
286 if (querier.getStatus().isFinished()) {
287 runningQueriers.remove(querier.getId());
288 queuedQueriers.remove(querier.getId());
289 closedQueriers.put(querier.getId(), querier);
290 checkQueue();
291 }
292 }
293
294 /*** Checks the queue - if there are queued queriers and not too many
295 * running, moves a queued one and starts it
296 */
297 protected synchronized void checkQueue() {
298
299 System.gc();
300
301
302
303
304
305
306
307 while ((queuedQueriers.size()>0) &&
308 ( (maxQueriers==-1) || (runningQueriers.size()<=maxQueriers))) {
309
310 Querier first = (Querier) queuedPriorities.first();
311 queuedPriorities.remove(first);
312 queuedQueriers.remove(first.getId());
313 runningQueriers.put(first.getId(), first);
314
315 Thread qth = new Thread(first);
316 qth.start();
317 }
318 }
319
320
321 /*** Shut down - abort all running queries */
322 public void shutDown() {
323
324 queuedQueriers.clear();
325 queuedPriorities.clear();
326
327 QuerierStatus[] running = getRunning();
328 for (int i = 0; i < running.length; i++) {
329 Querier q = getQuerier(running[i].getId());
330 try {
331 q.abort();
332 }
333 catch (Throwable th) {
334
335 }
336 }
337 }
338
339
340
341
342 }
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511