python sdk JobTask convenience methods
author Tom Clegg <tom@clinicalfuture.com>
Sat, 11 May 2013 06:09:26 +0000 (23:09 -0700)
committer Tom Clegg <tom@clinicalfuture.com>
Sat, 11 May 2013 06:37:20 +0000 (23:37 -0700)
.gitignore
sdk/python/arvados.py

index b25c15b81fae06e1c55946ac6270bfdb293870e8..2f836aacf266b3e7fc17ec2d8a99aa68c904c1b9 100644 (file)
@@ -1 +1,2 @@
 *~
+*.pyc
index bbe0f25e78b6343aa506017284081603c79fb308..df4ef6be31e9864935c1ed922f1354fa8854368c 100644 (file)
@@ -5,6 +5,10 @@ import os
 import pprint
 import sys
 import types
+import subprocess
+import json
+import UserDict
+import re
 
 from apiclient import errors
 from apiclient.discovery import build
@@ -29,12 +33,48 @@ http = credentials.authorize(http)
 http.disable_ssl_certificate_validation=True
 service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
 
+def task_set_output(self,s):
+    """Record output *s* for task *self* and mark it successful.
+
+    Bound onto the object returned by current_task() as its
+    set_output() method; sends one job_tasks().update API request.
+    """
+    service.job_tasks().update(uuid=self['uuid'],
+                               job_task=json.dumps({
+                'output':s,
+                'success':True,
+                'progress':1.0
+                })).execute()
+
+# Cache for current_task(); None means "not fetched yet".
+_current_task = None
 def current_task():
+    """Return the task record named by $TASK_UUID, fetching it once.
+
+    The record is wrapped in a UserDict so it can carry an extra
+    set_output(s) method (see task_set_output).
+    """
+    global _current_task
+    if _current_task is not None:
+        return _current_task
     t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
+    t = UserDict.UserDict(t)
+    t.set_output = types.MethodType(task_set_output, t)
+    _current_task = t
     return t
 
+# Cache for current_job(); None means "not fetched yet".
+_current_job = None
 def current_job():
+    """Return the job record named by $JOB_UUID, fetching it once.
+
+    The API response is cached and returned as-is.
+    """
+    global _current_job
+    if _current_job is not None:
+        return _current_job
     t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
+    _current_job = t
     return t
 
 class JobTask:
@@ -44,24 +84,75 @@ class JobTask:
 class job_setup:
     @staticmethod
     def one_task_per_input_file(if_sequence=0, and_end_task=True):
-        if if_sequence != current_job()['sequence']:
+        """Queue one new job task per file in the job's input.
+
+        Returns immediately unless the current task's sequence equals
+        if_sequence.  Each new task gets sequence if_sequence + 1 and an
+        'input' parameter naming one file listed by whls.  If
+        and_end_task is true, the current task is then marked
+        successful and the process exits.
+        """
+        if if_sequence != current_task()['sequence']:
             return
-        job_input = current_job()['parameters']['input']
+        job_input = current_job()['script_parameters']['input']
         p = subprocess.Popen(["whls", job_input],
                              stdout=subprocess.PIPE,
                              stdin=None, stderr=None,
                              shell=False, close_fds=True)
-        for f in p.stdout.read().split("\n"):
+        # Read the whole listing and reap whls before queueing anything,
+        # so a failed listing cannot produce a partial set of tasks.
+        listing = p.communicate()[0]
+        if p.returncode != 0:
+            raise Exception("whls exited %d" % p.returncode)
+        for f in listing.split("\n"):
             if f != '':
-                task_input = job_input + '/' + f
+                task_input = job_input + '/' + re.sub(r'^\./', '', f)
                 new_task_attrs = {
                     'job_uuid': current_job()['uuid'],
+                    'sequence': if_sequence + 1,
                     'parameters': {
                         'input':task_input
                         }
                     }
-                service.jobs_tasks().create(job_task=new_task_attrs)
+                service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
         if and_end_task:
             service.job_tasks().update(uuid=current_task()['uuid'],
-                                       job_task={'success':True})
-            exit 0
+                                       job_task=json.dumps({'success':True})).execute()
+            sys.exit(0)
+
+class DataReader(object):
+    """Stream the data at a locator through a `whget -r <locator> -` child.
+
+    Usable as a context manager; close() reaps the child process and
+    raises if it exited non-zero.
+    """
+    def __init__(self, data_locator):
+        self.data_locator = data_locator
+        # NOTE(review): stderr is drained only in close(); if whget writes
+        # more than a pipe buffer of stderr before stdout is fully read,
+        # both sides can block.  Consider stderr=None if whget is chatty.
+        self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
+                                  stdout=subprocess.PIPE,
+                                  stdin=None, stderr=subprocess.PIPE,
+                                  shell=False, close_fds=True)
+    def __enter__(self):
+        return self
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.close()
+    def read(self, size=-1):
+        """Return up to size bytes of data (everything left if size < 0)."""
+        return self.p.stdout.read(size)
+    def close(self):
+        """Close the data stream, relay whget's stderr, and reap it.
+
+        Raises Exception if whget exited with a non-zero status.
+        """
+        self.p.stdout.close()
+        if not self.p.stderr.closed:
+            for err in self.p.stderr:
+                # lines keep their "\n"; print would add a second one
+                sys.stderr.write(err)
+            self.p.stderr.close()
+        self.p.wait()
+        if self.p.returncode != 0:
+            raise Exception("subprocess exited %d" % self.p.returncode)