1
2
3
4
5
6
7
8
9
10
11 package org.astrogrid.jes.impl.workflow;
12
13 import org.astrogrid.common.namegen.FileNameGen;
14 import org.astrogrid.community.beans.v1.Account;
15 import org.astrogrid.component.descriptor.ComponentDescriptor;
16 import org.astrogrid.jes.job.JobException;
17 import org.astrogrid.jes.job.NotFoundException;
18 import org.astrogrid.jes.util.BaseDirectory;
19 import org.astrogrid.workflow.beans.v1.Workflow;
20 import org.astrogrid.workflow.beans.v1.execution.JobURN;
21
22 import org.apache.commons.collections.IteratorUtils;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.commons.transaction.file.ResourceManagerException;
26 import org.exolab.castor.xml.CastorException;
27
28 import java.io.BufferedReader;
29 import java.io.File;
30 import java.io.FileReader;
31 import java.io.FileWriter;
32 import java.io.FilenameFilter;
33 import java.io.IOException;
34 import java.io.PrintWriter;
35 import java.net.URLEncoder;
36 import java.util.Iterator;
37
38 import junit.framework.Test;
39 import junit.framework.TestCase;
40 import junit.framework.TestSuite;
41
/**
 * Implementation of JobFactory that stores XML documents on a filesystem.
 * @author Noel Winstanley nw@jb.man.ac.uk 11-Feb-2004
 * @modified nww - looked at ways to optimize this class.
 */
47 public abstract class FileJobFactoryImpl extends AbstractJobFactoryImpl implements ComponentDescriptor{
48 /***
49 * Commons Logger for this class
50 */
51 private static final Log logger = LogFactory
52 .getLog(FileJobFactoryImpl.class);
53
54 private static final String WORKFLOW_SUFFIX = "-workflow.xml";
/**
 * Construct a new FileJobFactoryImpl.
 *
 * @param bd supplies the directory to store workflow documents in
 * @throws IOException if the store directory cannot be initialized (see initStore)
 * @throws IllegalArgumentException if <code>bd</code> supplies no directory
 * @throws ResourceManagerException presumably raised by the name generator or
 *         the superclass constructor - TODO confirm; nothing in this body throws it directly
 */
public FileJobFactoryImpl(BaseDirectory bd) throws IOException,
        IllegalArgumentException,
        ResourceManagerException {
    super(new FileNameGen(bd.getDir(), "jes"));
    log.info("File Store Job Factory");
    this.baseDir = bd.getDir();
    // An assert is disabled unless the JVM runs with -ea, so check the
    // precondition explicitly instead of failing later with a NullPointerException.
    if (baseDir == null) {
        throw new IllegalArgumentException("Base directory of file store is not set");
    }
    log.info("Base directory of file store:" + baseDir.getAbsolutePath());
    initStore();
}

/** Directory in which the workflow documents are stored. */
protected final File baseDir;
74 /*** initialize the store directory */
75 protected void initStore() throws IOException {
76
77 if (! baseDir.exists()) {
78 log.info("Initializing file store");
79 baseDir.mkdirs();
80 }
81 assert baseDir.isDirectory();
82 assert baseDir.canRead();
83 assert baseDir.canWrite();
84 }
85
86 protected final File mkOutputFile(Workflow j) {
87 return mkOutputFile(j.getJobExecutionRecord().getJobId());
88 }
89
90 protected final File mkOutputFile(JobURN jobURN) {
91 return new File(baseDir,URLEncoder.encode(jobURN.getContent() + WORKFLOW_SUFFIX));
92 }
93
94 /***
95 * @see org.astrogrid.jes.job.JobFactory#createJob(org.astrogrid.jes.job.SubmitJobRequest)
96 */
97 public Workflow initializeJob(Workflow req) throws JobException {
98 Workflow j = buildJob(req);
99 FileWriter fw = null;
100 try {
101 File outFile = mkOutputFile(j);
102 fw = new FileWriter(outFile);
103 j.marshal(fw);
104 } catch (Exception e) {
105 throw new JobException("Problem with store",e);
106 } finally {
107 if (fw != null) {
108 try {
109 fw.close();
110 } catch (IOException ioe) {
111 log.error("failed to close filestore");
112 }
113 }
114 }
115 return j;
116 }
117
118 /***
119 * @see org.astrogrid.jes.job.JobFactory#findJob(java.lang.String)
120 */
121 public Workflow findJob(JobURN jobURN) throws JobException {
122 FileReader fr = null;
123 try {
124 File f = mkOutputFile(jobURN);
125 if (! f.exists()) {
126 throw new NotFoundException("Couldn't find job " + jobURN);
127 }
128 fr = new FileReader(f);
129 return Workflow.unmarshalWorkflow(fr);
130 } catch (CastorException e) {
131 throw new JobException("Problem with creating object model",e);
132 } catch (IOException e) {
133 throw new JobException("Problem with reading xml from store",e);
134 } finally {
135 if (fr != null) {
136 try {
137 fr.close();
138 } catch (IOException ioe) {
139 log.error("failed to close file");
140 }
141 }
142 }
143 }
144
145 protected static final String SERVER_PREFIX = "jes:" + hostname + "/" ;
146
147 protected final String mkPrefix(Account acc) {
148 StringBuffer buff = new StringBuffer()
149 .append(SERVER_PREFIX)
150 .append(acc.getName())
151 .append("@")
152 .append(acc.getCommunity())
153 .append("/");
154 return buff.toString();
155 }
156 /***
157 * @see org.astrogrid.jes.job.JobFactory#findUserJobs(java.lang.String, java.lang.String, java.lang.String)
158 */
159 public Iterator findUserJobs(final Account acc) throws JobException {
160
161 File[] jobFiles = baseDir.listFiles(new FilenameFilter() {
162 final String searchString = URLEncoder.encode(mkPrefix(acc));
163 public boolean accept(File dir, String name) {
164 return name.startsWith(searchString);
165 }
166 });
167 final Iterator i = IteratorUtils.arrayIterator(jobFiles);
168 return new Iterator() {
169
170 public void remove() {
171 throw new UnsupportedOperationException("don't remove");
172 }
173
174 public boolean hasNext() {
175 return i.hasNext();
176 }
177 /*** need to make this more resiliant to changes on disk - it's possible that queued changes will be processed, and so the files
178 * won't be there when they're come to being read. If this happens now, it skips onto the next item.
179 * 'Course this means that we need to handle a null at the end of the list. which is a pity.
180 * @see java.util.Iterator#next()
181 */
182 public Object next() {
183 if (!i.hasNext()) {
184 logger.warn("Reached unexpected end of iterator");
185 return null;
186 }
187 File f = (File)i.next();
188 try {
189 if (! f.exists()) {
190 logger.info("Skipping non-existent file " + f);
191 return this.next();
192 }
193 return Workflow.unmarshalWorkflow(new FileReader(f));
194 } catch (Exception e) {
195 logger.warn("Failed to unmarshal this file " + f + " skipping");
196 return this.next();
197 }
198 }
199 };
200
201 }
202 /***
203 * @see org.astrogrid.jes.job.JobFactory#deleteJob(org.astrogrid.jes.job.Job)
204 */
205 public void deleteJob(Workflow job) throws JobException {
206 File f = mkOutputFile(job);
207 if (f.exists()) {
208 f.delete();
209 } else {
210 throw new NotFoundException("Job URN " + id(job) + " not found");
211 }
212 }
213 /***
214 * @see org.astrogrid.jes.job.JobFactory#updateJob(org.astrogrid.jes.job.Job)
215 */
216 public void updateJob(Workflow j) throws JobException {
217 FileWriter fw = null;
218 try {
219 File outFile = mkOutputFile(j);
220 if (! outFile.exists()) {
221 throw new NotFoundException("Job URN " + id(j) + " not found");
222 }
223 fw = new FileWriter(outFile);
224 j.marshal(fw);
225 } catch (Exception e) {
226 throw new JobException("Problem with store",e);
227 } finally {
228 if (fw != null) {
229 try {
230 fw.close();
231 } catch (IOException e) {
232 log.error("Could not close file writer");
233 }
234 }
235 }
236 }
237
238 /***
239 * @see org.astrogrid.jes.component.ComponentDescriptor#getName()
240 */
241 public String getName() {
242 return "Filestore-backed job factory";
243 }
244
245 /***
246 * @see org.astrogrid.jes.component.ComponentDescriptor#getDescription()
247 */
248 public String getDescription() {
249 return "Stores jobs on disk as workflow xml document files\n"
250 + "store directory: " + baseDir.getAbsolutePath();
251 }
252
253
254 /***
255 * @see org.astrogrid.jes.component.ComponentDescriptor#getInstallationTest()
256 */
257 public Test getInstallationTest() {
258 TestSuite suite = new TestSuite("Tests for File Job Factory Imp");
259 suite.addTest(new InstallationTest("testBaseDirExists"));
260 suite.addTest(new InstallationTest("testWriteFile"));
261 suite.addTest(new InstallationTest("testReadFile"));
262 return suite;
263 }
264
265
266 protected class InstallationTest extends TestCase {
267
268
269 public InstallationTest(String s) {
270 super(s);
271 }
272
273 public void testBaseDirExists() {
274 assertTrue("base dir does not exist",baseDir.exists());
275 assertTrue("base dir is not a directory",baseDir.isDirectory());
276 assertTrue("base dir is not readable and writable",baseDir.canRead() && baseDir.canWrite());
277 }
278 final File testFile = new File(baseDir,"test-file");
279 private final static String CONTENTS = "test file contents";
280 public void testWriteFile() throws IOException {
281 PrintWriter pw = new PrintWriter( new FileWriter(testFile));
282 pw.println(CONTENTS);
283 pw.close();
284 assertTrue(testFile.exists());
285 }
286 public void testReadFile() throws IOException {
287 BufferedReader reader = new BufferedReader( new FileReader(testFile));
288 String line = reader.readLine();
289 assertEquals("does not match expected",CONTENTS,line);
290 }
291 }
292
293 }
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386