13 from apiclient import errors
14 from apiclient.discovery import build
16 class CredentialsFromEnv:
18 def http_request(self, uri, **kwargs):
19 if 'headers' not in kwargs:
20 kwargs['headers'] = {}
21 kwargs['headers']['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
22 return self.orig_http_request(uri, **kwargs)
23 def authorize(self, http):
24 http.orig_http_request = http.request
25 http.request = types.MethodType(self.http_request, http)
28 url = ('https://%s/discovery/v1/apis/'
29 '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
30 credentials = CredentialsFromEnv()
31 http = httplib2.Http()
32 http = credentials.authorize(http)
33 http.disable_ssl_certificate_validation=True
34 service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
36 def task_set_output(self,s):
37 service.job_tasks().update(uuid=self['uuid'],
49 t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
50 t = UserDict.UserDict(t)
51 t.set_output = types.MethodType(task_set_output, t)
60 t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
65 def __init__(self, parameters=dict(), resource_limits=dict()):
66 print "init jobtask %s %s" % (parameters, resource_limits)
70 def one_task_per_input_file(if_sequence=0, and_end_task=True):
71 if if_sequence != current_task()['sequence']:
73 job_input = current_job()['script_parameters']['input']
74 p = subprocess.Popen(["whls", job_input],
75 stdout=subprocess.PIPE,
76 stdin=None, stderr=None,
77 shell=False, close_fds=True)
78 for f in p.stdout.read().split("\n"):
80 task_input = job_input + '/' + re.sub(r'^\./', '', f)
82 'job_uuid': current_job()['uuid'],
83 'created_by_job_task': current_task()['uuid'],
84 'sequence': if_sequence + 1,
89 service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
91 service.job_tasks().update(uuid=current_task()['uuid'],
92 job_task=json.dumps({'success':True}))
96 def __init__(self, data_locator):
97 self.data_locator = data_locator
98 self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
99 stdout=subprocess.PIPE,
100 stdin=None, stderr=subprocess.PIPE,
101 shell=False, close_fds=True)
106 def read(self, size, **kwargs):
107 return self.p.stdout.read(size, **kwargs)
109 self.p.stdout.close()
110 if not self.p.stderr.closed:
111 for err in self.p.stderr:
112 print >> sys.stderr, err
113 self.p.stderr.close()
115 if self.p.returncode != 0:
116 raise Exception("subprocess exited %d" % self.p.returncode)