35c071e01a60a09d7388eee972c54750ea13dd5d
[arvados.git] / sdk / python / arvados.py
1 import gflags
2 import httplib2
3 import logging
4 import os
5 import pprint
6 import sys
7 import types
8 import subprocess
9 import json
10 import UserDict
11 import re
12
13 from apiclient import errors
14 from apiclient.discovery import build
15
class CredentialsFromEnv:
    """Authorize an httplib2-style http object from the environment.

    ``authorize`` wraps the object's ``request`` method so that every request
    carries an ``Authorization: OAuth2 <token>`` header. The token is read
    from ``os.environ['ARVADOS_API_TOKEN']`` each time a request is made.
    """

    @staticmethod
    def http_request(self, uri, method="GET", body=None, headers=None,
                     *args, **kwargs):
        """Replacement ``request`` method that adds the Authorization header.

        ``authorize`` installs this as a bound method on the http object, so
        ``self`` here is that http object, not a CredentialsFromEnv.

        The leading parameters mirror ``httplib2.Http.request`` and may be
        passed positionally or by keyword; ``headers`` may be omitted or
        None. The caller's headers dict is copied, never modified in place.
        Extra positional/keyword arguments are forwarded unchanged.

        Raises:
            KeyError: if ARVADOS_API_TOKEN is not set.
        """
        headers = dict(headers) if headers else {}
        headers['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
        return self.orig_http_request(uri, method, body, headers,
                                      *args, **kwargs)

    def authorize(self, http):
        """Wrap ``http.request`` with ``http_request`` and return ``http``.

        The unwrapped method is kept as ``http.orig_http_request``. When
        ``http`` was already authorized, that saved method is kept as-is.
        Overwriting it with the wrapper would make the wrapper call itself
        forever.
        """
        if getattr(http, 'orig_http_request', None) is None:
            http.orig_http_request = http.request
        http.request = types.MethodType(self.http_request, http)
        return http
27
# Module-level API client, created at import time. Importing this module
# therefore requires ARVADOS_API_HOST to be set (KeyError otherwise), and
# presumably contacts the discovery URL via build() -- TODO confirm.
url = 'https://%s/discovery/v1/apis/{api}/{apiVersion}/rest' % (
    os.environ['ARVADOS_API_HOST'],)
credentials = CredentialsFromEnv()
http = credentials.authorize(httplib2.Http())
# NOTE(review): TLS certificate validation is disabled unconditionally,
# which exposes the API token to man-in-the-middle attacks. Consider
# making this opt-in.
http.disable_ssl_certificate_validation = True
service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
35
def task_set_output(self, s):
    """Record ``s`` as the output of job task ``self`` and mark it finished.

    ``self`` is the task mapping; its 'uuid' selects the task to update.
    The update sets output to ``s``, success to True and progress to 1.0.
    It is sent through the module-level ``service`` client.
    ``current_task`` binds this function as the task's ``set_output`` method.
    """
    attrs = {
        'output': s,
        'success': True,
        'progress': 1.0,
    }
    request = service.job_tasks().update(uuid=self['uuid'],
                                         job_task=json.dumps(attrs))
    request.execute()
43
_current_task = None  # cache for current_task(); None until first fetch


def current_task():
    """Return this process's job task record, fetching it once and caching it.

    The task is fetched by ``os.environ['TASK_UUID']`` through the
    module-level ``service`` client and wrapped in ``UserDict.UserDict``.
    It gets a ``set_output`` method bound to ``task_set_output``.

    Raises:
        KeyError: if TASK_UUID is not set (on the first call).
    """
    global _current_task
    # Compare against None: a mapping's truthiness reflects its length, so
    # ``if _current_task:`` would re-fetch whenever the cached record is empty.
    if _current_task is not None:
        return _current_task
    t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
    t = UserDict.UserDict(t)
    t.set_output = types.MethodType(task_set_output, t)
    _current_task = t
    return t
54
_current_job = None  # cache for current_job(); None until first fetch


def current_job():
    """Return this process's job record, fetching it once and caching it.

    The job is fetched by ``os.environ['JOB_UUID']`` through the module-level
    ``service`` client. The result of ``execute()`` is cached unchanged.

    Raises:
        KeyError: if JOB_UUID is not set (on the first call).
    """
    global _current_job
    # Compare against None so an empty (falsy) cached record is still reused
    # instead of triggering another fetch.
    if _current_job is not None:
        return _current_job
    _current_job = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
    return _current_job
63
class JobTask:
    """Description of a job task.

    Currently this only records ``parameters`` and ``resource_limits`` on
    the instance and prints them; nothing here submits the task.
    """

    def __init__(self, parameters=None, resource_limits=None):
        # None defaults instead of ``dict()``: a default expression is
        # evaluated once, so every instance would otherwise share one dict.
        if parameters is None:
            parameters = {}
        if resource_limits is None:
            resource_limits = {}
        self.parameters = parameters
        self.resource_limits = resource_limits
        # Single parenthesized argument prints identically on Python 2 and 3.
        print("init jobtask %s %s" % (parameters, resource_limits))
66         print "init jobtask %s %s" % (parameters, resource_limits)
67
class job_setup:
    """Helpers for fanning the current job out into additional job tasks."""

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True):
        """Create one new job task per file listed in the job's input.

        Does nothing unless the current task's 'sequence' equals
        ``if_sequence``. Otherwise it runs ``whls <input>``, where ``<input>``
        is the job's ``script_parameters['input']``. Each non-empty output
        line becomes a file name, with a leading "./" removed. For each one,
        a task is created at sequence ``if_sequence + 1`` with
        ``parameters['input'] = <input>/<file>``.

        If ``and_end_task`` is true, the current task is then marked
        successful and the process exits with status 0.

        Raises:
            Exception: if ``whls`` exits with a nonzero status. No tasks are
                created in that case.
        """
        this_task = current_task()
        if if_sequence != this_task['sequence']:
            return
        this_job = current_job()
        job_input = this_job['script_parameters']['input']
        p = subprocess.Popen(["whls", job_input],
                             stdout=subprocess.PIPE,
                             stdin=None, stderr=None,
                             shell=False, close_fds=True)
        # communicate() reads all output and reaps the child, so a failed
        # listing is detected instead of silently producing fewer tasks.
        listing = p.communicate()[0]
        if p.returncode != 0:
            raise Exception("whls exited %d" % p.returncode)
        for f in listing.split("\n"):
            if f == '':
                continue
            task_input = job_input + '/' + re.sub(r'^\./', '', f)
            new_task_attrs = {
                'job_uuid': this_job['uuid'],
                'created_by_job_task': this_task['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': task_input,
                },
            }
            service.job_tasks().create(
                job_task=json.dumps(new_task_attrs)).execute()
        if and_end_task:
            # execute() is required: without it the update request is only
            # built, never sent, so the task was never marked successful.
            service.job_tasks().update(
                uuid=this_task['uuid'],
                job_task=json.dumps({'success': True})).execute()
            # sys.exit rather than the site-module ``exit`` helper, which is
            # meant for the interactive interpreter and absent under -S.
            sys.exit(0)
94
class DataReader:
    """Read the data at a locator through a ``whget`` subprocess.

    Runs ``whget -r <locator> -`` and exposes its stdout through ``read``.
    Usable as a context manager: ``with DataReader(loc) as r: ...`` calls
    ``close`` when the block exits.
    """

    def __init__(self, data_locator):
        self.data_locator = data_locator
        self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
                                  stdout=subprocess.PIPE,
                                  stdin=None, stderr=subprocess.PIPE,
                                  shell=False, close_fds=True)

    def __enter__(self):
        # Return self so ``with ... as r`` binds the reader, not None.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # The context-manager protocol always passes the exception triple;
        # a zero-argument __exit__ raises TypeError on every exit.
        self.close()
        return False  # never suppress an exception raised in the with body

    def read(self, size=-1, **kwargs):
        """Read up to ``size`` bytes of whget output (all remaining if < 0)."""
        return self.p.stdout.read(size, **kwargs)

    def close(self):
        """Close the pipes, wait for whget and check its exit status.

        Whatever whget wrote to its stderr is copied to this process's stderr.

        Raises:
            Exception: if whget exited with a nonzero status.
        """
        self.p.stdout.close()
        # NOTE(review): stderr is drained only here, after stdout. If whget
        # fills the stderr pipe while the caller is still reading stdout,
        # both processes can block.
        if not self.p.stderr.closed:
            for err in self.p.stderr:
                # Each line keeps its own newline; write() avoids the extra
                # newline that ``print >>`` added.
                sys.stderr.write(err)
            self.p.stderr.close()
        self.p.wait()
        if self.p.returncode != 0:
            raise Exception("subprocess exited %d" % self.p.returncode)
115         if self.p.returncode != 0:
116             raise Exception("subprocess exited %d" % self.p.returncode)