View Javadoc

1   /*$Id: QuerierManager.java,v 1.13 2004/11/10 22:01:50 mch Exp $
2    * Created on 24-Sep-2003
3    *
4    * Copyright (C) AstroGrid. All rights reserved.
5    *
6    * This software is published under the terms of the AstroGrid
7    * Software License version 1.2, a copy of which has been included
8    * with this distribution in the LICENSE.txt file.
9    *
10   **/
11  package org.astrogrid.datacenter.queriers;
12  
13  import java.io.IOException;
14  import java.util.Comparator;
15  import java.util.Hashtable;
16  import java.util.TreeSet;
17  import org.apache.commons.logging.Log;
18  import org.apache.commons.logging.LogFactory;
19  import org.astrogrid.datacenter.queriers.status.QuerierClosed;
20  import org.astrogrid.datacenter.queriers.status.QuerierQueued;
21  import org.astrogrid.datacenter.queriers.status.QuerierStatus;
22  
23  /*** Manages the construction and initialization of Queriers, and maintains a
24   * collection of current Queriers. It might run queues or something... later...
25   *
26   * <p>
27   * @author Noel Winstanley nw@jb.man.ac.uk 24-Sep-2003
28   * @author mch
29   */
30  public class QuerierManager implements QuerierListener {
31     
32     
33     private static final Log log = LogFactory.getLog(QuerierManager.class);
34     
35     /*** Identifier for this manager/querier container */
36     private String managerId;
37     
38     /*** List of managers */
39     private static Hashtable managers = new Hashtable();
40     
41     /*** Lookup table of initialised queriers.  These are queriers that have been
42      * created but are yet 'complete', or have not yet been told to run.  For
43      * example, the CEA architecture has an 'init' which creates a query, and a
44      * 'execute' which will kick it to the queue
45      */
46     private Hashtable heldQueriers = new Hashtable();
47     
48     /*** lookup table of queued queriers.  These are queriers that are waiting on
49      a 'free' spot on the running queriers - ie when the running queriers have hit
50      * the maximum limit and the */
51     private Hashtable queuedQueriers = new Hashtable();
52     
53     /*** priority index of queued queriers */
54     private TreeSet queuedPriorities = new TreeSet(new QuerierStartTimeComparator());
55  
56     /*** lookup table of all the current queriers indexed by their handle*/
57     private Hashtable runningQueriers = new Hashtable();
58  
59     /*** lookup table of old queriers */
60     private Hashtable closedQueriers = new Hashtable();
61  
62     /*** Maximum number of simultaneous queriers allowed */
63     private int maxQueriers = 20;
64     
65     /*** Special ID used to create a test querier for testing getStatus,. etc */
66     public final static String TEST_QUERIER_ID = "TestQuerier:";
67     
68     /*** PriorityComparitor for the queue */
69     protected class QuerierStartTimeComparator implements Comparator {
70        
71        /***
72         * Compares its two arguments for order.  Returns a negative integer,
73         * zero, or a positive integer as the first argument is less than, equal
74         * to, or greater than the second.<p>
75         *
76         * @param o1 the first object to be compared.
77         * @param o2 the second object to be compared.
78         * @return a negative integer, zero, or a positive integer as the
79         *           first argument is less than, equal to, or greater than the
80         *        second.
81         * @throws ClassCastException if the arguments' types prevent them from
82         *           being compared by this Comparator.
83         */
84        public int compare(Object o1, Object o2) {
85           Querier q1 = (Querier) o1;
86           Querier q2 = (Querier) o2;
87           if (q1.getStatus().getStartTime().getTime()<q2.getStatus().getStartTime().getTime()) {
88              return -1;
89           }
90           else if (q1.getStatus().getStartTime().getTime()>q2.getStatus().getStartTime().getTime()) {
91              return 1;
92           }
93           else {
94              return 0;
95           }
96        }
97        
98     }
99     
100    /*** Status Comparitor for displays */
101    protected class StatusStartTimeComparator implements Comparator {
102       
103       /***
104        * Compares its two arguments for order.  Returns a negative integer,
105        * zero, or a positive integer as the first argument is less than, equal
106        * to, or greater than the second.<p>
107        *
108        * @param o1 the first object to be compared.
109        * @param o2 the second object to be compared.
110        * @return a negative integer, zero, or a positive integer as the
111        *           first argument is less than, equal to, or greater than the
112        *        second.
113        * @throws ClassCastException if the arguments' types prevent them from
114        *           being compared by this Comparator.
115        */
116       public int compare(Object o1, Object o2) {
117          QuerierStatus q1 = (QuerierStatus) o1;
118          QuerierStatus q2 = (QuerierStatus) o2;
119          if (q1.getStartTime().getTime()<q2.getStartTime().getTime()) {
120             return 1;
121          }
122          else if (q1.getStartTime().getTime()>q2.getStartTime().getTime()) {
123             return -1;
124          }
125          else {
126             return 0;
127          }
128       }
129       
130    }
131 
132    /*** Constructor. Protected because we want to force people to use the factory method   */
133    protected QuerierManager(String givenId) {
134       this.managerId = givenId;
135    }
136    
137    /*** Factory method - checks to see if the givenId already exists and returns that if so */
138    public synchronized static QuerierManager getManager(String givenId) {
139       if (managers.get(givenId) != null) {
140          return (QuerierManager) managers.get(givenId);
141       }
142       else {
143          QuerierManager manager = new QuerierManager(givenId);
144          managers.put(givenId, manager);
145          return manager;
146       }
147    }
148 
149    /*** Return the querier with the given id */
150    public Querier getQuerier(String qid) {
151 
152       Querier q = (Querier) runningQueriers.get(qid);
153       if (q != null) return q;
154       
155       q = (Querier) queuedQueriers.get(qid);
156       if (q != null) return q;
157 
158       q = (Querier) closedQueriers.get(qid);
159       return q;
160    }
161    
162    /*** Returns a list of all the queued (initialised but not started) queriers - including
163     * those that are on the active queue and those just initialised waiting on some external push
164     */
165    public QuerierStatus[] getQueued() {
166       Querier[] queued = (Querier[]) queuedQueriers.values().toArray(new Querier[] {} );
167       Querier[] initialised = (Querier[]) heldQueriers.values().toArray(new Querier[] {} );
168       QuerierStatus[] statuses = new QuerierStatus[queued.length+initialised.length];
169       for (int i = 0; i < queued.length; i++) {
170          statuses[i] = queued[i].getStatus();
171       }
172       for (int j = 0; j < initialised.length; j++) {
173          statuses[j+queued.length] = initialised[j].getStatus();
174       }
175       return statuses;
176    }
177 
178    /*** Returns a list of all the currently running queriers
179     */
180    public QuerierStatus[] getRunning() {
181       Querier[] running = (Querier[]) runningQueriers.values().toArray(new Querier[] {} );
182       QuerierStatus[] statuses = new QuerierStatus[running.length];
183       for (int i = 0; i < running.length; i++) {
184          statuses[i] = running[i].getStatus();
185       }
186       return statuses;
187    }
188    
189    /*** Returns a list of all the ids of the currently running queriers
190     */
191    public QuerierStatus[] getClosed() {
192       Querier[] closed = (Querier[]) closedQueriers.values().toArray(new Querier[] {} );
193       QuerierStatus[] statuses = new QuerierStatus[closed.length];
194       for (int i = 0; i < closed.length; i++) {
195          statuses[i] = closed[i].getStatus();
196       }
197       return statuses;
198    }
199    
200    /*** Returns the status's of all the queriers in date/time order */
201    public QuerierStatus[] getAllStatus() {
202       Querier[] queued = (Querier[]) queuedQueriers.values().toArray(new Querier[] {} );
203       Querier[] running = (Querier[]) runningQueriers.values().toArray(new Querier[] {} );
204       Querier[] ran = (Querier[]) closedQueriers.values().toArray(new Querier[] {} );
205 
206       TreeSet statuses = new TreeSet(new StatusStartTimeComparator());
207       for (int i = 0; i < queued.length; i++) {
208          statuses.add(queued[i].getStatus());
209       }
210       for (int i = 0; i < running.length; i++) {
211          statuses.add(running[i].getStatus());
212       }
213       for (int i = 0; i < ran.length; i++) {
214          statuses.add(ran[i].getStatus());
215       }
216       return (QuerierStatus[]) statuses.toArray(new QuerierStatus[] {} );
217    }
218    
219    /*** Adds the given querier to this manager, but leaves it in the initialised passive queue */
220    public void holdQuerier(Querier querier) {
221       heldQueriers.put(querier.getId(), querier);
222    }
223    
224    /***
225     * Adds the given querier to this manager, and starts it off on a new
226     * thread.  asynchronous.
227     */
228    public void submitQuerier(Querier querier)  {
229 
230       //see if it's in holding queue
231       if (heldQueriers.get(querier.getId()) != null) {
232          heldQueriers.remove(querier.getId());
233       }
234       
235       //assigns handle
236       if (runningQueriers.get(querier.getId()) != null) {
237          log.error( "Handle '" + querier.getId() + "' already in use");
238          throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
239       }
240       querier.setStatus(new QuerierQueued(querier.getStatus()));
241       queuedQueriers.put(querier.getId(), querier);
242       queuedPriorities.add(querier);
243       querier.addListener(this);
244       
245       checkQueue();
246    }
247 
248    /***
249     * Adds the given querier to this manager, runs it, and returns the status;
250     * synchronous (blocking); not queued
251     */
252    public QuerierStatus askQuerier(Querier querier)  throws IOException {
253       
254       //assigns handle
255       if (runningQueriers.get(querier.getId()) != null) {
256          log.error( "Handle '" + querier.getId() + "' already in use");
257          throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
258       }
259       runningQueriers.put(querier.getId(), querier);
260       querier.addListener(this);
261       querier.ask();
262       return querier.getStatus();
263    }
264 
265    /*** Adds the given querier to this manager, and asks the querier for the
266     * count (number of matches).  Synchronous, returning the number of matches
267     */
268    public long askCount(Querier querier)  throws IOException {
269       
270       //assigns handle
271       if (runningQueriers.get(querier.getId()) != null) {
272          log.error( "Handle '" + querier.getId() + "' already in use");
273          throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
274       }
275       runningQueriers.put(querier.getId(), querier);
276       querier.addListener(this);
277       return querier.askCount();
278    }
279    
280    
281    /*** A Querier manager must listen to it's queriers
282     */
283    public void queryStatusChanged(Querier querier) {
284 
285       //if it's changed to closed, then move to closed list
286       if (querier.getStatus().isFinished()) {
287          runningQueriers.remove(querier.getId()); //remove if it's in running
288          queuedQueriers.remove(querier.getId()); //remove if it's queued
289          closedQueriers.put(querier.getId(), querier); //make sure it's in closed
290          checkQueue(); //see if, if having removed it from running, we ought to start another
291       }
292    }
293    
294    /*** Checks the queue - if there are queued queriers and not too many
295     * running, moves a queued one and starts it
296     */
297    protected synchronized void checkQueue() {
298 
299       System.gc(); //encourage garbage collection
300 
301       //have a look at the memory; if it's 'low' then reduce to one query
302       //at a time
303 //      if (Runtime.getRuntime().freeMemory()<500000) {
304 //         maxQueriers = 1;
305 //      }
306 
307       while ((queuedQueriers.size()>0) &&
308                 ( (maxQueriers==-1) || (runningQueriers.size()<=maxQueriers))) {
309          
310          Querier first = (Querier) queuedPriorities.first();
311          queuedPriorities.remove(first);
312          queuedQueriers.remove(first.getId());
313          runningQueriers.put(first.getId(), first);
314          
315          Thread qth = new Thread(first);
316          qth.start();
317       }
318    }
319    
320    
321    /*** Shut down - abort all running queries */
322    public void shutDown() {
323       //so no new ones start while we shut down
324       queuedQueriers.clear();
325       queuedPriorities.clear();
326       
327       QuerierStatus[] running = getRunning();
328       for (int i = 0; i < running.length; i++) {
329          Querier q = getQuerier(running[i].getId());
330          try {
331             q.abort();
332          }
333          catch (Throwable th) {
334             //ignore
335          }
336       }
337    }
338    
339    
340    
341    
342 }
343 
344 /*
345  $Log: QuerierManager.java,v $
346  Revision 1.13  2004/11/10 22:01:50  mch
347  skynode starts and some fixes
348 
349  Revision 1.12  2004/11/09 18:27:21  mch
350  added askCount
351 
352  Revision 1.11  2004/11/08 02:59:45  mch
353  Added held queriers
354 
355  Revision 1.10  2004/11/03 00:17:56  mch
356  PAL_MCH Candidate 2 merge
357 
358  Revision 1.6.10.2  2004/10/27 00:43:39  mch
359  Started adding getCount, some resource fixes, some jsps
360 
361  Revision 1.6.10.1  2004/10/20 19:42:03  mch
362  Added context listener and initialisation code
363 
364  Revision 1.6  2004/10/05 19:20:32  mch
365  Queuing order
366 
367  Revision 1.5  2004/10/05 17:31:14  mch
368  Fix to wrong class cast in comparator
369 
370  Revision 1.4  2004/10/05 15:20:03  mch
371  Added starttime sort to getStatus
372 
373  Revision 1.3  2004/10/05 14:57:10  mch
374  Added queued
375 
376  Revision 1.2  2004/10/01 18:04:58  mch
377  Some factoring out of status stuff, added monitor page
378 
379  Revision 1.1  2004/09/28 15:02:13  mch
380  Merged PAL and server packages
381 
382  Revision 1.25  2004/09/28 11:45:21  mch
383  Removed thread pooling :-)
384 
385  Revision 1.24  2004/09/17 01:26:12  nw
386  altered querier manager to use a threadpool
387 
388  Revision 1.23  2004/03/15 19:16:12  mch
389  Lots of fixes to status updates
390 
391  Revision 1.22  2004/03/15 17:50:57  mch
392  Added 'closed' querier queue and more published status information
393 
394  Revision 1.21  2004/03/13 23:38:46  mch
395  Test fixes and better front-end JSP access
396 
397  Revision 1.20  2004/03/12 04:45:26  mch
398  It05 MCH Refactor
399 
400  Revision 1.19  2004/03/08 15:57:42  mch
401  Fixes to ensure old ADQL interface works alongside new one and with old plugins
402 
403  Revision 1.18  2004/03/08 00:31:28  mch
404  Split out webservice implementations for versioning
405 
406  Revision 1.17  2004/03/07 00:33:50  mch
407  Started to separate It4.1 interface from general server services
408 
409  Revision 1.16  2004/02/24 19:13:47  mch
410  Added logging info trace
411 
412  Revision 1.15  2004/02/24 16:04:18  mch
413  Config refactoring and moved datacenter It04.1 VoSpaceStuff to myspace StoreStuff
414 
415  Revision 1.14  2004/02/17 03:38:05  mch
416  Various fixes for demo
417 
418  Revision 1.13  2004/02/16 23:34:35  mch
419  Changed to use Account and AttomConfig
420 
421  Revision 1.12  2004/01/15 17:38:25  nw
422  adjusted how queriers close() themselves - altered so it
423  works no matter if Querier.close() or QuerierManager.closeQuerier(q)
424  is called.
425 
426  Revision 1.11  2004/01/14 17:57:32  nw
427  improved documentation
428 
429  Revision 1.10  2004/01/13 00:33:14  nw
430  Merged in branch providing
431  * sql pass-through
432  * replace Certification by User
433  * Rename _query as Query
434 
435  Revision 1.9.6.2  2004/01/08 09:43:41  nw
436  replaced adql front end with a generalized front end that accepts
437  a range of query languages (pass-thru sql at the moment)
438 
439  Revision 1.9.6.1  2004/01/07 11:51:07  nw
440  found out how to get wsdl to generate nice java class names.
441  Replaced _query with Query throughout sources.
442 
443  Revision 1.9  2003/12/03 19:37:03  mch
444  Introduced DirectDelegate, fixed DummyQuerier
445 
446  Revision 1.8  2003/12/03 12:47:44  mch
447  Better error reporting for failed Querier instantiations
448 
449  Revision 1.7  2003/12/01 20:57:39  mch
450  Abstracting coarse-grained plugin
451 
452  Revision 1.6  2003/12/01 16:43:52  nw
453  dropped QueryId, back to string
454 
455  Revision 1.5  2003/11/28 16:10:30  nw
456  finished plugin-rewrite.
457  added tests to cover plugin system.
458  cleaned up querier & queriermanager. tested
459 
460  Revision 1.4  2003/11/27 17:28:09  nw
461  finished plugin-refactoring
462 
463  Revision 1.3  2003/11/27 00:52:58  nw
464  refactored to introduce plugin-back end and translator maps.
465  interfaces in place. still broken code in places.
466 
467  Revision 1.2  2003/11/25 18:50:06  mch
468  Abstracted Querier from DatabaseQuerier
469 
470  Revision 1.1  2003/11/25 14:17:24  mch
471  Extracting Querier from DatabaseQuerier to handle non-database backends
472 
473  Revision 1.5  2003/11/25 11:57:56  mch
474  Added framework for SQL-passthrough queries
475 
476  Revision 1.4  2003/11/21 17:37:56  nw
477  made a start tidying up the server.
478  reduced the number of failing tests
479  found commented out code
480 
481  Revision 1.3  2003/11/18 11:10:16  mch
482  Removed client dependencies on server
483 
484  Revision 1.2  2003/11/17 15:41:48  mch
485  Package movements
486 
487  Revision 1.1  2003/11/14 00:38:29  mch
488  Code restructure
489 
490  Revision 1.5  2003/11/05 18:57:26  mch
491  Build fixes for change to SOAPy Beans and new delegates
492 
493  Revision 1.4  2003/10/06 18:56:27  mch
494  Naughtily large set of changes converting to SOAPy bean/interface-based delegates
495 
496  Revision 1.3  2003/09/26 11:38:00  nw
497  improved documentation, fixed imports
498 
499  Revision 1.2  2003/09/25 01:23:28  nw
500  altered visibility on generateHandle() so it can be used within DummyQuerier
501 
502  Revision 1.1  2003/09/24 21:02:45  nw
503  Factored creation and management of database queriers
504  into separate class. Simplifies DatabaseQuerier.
505 
506  Database Querier - added calls to timer, untangled status transitions
507  
508  */
509 
510 
511