import pprint
import sys
import types
+import subprocess
+import json
+import UserDict
+import re
from apiclient import errors
from apiclient.discovery import build
# WARNING: this turns off SSL/TLS certificate validation on the shared `http`
# object, so every request made through `service` below skips certificate
# checks. NOTE(review): `http` is created outside this view (presumably an
# httplib2.Http instance) -- confirm this is intended outside development.
http.disable_ssl_certificate_validation=True
# Module-level API client for the "arvados" API, version "v1", built from the
# discovery document at `url` (defined outside this view). The functions in
# this module issue all of their requests through this object.
service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
def task_set_output(self,s):
    """Store ``s`` as the output of task ``self`` and mark the task finished.

    ``self`` is a mapping holding at least the task's ``'uuid'``; this
    function is bound onto the object returned by ``current_task()`` as its
    ``set_output`` method.  A single update request is sent through the
    module-level ``service`` client, setting ``output`` to ``s``,
    ``success`` to ``True`` and ``progress`` to ``1.0``.
    """
    completed = {
        'output':s,
        'success':True,
        'progress':1.0
    }
    request = service.job_tasks().update(uuid=self['uuid'],
                                         job_task=json.dumps(completed))
    request.execute()
+
# Cache for current_task(); None means "not fetched yet".
_current_task = None


def current_task():
    """Return the record of the task this process is running, fetched once.

    The task is looked up through the module-level ``service`` client using
    the uuid found in the ``TASK_UUID`` environment variable.  The response
    is wrapped in a ``UserDict`` so that a ``set_output`` method (bound to
    ``task_set_output``) can be attached to it, and the wrapped object is
    cached in ``_current_task`` for subsequent calls.

    Raises ``KeyError`` if ``TASK_UUID`` is not set in the environment.
    """
    global _current_task
    # Compare against None rather than relying on truthiness: an empty
    # mapping is falsy and would otherwise be re-fetched on every call.
    if _current_task is not None:
        return _current_task
    record = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
    task = UserDict.UserDict(record)
    task.set_output = types.MethodType(task_set_output, task)
    _current_task = task
    return task
# Cache for current_job(); None means "not fetched yet".
_current_job = None


def current_job():
    """Return the record of the job this process belongs to, fetched once.

    The job is looked up through the module-level ``service`` client using
    the uuid found in the ``JOB_UUID`` environment variable, and the
    response is cached in ``_current_job`` for subsequent calls.

    Raises ``KeyError`` if ``JOB_UUID`` is not set in the environment.
    """
    global _current_job
    # Compare against None rather than relying on truthiness: an empty
    # response dict is falsy and would otherwise be re-fetched on every call.
    if _current_job is not None:
        return _current_job
    job = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
    _current_job = job
    return job
class JobTask:
class job_setup:
    """Helpers a job script can call to queue follow-up tasks for its job."""

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True):
        """Queue one new task for every file listed under the job's input.

        Does nothing unless the current task's ``'sequence'`` equals
        ``if_sequence``.  Otherwise the job's ``script_parameters['input']``
        is listed with the external ``whls`` command, and for every
        non-empty output line a task is created with ``sequence`` set to
        ``if_sequence + 1`` and ``parameters['input']`` set to the job input
        joined with that line (a leading ``./`` is stripped from the line).

        If ``and_end_task`` is true, the current task is then marked
        successful and the process exits with status 0.

        Raises ``Exception`` if ``whls`` exits with a non-zero status, so a
        failed listing is not mistaken for an empty input.
        """
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        p = subprocess.Popen(["whls", job_input],
                             stdout=subprocess.PIPE,
                             stdin=None, stderr=None,
                             shell=False, close_fds=True)
        # communicate() reads all of stdout and waits for the child, so the
        # process is reaped and its exit status becomes available.
        listing, _ = p.communicate()
        if p.returncode != 0:
            raise Exception("subprocess exited %d" % p.returncode)
        job_uuid = current_job()['uuid']
        for f in listing.split("\n"):
            if f == '':
                continue
            task_input = job_input + '/' + re.sub(r'^\./', '', f)
            new_task_attrs = {
                'job_uuid': job_uuid,
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': task_input,
                },
            }
            service.job_tasks().create(
                job_task=json.dumps(new_task_attrs)).execute()
        if and_end_task:
            # The request object must be executed to actually be sent;
            # without .execute() the task was never marked successful.
            service.job_tasks().update(
                uuid=current_task()['uuid'],
                job_task=json.dumps({'success': True})).execute()
            # sys.exit rather than the interactive-only exit() helper.
            sys.exit(0)
+
class DataReader:
    """Stream the data behind a locator from a ``whget`` child process.

    The constructor starts ``whget -r <data_locator> -`` with its stdout and
    stderr captured.  ``read()`` pulls bytes from the child's stdout, and
    ``close()`` shuts the pipes, relays anything the child wrote to stderr,
    and raises if the child exited with a non-zero status.  Instances can be
    used as context managers; ``close()`` is called on exit.

    NOTE(review): the child's stderr is only drained in ``close()``; a child
    that writes a lot to stderr while stdout is still being read could block
    -- confirm whether ``whget`` can do that.
    """

    def __init__(self, data_locator):
        """Start the ``whget`` child process for ``data_locator``."""
        self.data_locator = data_locator
        self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
                                  stdout=subprocess.PIPE,
                                  stdin=None, stderr=subprocess.PIPE,
                                  shell=False, close_fds=True)

    def __enter__(self):
        """Return the reader itself so ``with ... as r`` binds a usable object."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Close the reader when the ``with`` block ends.

        The exception arguments are required by the context-manager
        protocol; they default to ``None`` so a bare ``__exit__()`` call
        keeps working.  Returns ``None``, so exceptions from the block
        propagate.
        """
        self.close()

    def read(self, size, **kwargs):
        """Read up to ``size`` bytes from the child's stdout."""
        return self.p.stdout.read(size, **kwargs)

    def close(self):
        """Close the pipes, relay the child's stderr, and check its status.

        Raises ``Exception`` if the child exited with a non-zero status.
        """
        self.p.stdout.close()
        if not self.p.stderr.closed:
            for err in self.p.stderr:
                # Each line is written followed by a newline.
                sys.stderr.write("%s\n" % err)
            self.p.stderr.close()
        self.p.wait()
        if self.p.returncode != 0:
            raise Exception("subprocess exited %d" % self.p.returncode)