1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.jes.impl.workflow;
12
13 import org.astrogrid.community.beans.v1.Account;
14 import org.astrogrid.component.descriptor.ComponentDescriptor;
15 import org.astrogrid.jes.job.JobException;
16 import org.astrogrid.jes.job.NotFoundException;
17 import org.astrogrid.workflow.beans.v1.Workflow;
18 import org.astrogrid.workflow.beans.v1.execution.JobURN;
19
20 import org.exolab.castor.xml.CastorException;
21
22 import java.io.IOException;
23 import java.io.StringWriter;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.ResultSet;
27 import java.sql.SQLException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Iterator;
31
32 import javax.sql.DataSource;
33
34 import junit.framework.Test;
35 import junit.framework.TestCase;
36 import junit.framework.TestSuite;
37
38 /*** Implememntation of JobFacotry, where jobs are stored in a database.
39 * @author Noel Winstanley nw@jb.man.ac.uk 12-Feb-2004
40 *
41 */
42 public class DBJobFactoryImpl extends AbstractJobFactoryImpl implements ComponentDescriptor{
43
44
45
46
47 /*** Construct a database-bascked job factory
48 *
49 * @param ds datasource to connect to db
50 * @param sql sql commands object to use to interact with db.
51 */
52 public DBJobFactoryImpl(DataSource ds,SqlCommands sql) {
53 super();
54 log.info("Database Job Factory");
55 this.datasource = ds;
56 this.sql = sql;
57 presenceCheck = new PresenceCheck();
58 }
59
60
61
62 protected final DataSource datasource;
63 protected final SqlCommands sql;
64 protected final PresenceCheck presenceCheck ;
65
66
67
68 /***
69 * @see org.astrogrid.jes.job.JobFactory#createJob(org.astrogrid.jes.job.SubmitJobRequest)
70 */
71 public Workflow initializeJob(Workflow req) throws JobException {
72 final Workflow j = super.buildJob(req);
73 (new DBHelper(sql.getInsertSQL()) {
74
75 Object dbMethod(PreparedStatement ps) throws SQLException, CastorException, IOException {
76 ps.setString(1,id(j));
77 Account acc = j.getCredentials().getAccount();
78 ps.setString(2,acc.getName());
79 ps.setString(3,acc.getCommunity());
80 StringWriter sw = new StringWriter();
81 j.marshal(sw);
82 sw.close();
83 ps.setString(4,sw.toString());
84 ps.executeUpdate();
85 return null;
86 }
87 }).run();
88 return j;
89 }
90 /***
91 * @see org.astrogrid.jes.job.JobFactory#findJob(java.lang.String)
92 */
93 public Workflow findJob(final JobURN urn) throws JobException {
94
95 return (Workflow)(new DBHelper(sql.getRetrieveSQL()) {
96 Object dbMethod(PreparedStatement ps) throws SQLException, CastorException, NotFoundException {
97 ps.setString(1,urn.getContent());
98 ResultSet rs = ps.executeQuery();
99 if (rs.next()) {
100 return Workflow.unmarshalWorkflow(rs.getCharacterStream(1));
101 } else {
102 throw new NotFoundException("Could not find job " + urn);
103 }
104 }
105 }).run();
106 }
107 /***
108 * @see org.astrogrid.jes.job.JobFactory#findUserJobs(java.lang.String, java.lang.String, java.lang.String)
109 */
110 public Iterator findUserJobs(final Account acc) throws JobException {
111 final Collection c = new ArrayList();
112 (new DBHelper(sql.getListSQL()) {
113
114 Object dbMethod(PreparedStatement ps) throws SQLException, CastorException, IOException {
115 ps.setString(1, acc.getName());
116 ps.setString(2,acc.getCommunity());
117 ResultSet rs = ps.executeQuery();
118 while (rs.next()) {
119 Workflow wf = Workflow.unmarshalWorkflow(rs.getCharacterStream(1));
120 c.add(wf);
121 }
122 return null;
123 }
124 }).run();
125 return c.iterator();
126 }
127 /***
128 * @see org.astrogrid.jes.job.JobFactory#deleteJob(org.astrogrid.jes.job.Job)
129 */
130 public void deleteJob(final Workflow job) throws JobException {
131 presenceCheck.check(job);
132 (new DBHelper(sql.getDeleteSQL()) {
133
134 Object dbMethod(PreparedStatement ps) throws SQLException {
135 ps.setString(1,id(job));
136 ps.execute();
137 return null;
138 }
139 }).run();
140 }
141 /***
142 * @see org.astrogrid.jes.job.JobFactory#updateJob(org.astrogrid.jes.job.Job)
143 */
144 public void updateJob(final Workflow j) throws JobException {
145 presenceCheck.check(j);
146 (new DBHelper(sql.getUpdateSQL()) {
147
148 Object dbMethod(PreparedStatement ps) throws SQLException , CastorException, IOException{
149 Account acc = j.getCredentials().getAccount();
150 ps.setString(1,acc.getName());
151 ps.setString(2,acc.getCommunity());
152 StringWriter sw = new StringWriter();
153 j.marshal(sw);
154 sw.close();
155 ps.setString(3,sw.toString());
156 ps.setString(4,id(j));
157 ps.execute();
158 return null;
159 }
160 }).run();
161
162 }
163 /*** application of db helper -verifies a particular job is present in the db */
164 private class PresenceCheck extends DBHelper {
165 public PresenceCheck() {
166 super( sql.getRetrieveSQL());
167 }
168 protected String jobURN;
169 Object dbMethod(PreparedStatement ps) throws SQLException, JobException {
170 ps.setString(1,jobURN);
171 ResultSet rs = ps.executeQuery();
172 if (!rs.next()) {
173 throw new NotFoundException("job urn " + jobURN + " not found");
174 }
175 return null;
176 }
177 public void check(Workflow wf) throws JobException {
178 this.jobURN = id(wf);
179 this.run();
180 }
181 }
182 /*** helper class - abstract away all the mucky business of jdbc, and ensures everything is deallocated cleanly. */
183 abstract class DBHelper{
184 public DBHelper(String sql) {
185 this.sql = sql;
186 }
187 private final String sql;
188 abstract Object dbMethod(PreparedStatement ps) throws JobException,SQLException, CastorException, IOException;
189 public final Object run() throws JobException {
190 Connection conn = null;
191 PreparedStatement stmnt = null;
192 try {
193 conn = datasource.getConnection();
194 stmnt = conn.prepareStatement(sql);
195 return this.dbMethod(stmnt);
196 } catch (SQLException e) {
197 log.error("Database Exception",e);
198 throw new JobException("Store Failure",e);
199 } catch(CastorException e) {
200 log.error("Marshalling Exception",e);
201 throw new JobException("Marshalling Failure",e);
202 } catch (IOException e) {
203 log.error("IOExcepiton",e);
204 throw new JobException("Marshalling Failure",e);
205 } finally {
206 if (stmnt != null) {
207 try {
208 stmnt.close();
209 } catch (SQLException e) {
210 log.warn("SQL exception on close statement",e);
211 }
212 }
213 if (conn != null){
214 try {
215 conn.close();
216 } catch (SQLException e) {
217 log.warn("SQL exception on close connection",e);
218 }
219 }
220 }
221 }
222 }
223 /***
224 * @see org.astrogrid.jes.component.ComponentDescriptor#getName()
225 */
226 public String getName() {
227 return "database-backed job factory";
228 }
229
230
231
232 /***
233 * @see org.astrogrid.jes.component.ComponentDescriptor#getDescription()
234 */
235 public String getDescription() {
236 return "persists job records in a database table";
237 }
238
239
240
241 /***
242 * @see org.astrogrid.jes.component.ComponentDescriptor#getInstallationTest()
243 */
244 public Test getInstallationTest() {
245 TestSuite suite = new TestSuite("Tests for DB Job Factory");
246 suite.addTestSuite(InstallationTest.class);
247 return suite;
248 }
249 /*** verify that we can see database trhough datasource */
250 private class InstallationTest extends TestCase {
251
252 public InstallationTest(String s) {
253 super(s);
254 }
255
256 public void testDatasource() throws Exception {
257 Connection conn = datasource.getConnection();
258 assertNotNull(conn);
259 conn.close();
260 }
261
262 public void testSQL() throws Exception {
263 Connection conn = datasource.getConnection();
264 PreparedStatement stmnt = conn.prepareStatement(sql.getDeleteSQL());
265 stmnt.close();
266 stmnt = conn.prepareStatement(sql.getInsertSQL());
267 stmnt.close();
268 stmnt = conn.prepareStatement(sql.getListSQL());
269 stmnt.close();
270 stmnt = conn.prepareStatement(sql.getRetrieveSQL());
271 stmnt.close();
272 stmnt = conn.prepareStatement(sql.getUpdateSQL());
273 stmnt.close();
274 }
275
276
277 }
278 }
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332