View Javadoc

1   /*$Id: QuerierManager.java,v 1.2 2005/05/27 16:21:02 clq2 Exp $
2    * Created on 24-Sep-2003
3    *
4    * Copyright (C) AstroGrid. All rights reserved.
5    *
6    * This software is published under the terms of the AstroGrid
7    * Software License version 1.2, a copy of which has been included
8    * with this distribution in the LICENSE.txt file.
9    *
10   **/
11  package org.astrogrid.dataservice.queriers;
12  
13  import java.io.IOException;
14  import java.util.Comparator;
15  import java.util.Hashtable;
16  import java.util.TreeSet;
17  import org.apache.commons.logging.Log;
18  import org.apache.commons.logging.LogFactory;
19  import org.astrogrid.cfg.ConfigFactory;
20  import org.astrogrid.dataservice.queriers.status.QuerierQueued;
21  import org.astrogrid.dataservice.queriers.status.QuerierStatus;
22  
23  /*** Manages the construction and initialization of Queriers, and maintains a
24   * collection of current Queriers. It might run queues or something... later...
25   *
26   * <p>
27   * @author Noel Winstanley nw@jb.man.ac.uk 24-Sep-2003
28   * @author mch
29   */
30  public class QuerierManager implements QuerierListener {
31     
32     
33     private static final Log log = LogFactory.getLog(QuerierManager.class);
34     
35     /*** Identifier for this manager/querier container */
36     private String managerId;
37     
38     /*** List of managers */
39     private static Hashtable managers = new Hashtable();
40     
41     /*** Lookup table of initialised queriers.  These are queriers that have been
42      * created but are yet 'complete', or have not yet been told to run.  For
43      * example, the CEA architecture has an 'init' which creates a query, and a
44      * 'execute' which will kick it to the queue
45      */
46     private Hashtable heldQueriers = new Hashtable();
47     
48     /*** lookup table of queued queriers.  These are queriers that are waiting on
49      a 'free' spot on the running queriers - ie when the running queriers have hit
50      * the maximum limit and the */
51     private Hashtable queuedQueriers = new Hashtable();
52     
53     /*** priority index of queued queriers */
54     private TreeSet queuedPriorities = new TreeSet(new QuerierStartTimeComparator());
55  
56     /*** lookup table of all the current queriers indexed by their handle*/
57     private Hashtable runningQueriers = new Hashtable();
58  
59     /*** lookup table of old queriers */
60     private Hashtable closedQueriers = new Hashtable();
61  
62     /*** Maximum number of simultaneous queriers allowed */
63     private int maxQueriers = 5;
64     
65     /*** Special ID used to create a test querier for testing getStatus,. etc */
66     public final static String TEST_QUERIER_ID = "TestQuerier:";
67     
68     /*** PriorityComparitor for the queue */
69     protected class QuerierStartTimeComparator implements Comparator {
70        
71        /***
72         * Compares its two arguments for order.  Returns a negative integer,
73         * zero, or a positive integer as the first argument is less than, equal
74         * to, or greater than the second.<p>
75         *
76         * @param o1 the first object to be compared.
77         * @param o2 the second object to be compared.
78         * @return a negative integer, zero, or a positive integer as the
79         *           first argument is less than, equal to, or greater than the
80         *        second.
81         * @throws ClassCastException if the arguments' types prevent them from
82         *           being compared by this Comparator.
83         */
84        public int compare(Object o1, Object o2) {
85           Querier q1 = (Querier) o1;
86           Querier q2 = (Querier) o2;
87           if (q1.getStatus().getStartTime().getTime()<q2.getStatus().getStartTime().getTime()) {
88              return -1;
89           }
90           else if (q1.getStatus().getStartTime().getTime()>q2.getStatus().getStartTime().getTime()) {
91              return 1;
92           }
93           else {
94              return 0;
95           }
96        }
97        
98     }
99     
100    /*** Status Comparitor for displays */
101    protected class StatusStartTimeComparator implements Comparator {
102       
103       /***
104        * Compares its two arguments for order.  Returns a negative integer,
105        * zero, or a positive integer as the first argument is less than, equal
106        * to, or greater than the second.<p>
107        *
108        * @param o1 the first object to be compared.
109        * @param o2 the second object to be compared.
110        * @return a negative integer, zero, or a positive integer as the
111        *           first argument is less than, equal to, or greater than the
112        *        second.
113        * @throws ClassCastException if the arguments' types prevent them from
114        *           being compared by this Comparator.
115        */
116       public int compare(Object o1, Object o2) {
117          QuerierStatus q1 = (QuerierStatus) o1;
118          QuerierStatus q2 = (QuerierStatus) o2;
119          if (q1.getStartTime().getTime()<q2.getStartTime().getTime()) {
120             return 1;
121          }
122          else if (q1.getStartTime().getTime()>q2.getStartTime().getTime()) {
123             return -1;
124          }
125          else {
126             return 0;
127          }
128       }
129       
130    }
131 
132    /*** Constructor. Protected because we want to force people to use the factory method   */
133    protected QuerierManager(String givenId) {
134       this.managerId = givenId;
135       
136       maxQueriers = ConfigFactory.getCommonConfig().getInt("datacenter.max.queries",4);
137       
138    }
139    
140    /*** Factory method - checks to see if the givenId already exists and returns that if so */
141    public synchronized static QuerierManager getManager(String givenId) {
142       if (managers.get(givenId) != null) {
143          return (QuerierManager) managers.get(givenId);
144       }
145       else {
146          QuerierManager manager = new QuerierManager(givenId);
147          managers.put(givenId, manager);
148          return manager;
149       }
150    }
151 
152    /*** Return the querier with the given id */
153    public Querier getQuerier(String qid) {
154 
155       Querier q = (Querier) runningQueriers.get(qid);
156       if (q != null) return q;
157       
158       q = (Querier) queuedQueriers.get(qid);
159       if (q != null) return q;
160 
161       q = (Querier) closedQueriers.get(qid);
162       return q;
163    }
164    
165    /*** Returns a list of all the queued (initialised but not started) queriers - including
166     * those that are on the active queue and those just initialised waiting on some external push
167     */
168    public QuerierStatus[] getQueued() {
169       Querier[] queued = (Querier[]) queuedQueriers.values().toArray(new Querier[] {} );
170       Querier[] initialised = (Querier[]) heldQueriers.values().toArray(new Querier[] {} );
171       QuerierStatus[] statuses = new QuerierStatus[queued.length+initialised.length];
172       for (int i = 0; i < queued.length; i++) {
173          statuses[i] = queued[i].getStatus();
174       }
175       for (int j = 0; j < initialised.length; j++) {
176          statuses[j+queued.length] = initialised[j].getStatus();
177       }
178       return statuses;
179    }
180 
181    /*** Returns a list of all the currently running queriers
182     */
183    public QuerierStatus[] getRunning() {
184       Querier[] running = (Querier[]) runningQueriers.values().toArray(new Querier[] {} );
185       QuerierStatus[] statuses = new QuerierStatus[running.length];
186       for (int i = 0; i < running.length; i++) {
187          statuses[i] = running[i].getStatus();
188       }
189       return statuses;
190    }
191    
192    /*** Returns a list of all the ids of the currently running queriers
193     */
194    public QuerierStatus[] getClosed() {
195       Querier[] closed = (Querier[]) closedQueriers.values().toArray(new Querier[] {} );
196       QuerierStatus[] statuses = new QuerierStatus[closed.length];
197       for (int i = 0; i < closed.length; i++) {
198          statuses[i] = closed[i].getStatus();
199       }
200       return statuses;
201    }
202    
203    /*** Returns the status's of all the queriers in date/time order */
204    public QuerierStatus[] getAllStatus() {
205       Querier[] queued = (Querier[]) queuedQueriers.values().toArray(new Querier[] {} );
206       Querier[] running = (Querier[]) runningQueriers.values().toArray(new Querier[] {} );
207       Querier[] ran = (Querier[]) closedQueriers.values().toArray(new Querier[] {} );
208       
209       TreeSet statuses = new TreeSet(new StatusStartTimeComparator());
210       for (int i = 0; i < queued.length; i++) {
211          statuses.add(queued[i].getStatus());
212       }
213       for (int i = 0; i < running.length; i++) {
214          statuses.add(running[i].getStatus());
215       }
216       for (int i = 0; i < ran.length; i++) {
217          statuses.add(ran[i].getStatus());
218       }
219       return (QuerierStatus[]) statuses.toArray(new QuerierStatus[] {} );
220    }
221    
222    /*** Adds the given querier to this manager, but leaves it in the initialised passive queue */
223    public void holdQuerier(Querier querier) {
224       heldQueriers.put(querier.getId(), querier);
225    }
226    
227    /***
228     * Adds the given querier to this manager, and starts it off on a new
229     * thread.  asynchronous.
230     */
231    public void submitQuerier(Querier querier)  {
232 
233       //see if it's in holding queue
234       if (heldQueriers.get(querier.getId()) != null) {
235          heldQueriers.remove(querier.getId());
236       }
237       
238       //assigns handle
239       if (runningQueriers.get(querier.getId()) != null) {
240          log.error( "Handle '" + querier.getId() + "' already in use");
241          throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
242       }
243       querier.setStatus(new QuerierQueued(querier.getStatus()));
244       queuedQueriers.put(querier.getId(), querier);
245       queuedPriorities.add(querier);
246       querier.addListener(this);
247       
248       checkQueue();
249    }
250 
251    /***
252     * Adds the given querier to this manager, runs it, and returns the status;
253     * synchronous (blocking); not queued
254     */
255    public QuerierStatus askQuerier(Querier querier)  throws IOException {
256       
257       //assigns handle
258       if (runningQueriers.get(querier.getId()) != null) {
259          log.error( "Handle '" + querier.getId() + "' already in use");
260          throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
261       }
262       runningQueriers.put(querier.getId(), querier);
263       querier.addListener(this);
264       querier.ask();
265       return querier.getStatus();
266    }
267 
268    /*** Adds the given querier to this manager, and asks the querier for the
269     * count (number of matches).  Synchronous, returning the number of matches
270     */
271    public long askCount(Querier querier)  throws IOException {
272       
273       //assigns handle
274       if (runningQueriers.get(querier.getId()) != null) {
275          log.error( "Handle '" + querier.getId() + "' already in use");
276          throw new IllegalArgumentException("Handle " + querier.getId() + "already in use");
277       }
278       runningQueriers.put(querier.getId(), querier);
279       querier.addListener(this);
280       return querier.askCount();
281    }
282    
283    
284    /*** A Querier manager must listen to it's queriers
285     */
286    public void queryStatusChanged(Querier querier) {
287 
288       //if it's changed to closed, then move to closed list
289       if (querier.getStatus().isFinished()) {
290          runningQueriers.remove(querier.getId()); //remove if it's in running
291          queuedQueriers.remove(querier.getId()); //remove if it's queued
292          closedQueriers.put(querier.getId(), querier); //make sure it's in closed
293          checkQueue(); //see if, if having removed it from running, we ought to start another
294       }
295    }
296    
297    /*** Checks the queue - if there are queued queriers and not too many
298     * running, moves a queued one and starts it
299     */
300    protected synchronized void checkQueue() {
301 
302       System.gc(); //encourage garbage collection
303 
304       //have a look at the memory; if it's 'low' then reduce to one query
305       //at a time
306 //      if (Runtime.getRuntime().freeMemory()<500000) {
307 //         maxQueriers = 1;
308 //      }
309 
310       while ((queuedQueriers.size()>0) &&
311                 ( (maxQueriers==-1) || (runningQueriers.size()<=maxQueriers))) {
312          
313          Querier first = (Querier) queuedPriorities.first();
314          queuedPriorities.remove(first);
315          queuedQueriers.remove(first.getId());
316          runningQueriers.put(first.getId(), first);
317          
318          Thread qth = new Thread(first);
319          qth.start();
320       }
321    }
322    
323    
324    /*** Shut down - abort all running queries */
325    public void shutDown() {
326       //so no new ones start while we shut down
327       queuedQueriers.clear();
328       queuedPriorities.clear();
329       
330       QuerierStatus[] running = getRunning();
331       for (int i = 0; i < running.length; i++) {
332          Querier q = getQuerier(running[i].getId());
333          try {
334             q.abort();
335          }
336          catch (Throwable th) {
337             //ignore
338          }
339       }
340    }
341    
342    
343    
344    
345 }
346 
347 /*
348  $Log: QuerierManager.java,v $
349  Revision 1.2  2005/05/27 16:21:02  clq2
350  mchv_1
351 
352  Revision 1.1.1.1.24.1  2005/05/13 16:56:29  mch
353  'some changes'
354 
355  Revision 1.1.1.1  2005/02/17 18:37:35  mch
356  Initial checkin
357 
358  Revision 1.1.1.1  2005/02/16 17:11:24  mch
359  Initial checkin
360 
361  Revision 1.13.6.2  2004/11/25 18:33:43  mch
362  more status (incl persisting) more tablewriting lots of fixes
363 
364  Revision 1.13.6.1  2004/11/22 00:57:16  mch
365  New interfaces for SIAP etc and new slinger package
366 
367  Revision 1.13  2004/11/10 22:01:50  mch
368  skynode starts and some fixes
369 
370  Revision 1.12  2004/11/09 18:27:21  mch
371  added askCount
372 
373  Revision 1.11  2004/11/08 02:59:45  mch
374  Added held queriers
375 
376  Revision 1.10  2004/11/03 00:17:56  mch
377  PAL_MCH Candidate 2 merge
378 
379  Revision 1.6.10.2  2004/10/27 00:43:39  mch
380  Started adding getCount, some resource fixes, some jsps
381 
382  Revision 1.6.10.1  2004/10/20 19:42:03  mch
383  Added context listener and initalisation code
384 
385  Revision 1.6  2004/10/05 19:20:32  mch
386  Queuing order
387 
388  Revision 1.5  2004/10/05 17:31:14  mch
389  Fix to wrong class cast in comparator
390 
391  Revision 1.4  2004/10/05 15:20:03  mch
392  Added starttime sort to getStatus
393 
394  Revision 1.3  2004/10/05 14:57:10  mch
395  Added queued
396 
397  Revision 1.2  2004/10/01 18:04:58  mch
398  Some factoring out of status stuff, added monitor page
399 
400  Revision 1.1  2004/09/28 15:02:13  mch
401  Merged PAL and server packages
402 
403  Revision 1.25  2004/09/28 11:45:21  mch
404  Removed thread pooling :-)
405 
406  Revision 1.24  2004/09/17 01:26:12  nw
407  altered querier manager to use a threadpool
408 
409  Revision 1.23  2004/03/15 19:16:12  mch
410  Lots of fixes to status updates
411 
412  Revision 1.22  2004/03/15 17:50:57  mch
413  Added 'closed' querier queue and more published status information
414 
415  Revision 1.21  2004/03/13 23:38:46  mch
416  Test fixes and better front-end JSP access
417 
418  Revision 1.20  2004/03/12 04:45:26  mch
419  It05 MCH Refactor
420 
421  Revision 1.19  2004/03/08 15:57:42  mch
422  Fixes to ensure old ADQL interface works alongside new one and with old plugins
423 
424  Revision 1.18  2004/03/08 00:31:28  mch
425  Split out webservice implementations for versioning
426 
427  Revision 1.17  2004/03/07 00:33:50  mch
428  Started to separate It4.1 interface from general server services
429 
430  Revision 1.16  2004/02/24 19:13:47  mch
431  Added logging info trace
432 
433  Revision 1.15  2004/02/24 16:04:18  mch
434  Config refactoring and moved datacenter It04.1 VoSpaceStuff to myspace StoreStuff
435 
436  Revision 1.14  2004/02/17 03:38:05  mch
437  Various fixes for demo
438 
439  Revision 1.13  2004/02/16 23:34:35  mch
440  Changed to use Principal and AttomConfig
441 
442  Revision 1.12  2004/01/15 17:38:25  nw
443  adjusted how queriers close() themselves - altered so it
444  works no matter if Querier.close() or QuerierManager.closeQuerier(q)
445  is called.
446 
447  Revision 1.11  2004/01/14 17:57:32  nw
448  improved documentation
449 
450  Revision 1.10  2004/01/13 00:33:14  nw
451  Merged in branch providing
452  * sql pass-through
453  * replace Certification by User
454  * Rename _query as Query
455 
456  Revision 1.9.6.2  2004/01/08 09:43:41  nw
457  replaced adql front end with a generalized front end that accepts
458  a range of query languages (pass-thru sql at the moment)
459 
460  Revision 1.9.6.1  2004/01/07 11:51:07  nw
461  found out how to get wsdl to generate nice java class names.
462  Replaced _query with Query throughout sources.
463 
464  Revision 1.9  2003/12/03 19:37:03  mch
465  Introduced DirectDelegate, fixed DummyQuerier
466 
467  Revision 1.8  2003/12/03 12:47:44  mch
468  Better error reportiong for failed Querier instantiations
469 
470  Revision 1.7  2003/12/01 20:57:39  mch
471  Abstracting coarse-grained plugin
472 
473  Revision 1.6  2003/12/01 16:43:52  nw
474  dropped QueryId, back to string
475 
476  Revision 1.5  2003/11/28 16:10:30  nw
477  finished plugin-rewrite.
478  added tests to cover plugin system.
479  cleaned up querier & queriermanager. tested
480 
481  Revision 1.4  2003/11/27 17:28:09  nw
482  finished plugin-refactoring
483 
484  Revision 1.3  2003/11/27 00:52:58  nw
485  refactored to introduce plugin-back end and translator maps.
486  interfaces in place. still broken code in places.
487 
488  Revision 1.2  2003/11/25 18:50:06  mch
489  Abstracted Querier from DatabaseQuerier
490 
491  Revision 1.1  2003/11/25 14:17:24  mch
492  Extracting Querier from DatabaseQuerier to handle non-database backends
493 
494  Revision 1.5  2003/11/25 11:57:56  mch
495  Added framework for SQL-passthrough queries
496 
497  Revision 1.4  2003/11/21 17:37:56  nw
498  made a start tidying up the server.
499  reduced the number of failing tests
500  found commented out code
501 
502  Revision 1.3  2003/11/18 11:10:16  mch
503  Removed client dependencies on server
504 
505  Revision 1.2  2003/11/17 15:41:48  mch
506  Package movements
507 
508  Revision 1.1  2003/11/14 00:38:29  mch
509  Code restructure
510 
511  Revision 1.5  2003/11/05 18:57:26  mch
512  Build fixes for change to SOAPy Beans and new delegates
513 
514  Revision 1.4  2003/10/06 18:56:27  mch
515  Naughtily large set of changes converting to SOAPy bean/interface-based delegates
516 
517  Revision 1.3  2003/09/26 11:38:00  nw
518  improved documentation, fixed imports
519 
520  Revision 1.2  2003/09/25 01:23:28  nw
521  altered visibility on generateHandle() so it can be used within DummyQuerier
522 
523  Revision 1.1  2003/09/24 21:02:45  nw
524  Factored creation and management of database queriers
525  into separate class. Simplifies DatabaseQuerier.
526 
527  Database Querier - added calls to timer, untagled status transitions
528  
529  */
530 
531 
532