View Javadoc

1   /*$Id: QuerierManager.java,v 1.1.1.1 2005/02/17 18:37:35 mch Exp $
2    * Created on 24-Sep-2003
3    *
4    * Copyright (C) AstroGrid. All rights reserved.
5    *
6    * This software is published under the terms of the AstroGrid
7    * Software License version 1.2, a copy of which has been included
8    * with this distribution in the LICENSE.txt file.
9    *
10   **/
11  package org.astrogrid.dataservice.queriers;
12  
13  import java.io.IOException;
14  import java.util.Comparator;
15  import java.util.Hashtable;
16  import java.util.TreeSet;
17  import org.apache.commons.logging.Log;
18  import org.apache.commons.logging.LogFactory;
19  import org.astrogrid.dataservice.queriers.status.QuerierQueued;
20  import org.astrogrid.dataservice.queriers.status.QuerierStatus;
21  import org.astrogrid.dataservice.queriers.status.StatusLogger;
22  import org.astrogrid.status.TaskStatus;
23  
24  /*** Manages the construction and initialization of Queriers, and maintains a
25   * collection of current Queriers. It might run queues or something... later...
26   *
27   * <p>
28   * @author Noel Winstanley nw@jb.man.ac.uk 24-Sep-2003
29   * @author mch
30   */
31  public class QuerierManager implements QuerierListener {
32     
33     
34     private static final Log log = LogFactory.getLog(QuerierManager.class);
35     
36     /*** Identifier for this manager/querier container */
37     private String managerId;
38     
39     /*** List of managers */
40     private static Hashtable managers = new Hashtable();
41     
42     /*** Lookup table of initialised queriers.  These are queriers that have been
43      * created but are yet 'complete', or have not yet been told to run.  For
44      * example, the CEA architecture has an 'init' which creates a query, and a
45      * 'execute' which will kick it to the queue
46      */
47     private Hashtable heldQueriers = new Hashtable();
48     
49     /*** lookup table of queued queriers.  These are queriers that are waiting on
50      a 'free' spot on the running queriers - ie when the running queriers have hit
51      * the maximum limit and the */
52     private Hashtable queuedQueriers = new Hashtable();
53     
54     /*** priority index of queued queriers */
55     private TreeSet queuedPriorities = new TreeSet(new QuerierStartTimeComparator());
56  
57     /*** lookup table of all the current queriers indexed by their handle*/
58     private Hashtable runningQueriers = new Hashtable();
59  
60     /*** lookup table of old queriers */
61     private Hashtable closedQueriers = new Hashtable();
62  
63     /*** Maximum number of simultaneous queriers allowed */
64     private int maxQueriers = 20;
65     
66     /*** Special ID used to create a test querier for testing getStatus,. etc */
67     public final static String TEST_QUERIER_ID = "TestQuerier:";
68     
69     /*** PriorityComparitor for the queue */
70     protected class QuerierStartTimeComparator implements Comparator {
71        
72        /***
73         * Compares its two arguments for order.  Returns a negative integer,
74         * zero, or a positive integer as the first argument is less than, equal
75         * to, or greater than the second.<p>
76         *
77         * @param o1 the first object to be compared.
78         * @param o2 the second object to be compared.
79         * @return a negative integer, zero, or a positive integer as the
80         *           first argument is less than, equal to, or greater than the
81         *        second.
82         * @throws ClassCastException if the arguments' types prevent them from
83         *           being compared by this Comparator.
84         */
85        public int compare(Object o1, Object o2) {
86           Querier q1 = (Querier) o1;
87           Querier q2 = (Querier) o2;
88           if (q1.getStatus().getStartTime().getTime()<q2.getStatus().getStartTime().getTime()) {
89              return -1;
90           }
91           else if (q1.getStatus().getStartTime().getTime()>q2.getStatus().getStartTime().getTime()) {
92              return 1;
93           }
94           else {
95              return 0;
96           }
97        }
98        
99     }
100    
101    /*** Status Comparitor for displays */
102    protected class StatusStartTimeComparator implements Comparator {
103       
104       /***
105        * Compares its two arguments for order.  Returns a negative integer,
106        * zero, or a positive integer as the first argument is less than, equal
107        * to, or greater than the second.<p>
108        *
109        * @param o1 the first object to be compared.
110        * @param o2 the second object to be compared.
111        * @return a negative integer, zero, or a positive integer as the
112        *           first argument is less than, equal to, or greater than the
113        *        second.
114        * @throws ClassCastException if the arguments' types prevent them from
115        *           being compared by this Comparator.
116        */
117       public int compare(Object o1, Object o2) {
118          QuerierStatus q1 = (QuerierStatus) o1;
119          QuerierStatus q2 = (QuerierStatus) o2;
120          if (q1.getStartTime().getTime()<q2.getStartTime().getTime()) {
121             return 1;
122          }
123          else if (q1.getStartTime().getTime()>q2.getStartTime().getTime()) {
124             return -1;
125          }
126          else {
127             return 0;
128          }
129       }
130       
131    }
132 
133    /*** Constructor. Protected because we want to force people to use the factory method   */
134    protected QuerierManager(String givenId) {
135       this.managerId = givenId;
136    }
137    
138    /*** Factory method - checks to see if the givenId already exists and returns that if so */
139    public synchronized static QuerierManager getManager(String givenId) {
140       if (managers.get(givenId) != null) {
141          return (QuerierManager) managers.get(givenId);
142       }
143       else {
144          QuerierManager manager = new QuerierManager(givenId);
145          managers.put(givenId, manager);
146          return manager;
147       }
148    }
149 
150    /*** Return the querier with the given id */
151    public Querier getQuerier(String qid) {
152 
153       Querier q = (Querier) runningQueriers.get(qid);
154       if (q != null) return q;
155       
156       q = (Querier) queuedQueriers.get(qid);
157       if (q != null) return q;
158 
159       q = (Querier) closedQueriers.get(qid);
160       return q;
161    }
162    
163    /*** Returns a list of all the queued (initialised but not started) queriers - including
164     * those that are on the active queue and those just initialised waiting on some external push
165     */
166    public QuerierStatus[] getQueued() {
167       Querier[] queued = (Querier[]) queuedQueriers.values().toArray(new Querier[] {} );
168       Querier[] initialised = (Querier[]) heldQueriers.values().toArray(new Querier[] {} );
169       QuerierStatus[] statuses = new QuerierStatus[queued.length+initialised.length];
170       for (int i = 0; i < queued.length; i++) {
171          statuses[i] = queued[i].getStatus();
172       }
173       for (int j = 0; j < initialised.length; j++) {
174          statuses[j+queued.length] = initialised[j].getStatus();
175       }
176       return statuses;
177    }
178 
179    /*** Returns a list of all the currently running queriers
180     */
181    public QuerierStatus[] getRunning() {
182       Querier[] running = (Querier[]) runningQueriers.values().toArray(new Querier[] {} );
183       QuerierStatus[] statuses = new QuerierStatus[running.length];
184       for (int i = 0; i < running.length; i++) {
185          statuses[i] = running[i].getStatus();
186       }
187       return statuses;
188    }
189    
190    /*** Returns a list of all the ids of the currently running queriers
191     */
192    public QuerierStatus[] getClosed() {
193       Querier[] closed = (Querier[]) closedQueriers.values().toArray(new Querier[] {} );
194       QuerierStatus[] statuses = new QuerierStatus[closed.length];
195       for (int i = 0; i < closed.length; i++) {
196          statuses[i] = closed[i].getStatus();
197       }
198       return statuses;
199    }
200    
201    /*** Returns the status's of all the queriers in date/time order */
202    public QuerierStatus[] getAllStatus() {
203       Querier[] queued = (Querier[]) queuedQueriers.values().toArray(new Querier[] {} );
204       Querier[] running = (Querier[]) runningQueriers.values().toArray(new Querier[] {} );
205       Querier[] ran = (Querier[]) closedQueriers.values().toArray(new Querier[] {} );
206       
207       TreeSet statuses = new TreeSet(new StatusStartTimeComparator());
208       for (int i = 0; i < queued.length; i++) {
209          statuses.add(queued[i].getStatus());
210       }
211       for (int i = 0; i < running.length; i++) {
212          statuses.add(running[i].getStatus());
213       }
214       for (int i = 0; i < ran.length; i++) {
215          statuses.add(ran[i].getStatus());
216       }
217       return (QuerierStatus[]) statuses.toArray(new QuerierStatus[] {} );
218    }
219    
220    /*** Adds the given querier to this manager, but leaves it in the initialised passive queue */
221    public void holdQuerier(Querier querier) {
222       heldQueriers.put(querier.getId(), querier);
223    }
224    
225    /***
226     * Adds the given querier to this manager, and starts it off on a new
227     * thread.  asynchronous.
228     */
229    public void submitQuerier(Querier querier)  {
230 
231       //see if it's in holding queue
232       if (heldQueriers.get(querier.getId()) != null) {
233          heldQueriers.remove(querier.getId());
234       }
235       
236       //assigns handle
237       if (runningQueriers.get(querier.getId()) != null) {
238          log.error( "Handle '" + querier.getId() + "' already in use");
239          throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
240       }
241       querier.setStatus(new QuerierQueued(querier.getStatus()));
242       queuedQueriers.put(querier.getId(), querier);
243       queuedPriorities.add(querier);
244       querier.addListener(this);
245       
246       checkQueue();
247    }
248 
249    /***
250     * Adds the given querier to this manager, runs it, and returns the status;
251     * synchronous (blocking); not queued
252     */
253    public QuerierStatus askQuerier(Querier querier)  throws IOException {
254       
255       //assigns handle
256       if (runningQueriers.get(querier.getId()) != null) {
257          log.error( "Handle '" + querier.getId() + "' already in use");
258          throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
259       }
260       runningQueriers.put(querier.getId(), querier);
261       querier.addListener(this);
262       querier.ask();
263       return querier.getStatus();
264    }
265 
266    /*** Adds the given querier to this manager, and asks the querier for the
267     * count (number of matches).  Synchronous, returning the number of matches
268     */
269    public long askCount(Querier querier)  throws IOException {
270       
271       //assigns handle
272       if (runningQueriers.get(querier.getId()) != null) {
273          log.error( "Handle '" + querier.getId() + "' already in use");
274          throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
275       }
276       runningQueriers.put(querier.getId(), querier);
277       querier.addListener(this);
278       return querier.askCount();
279    }
280    
281    
282    /*** A Querier manager must listen to it's queriers
283     */
284    public void queryStatusChanged(Querier querier) {
285 
286       //if it's changed to closed, then move to closed list
287       if (querier.getStatus().isFinished()) {
288          runningQueriers.remove(querier.getId()); //remove if it's in running
289          queuedQueriers.remove(querier.getId()); //remove if it's queued
290          closedQueriers.put(querier.getId(), querier); //make sure it's in closed
291          checkQueue(); //see if, if having removed it from running, we ought to start another
292       }
293    }
294    
295    /*** Checks the queue - if there are queued queriers and not too many
296     * running, moves a queued one and starts it
297     */
298    protected synchronized void checkQueue() {
299 
300       System.gc(); //encourage garbage collection
301 
302       //have a look at the memory; if it's 'low' then reduce to one query
303       //at a time
304 //      if (Runtime.getRuntime().freeMemory()<500000) {
305 //         maxQueriers = 1;
306 //      }
307 
308       while ((queuedQueriers.size()>0) &&
309                 ( (maxQueriers==-1) || (runningQueriers.size()<=maxQueriers))) {
310          
311          Querier first = (Querier) queuedPriorities.first();
312          queuedPriorities.remove(first);
313          queuedQueriers.remove(first.getId());
314          runningQueriers.put(first.getId(), first);
315          
316          Thread qth = new Thread(first);
317          qth.start();
318       }
319    }
320    
321    
322    /*** Shut down - abort all running queries */
323    public void shutDown() {
324       //so no new ones start while we shut down
325       queuedQueriers.clear();
326       queuedPriorities.clear();
327       
328       QuerierStatus[] running = getRunning();
329       for (int i = 0; i < running.length; i++) {
330          Querier q = getQuerier(running[i].getId());
331          try {
332             q.abort();
333          }
334          catch (Throwable th) {
335             //ignore
336          }
337       }
338    }
339    
340    
341    
342    
343 }
344 
345 /*
346  $Log: QuerierManager.java,v $
347  Revision 1.1.1.1  2005/02/17 18:37:35  mch
348  Initial checkin
349 
350  Revision 1.1.1.1  2005/02/16 17:11:24  mch
351  Initial checkin
352 
353  Revision 1.13.6.2  2004/11/25 18:33:43  mch
354  more status (incl persisting) more tablewriting lots of fixes
355 
356  Revision 1.13.6.1  2004/11/22 00:57:16  mch
357  New interfaces for SIAP etc and new slinger package
358 
359  Revision 1.13  2004/11/10 22:01:50  mch
360  skynode starts and some fixes
361 
362  Revision 1.12  2004/11/09 18:27:21  mch
363  added askCount
364 
365  Revision 1.11  2004/11/08 02:59:45  mch
366  Added held queriers
367 
368  Revision 1.10  2004/11/03 00:17:56  mch
369  PAL_MCH Candidate 2 merge
370 
371  Revision 1.6.10.2  2004/10/27 00:43:39  mch
372  Started adding getCount, some resource fixes, some jsps
373 
374  Revision 1.6.10.1  2004/10/20 19:42:03  mch
375  Added context listener and initalisation code
376 
377  Revision 1.6  2004/10/05 19:20:32  mch
378  Queuing order
379 
380  Revision 1.5  2004/10/05 17:31:14  mch
381  Fix to wrong class cast in comparator
382 
383  Revision 1.4  2004/10/05 15:20:03  mch
384  Added starttime sort to getStatus
385 
386  Revision 1.3  2004/10/05 14:57:10  mch
387  Added queued
388 
389  Revision 1.2  2004/10/01 18:04:58  mch
390  Some factoring out of status stuff, added monitor page
391 
392  Revision 1.1  2004/09/28 15:02:13  mch
393  Merged PAL and server packages
394 
395  Revision 1.25  2004/09/28 11:45:21  mch
396  Removed thread pooling :-)
397 
398  Revision 1.24  2004/09/17 01:26:12  nw
399  altered querier manager to use a threadpool
400 
401  Revision 1.23  2004/03/15 19:16:12  mch
402  Lots of fixes to status updates
403 
404  Revision 1.22  2004/03/15 17:50:57  mch
405  Added 'closed' querier queue and more published status information
406 
407  Revision 1.21  2004/03/13 23:38:46  mch
408  Test fixes and better front-end JSP access
409 
410  Revision 1.20  2004/03/12 04:45:26  mch
411  It05 MCH Refactor
412 
413  Revision 1.19  2004/03/08 15:57:42  mch
414  Fixes to ensure old ADQL interface works alongside new one and with old plugins
415 
416  Revision 1.18  2004/03/08 00:31:28  mch
417  Split out webservice implementations for versioning
418 
419  Revision 1.17  2004/03/07 00:33:50  mch
420  Started to separate It4.1 interface from general server services
421 
422  Revision 1.16  2004/02/24 19:13:47  mch
423  Added logging info trace
424 
425  Revision 1.15  2004/02/24 16:04:18  mch
426  Config refactoring and moved datacenter It04.1 VoSpaceStuff to myspace StoreStuff
427 
428  Revision 1.14  2004/02/17 03:38:05  mch
429  Various fixes for demo
430 
431  Revision 1.13  2004/02/16 23:34:35  mch
432  Changed to use Principal and AttomConfig
433 
434  Revision 1.12  2004/01/15 17:38:25  nw
435  adjusted how queriers close() themselves - altered so it
436  works no matter if Querier.close() or QuerierManager.closeQuerier(q)
437  is called.
438 
439  Revision 1.11  2004/01/14 17:57:32  nw
440  improved documentation
441 
442  Revision 1.10  2004/01/13 00:33:14  nw
443  Merged in branch providing
444  * sql pass-through
445  * replace Certification by User
446  * Rename _query as Query
447 
448  Revision 1.9.6.2  2004/01/08 09:43:41  nw
449  replaced adql front end with a generalized front end that accepts
450  a range of query languages (pass-thru sql at the moment)
451 
452  Revision 1.9.6.1  2004/01/07 11:51:07  nw
453  found out how to get wsdl to generate nice java class names.
454  Replaced _query with Query throughout sources.
455 
456  Revision 1.9  2003/12/03 19:37:03  mch
457  Introduced DirectDelegate, fixed DummyQuerier
458 
459  Revision 1.8  2003/12/03 12:47:44  mch
460  Better error reportiong for failed Querier instantiations
461 
462  Revision 1.7  2003/12/01 20:57:39  mch
463  Abstracting coarse-grained plugin
464 
465  Revision 1.6  2003/12/01 16:43:52  nw
466  dropped QueryId, back to string
467 
468  Revision 1.5  2003/11/28 16:10:30  nw
469  finished plugin-rewrite.
470  added tests to cover plugin system.
471  cleaned up querier & queriermanager. tested
472 
473  Revision 1.4  2003/11/27 17:28:09  nw
474  finished plugin-refactoring
475 
476  Revision 1.3  2003/11/27 00:52:58  nw
477  refactored to introduce plugin-back end and translator maps.
478  interfaces in place. still broken code in places.
479 
480  Revision 1.2  2003/11/25 18:50:06  mch
481  Abstracted Querier from DatabaseQuerier
482 
483  Revision 1.1  2003/11/25 14:17:24  mch
484  Extracting Querier from DatabaseQuerier to handle non-database backends
485 
486  Revision 1.5  2003/11/25 11:57:56  mch
487  Added framework for SQL-passthrough queries
488 
489  Revision 1.4  2003/11/21 17:37:56  nw
490  made a start tidying up the server.
491  reduced the number of failing tests
492  found commented out code
493 
494  Revision 1.3  2003/11/18 11:10:16  mch
495  Removed client dependencies on server
496 
497  Revision 1.2  2003/11/17 15:41:48  mch
498  Package movements
499 
500  Revision 1.1  2003/11/14 00:38:29  mch
501  Code restructure
502 
503  Revision 1.5  2003/11/05 18:57:26  mch
504  Build fixes for change to SOAPy Beans and new delegates
505 
506  Revision 1.4  2003/10/06 18:56:27  mch
507  Naughtily large set of changes converting to SOAPy bean/interface-based delegates
508 
509  Revision 1.3  2003/09/26 11:38:00  nw
510  improved documentation, fixed imports
511 
512  Revision 1.2  2003/09/25 01:23:28  nw
513  altered visibility on generateHandle() so it can be used within DummyQuerier
514 
515  Revision 1.1  2003/09/24 21:02:45  nw
516  Factored creation and management of database queriers
517  into separate class. Simplifies DatabaseQuerier.
518 
519  Database Querier - added calls to timer, untagled status transitions
520  
521  */
522 
523 
524