1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.jes.impl.workflow;
12
13 import org.astrogrid.common.namegen.NameGen;
14 import org.astrogrid.community.beans.v1.Account;
15 import org.astrogrid.component.descriptor.ComponentDescriptor;
16 import org.astrogrid.jes.job.JobException;
17 import org.astrogrid.jes.job.NotFoundException;
18 import org.astrogrid.workflow.beans.v1.Workflow;
19 import org.astrogrid.workflow.beans.v1.execution.JobURN;
20
21 import org.exolab.castor.xml.CastorException;
22
23 import java.io.IOException;
24 import java.io.StringWriter;
25 import java.sql.Connection;
26 import java.sql.PreparedStatement;
27 import java.sql.ResultSet;
28 import java.sql.SQLException;
29 import java.util.ArrayList;
30 import java.util.Collection;
31 import java.util.Iterator;
32
33 import javax.sql.DataSource;
34
35 import junit.framework.Test;
36 import junit.framework.TestCase;
37 import junit.framework.TestSuite;
38
39 /*** Implememntation of JobFacotry, where jobs are stored in a database.
40 * @author Noel Winstanley nw@jb.man.ac.uk 12-Feb-2004
41 *
42 */
43 public class DBJobFactoryImpl extends AbstractJobFactoryImpl implements ComponentDescriptor{
44
45
46
47
48 /*** Construct a database-bascked job factory
49 *
50 * @param ds datasource to connect to db
51 * @param sql sql commands object to use to interact with db.
52 */
53 public DBJobFactoryImpl(NameGen nameGen,DataSource ds,SqlCommands sql) {
54 super(nameGen);
55 log.info("Database Job Factory");
56 this.datasource = ds;
57 this.sql = sql;
58 presenceCheck = new PresenceCheck();
59 }
60
61
62
63 protected final DataSource datasource;
64 protected final SqlCommands sql;
65 protected final PresenceCheck presenceCheck ;
66
67
68
69 /***
70 * @see org.astrogrid.jes.job.JobFactory#createJob(org.astrogrid.jes.job.SubmitJobRequest)
71 */
72 public Workflow initializeJob(Workflow req) throws JobException {
73 final Workflow j = super.buildJob(req);
74 (new DBHelper(sql.getInsertSQL()) {
75
76 Object dbMethod(PreparedStatement ps) throws SQLException, CastorException, IOException {
77 ps.setString(1,id(j));
78 Account acc = j.getCredentials().getAccount();
79 ps.setString(2,acc.getName());
80 ps.setString(3,acc.getCommunity());
81 StringWriter sw = new StringWriter();
82 j.marshal(sw);
83 sw.close();
84 ps.setString(4,sw.toString());
85 ps.executeUpdate();
86 return null;
87 }
88 }).run();
89 return j;
90 }
91 /***
92 * @see org.astrogrid.jes.job.JobFactory#findJob(java.lang.String)
93 */
94 public Workflow findJob(final JobURN urn) throws JobException {
95
96 return (Workflow)(new DBHelper(sql.getRetrieveSQL()) {
97 Object dbMethod(PreparedStatement ps) throws SQLException, CastorException, NotFoundException {
98 ps.setString(1,urn.getContent());
99 ResultSet rs = ps.executeQuery();
100 if (rs.next()) {
101 return Workflow.unmarshalWorkflow(rs.getCharacterStream(1));
102 } else {
103 throw new NotFoundException("Could not find job " + urn);
104 }
105 }
106 }).run();
107 }
108 /***
109 * @see org.astrogrid.jes.job.JobFactory#findUserJobs(java.lang.String, java.lang.String, java.lang.String)
110 */
111 public Iterator findUserJobs(final Account acc) throws JobException {
112 final Collection c = new ArrayList();
113 (new DBHelper(sql.getListSQL()) {
114
115 Object dbMethod(PreparedStatement ps) throws SQLException, CastorException, IOException {
116 ps.setString(1, acc.getName());
117 ps.setString(2,acc.getCommunity());
118 ResultSet rs = ps.executeQuery();
119 while (rs.next()) {
120 Workflow wf = Workflow.unmarshalWorkflow(rs.getCharacterStream(1));
121 c.add(wf);
122 }
123 return null;
124 }
125 }).run();
126 return c.iterator();
127 }
128 /***
129 * @see org.astrogrid.jes.job.JobFactory#deleteJob(org.astrogrid.jes.job.Job)
130 */
131 public void deleteJob(final Workflow job) throws JobException {
132 presenceCheck.check(job);
133 (new DBHelper(sql.getDeleteSQL()) {
134
135 Object dbMethod(PreparedStatement ps) throws SQLException {
136 ps.setString(1,id(job));
137 ps.execute();
138 return null;
139 }
140 }).run();
141 }
142 /***
143 * @see org.astrogrid.jes.job.JobFactory#updateJob(org.astrogrid.jes.job.Job)
144 */
145 public void updateJob(final Workflow j) throws JobException {
146 presenceCheck.check(j);
147 (new DBHelper(sql.getUpdateSQL()) {
148
149 Object dbMethod(PreparedStatement ps) throws SQLException , CastorException, IOException{
150 Account acc = j.getCredentials().getAccount();
151 ps.setString(1,acc.getName());
152 ps.setString(2,acc.getCommunity());
153 StringWriter sw = new StringWriter();
154 j.marshal(sw);
155 sw.close();
156 ps.setString(3,sw.toString());
157 ps.setString(4,id(j));
158 ps.execute();
159 return null;
160 }
161 }).run();
162
163 }
164 /*** application of db helper -verifies a particular job is present in the db */
165 private class PresenceCheck extends DBHelper {
166 public PresenceCheck() {
167 super( sql.getRetrieveSQL());
168 }
169 protected String jobURN;
170 Object dbMethod(PreparedStatement ps) throws SQLException, JobException {
171 ps.setString(1,jobURN);
172 ResultSet rs = ps.executeQuery();
173 if (!rs.next()) {
174 throw new NotFoundException("job urn " + jobURN + " not found");
175 }
176 return null;
177 }
178 public void check(Workflow wf) throws JobException {
179 this.jobURN = id(wf);
180 this.run();
181 }
182 }
183 /*** helper class - abstract away all the mucky business of jdbc, and ensures everything is deallocated cleanly. */
184 abstract class DBHelper{
185 public DBHelper(String sql) {
186 this.sql = sql;
187 }
188 private final String sql;
189 abstract Object dbMethod(PreparedStatement ps) throws JobException,SQLException, CastorException, IOException;
190 public final Object run() throws JobException {
191 Connection conn = null;
192 PreparedStatement stmnt = null;
193 try {
194 conn = datasource.getConnection();
195 stmnt = conn.prepareStatement(sql);
196 return this.dbMethod(stmnt);
197 } catch (SQLException e) {
198 log.error("Database Exception",e);
199 throw new JobException("Store Failure",e);
200 } catch(CastorException e) {
201 log.error("Marshalling Exception",e);
202 throw new JobException("Marshalling Failure",e);
203 } catch (IOException e) {
204 log.error("IOExcepiton",e);
205 throw new JobException("Marshalling Failure",e);
206 } finally {
207 if (stmnt != null) {
208 try {
209 stmnt.close();
210 } catch (SQLException e) {
211 log.warn("SQL exception on close statement",e);
212 }
213 }
214 if (conn != null){
215 try {
216 conn.close();
217 } catch (SQLException e) {
218 log.warn("SQL exception on close connection",e);
219 }
220 }
221 }
222 }
223 }
224 /***
225 * @see org.astrogrid.jes.component.ComponentDescriptor#getName()
226 */
227 public String getName() {
228 return "database-backed job factory";
229 }
230
231
232
233 /***
234 * @see org.astrogrid.jes.component.ComponentDescriptor#getDescription()
235 */
236 public String getDescription() {
237 return "persists job records in a database table";
238 }
239
240
241
242 /***
243 * @see org.astrogrid.jes.component.ComponentDescriptor#getInstallationTest()
244 */
245 public Test getInstallationTest() {
246 TestSuite suite = new TestSuite("Tests for DB Job Factory");
247 suite.addTestSuite(InstallationTest.class);
248 return suite;
249 }
250 /*** verify that we can see database trhough datasource */
251 private class InstallationTest extends TestCase {
252
253 public InstallationTest(String s) {
254 super(s);
255 }
256
257 public void testDatasource() throws Exception {
258 Connection conn = datasource.getConnection();
259 assertNotNull(conn);
260 conn.close();
261 }
262
263 public void testSQL() throws Exception {
264 Connection conn = datasource.getConnection();
265 PreparedStatement stmnt = conn.prepareStatement(sql.getDeleteSQL());
266 stmnt.close();
267 stmnt = conn.prepareStatement(sql.getInsertSQL());
268 stmnt.close();
269 stmnt = conn.prepareStatement(sql.getListSQL());
270 stmnt.close();
271 stmnt = conn.prepareStatement(sql.getRetrieveSQL());
272 stmnt.close();
273 stmnt = conn.prepareStatement(sql.getUpdateSQL());
274 stmnt.close();
275 }
276
277
278 }
279 }
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339