# docs and fixes
# [arvados.git] / sdk / python / arvados.py
1 import gflags
2 import httplib2
3 import logging
4 import os
5 import pprint
6 import sys
7 import types
8 import subprocess
9 import json
10 import UserDict
11 import re
12 import hashlib
13 import string
14
15 from apiclient import errors
16 from apiclient.discovery import build
17
class CredentialsFromEnv:
    """Inject the ARVADOS_API_TOKEN environment variable into every
    request made through an httplib2.Http object, retrying once when a
    kept-alive connection turns out to be dead.
    """
    @staticmethod
    def http_request(self, uri, **kwargs):
        # `self` here is the httplib2.Http instance this function was
        # bound onto by authorize() below.
        from httplib import BadStatusLine
        headers = kwargs.setdefault('headers', {})
        headers['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
        try:
            return self.orig_http_request(uri, **kwargs)
        except BadStatusLine:
            # httplib raises BadStatusLine when it reused a pooled
            # connection that the server had already closed.  Retrying
            # is usually what we want, though strictly speaking the
            # first attempt may have reached the server.
            return self.orig_http_request(uri, **kwargs)
    def authorize(self, http):
        """Monkey-patch `http` so its request() method goes through
        http_request() above; returns the same object."""
        http.orig_http_request = http.request
        http.request = types.MethodType(self.http_request, http)
        return http
39
# Module-level API client setup.  This runs at import time, so merely
# importing this module requires ARVADOS_API_HOST (and, for requests,
# ARVADOS_API_TOKEN) to be set in the environment.
url = ('https://%s/discovery/v1/apis/'
       '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
credentials = CredentialsFromEnv()
http = httplib2.Http()
http = credentials.authorize(http)
# NOTE(review): SSL certificate validation is disabled here, which
# leaves the API connection open to man-in-the-middle attacks —
# presumably a development convenience; confirm before production use.
http.disable_ssl_certificate_validation=True
service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
47
def task_set_output(self,s):
    """Record `s` as the output of job task `self` and mark the task
    as successfully completed.

    Bound as a method onto the task object returned by current_task().
    """
    body = json.dumps({
        'output':s,
        'success':True,
        'progress':1.0
        })
    service.job_tasks().update(uuid=self['uuid'], job_task=body).execute()
55
# Memoized task record for this process; populated on first use.
_current_task = None
def current_task():
    """Return the job task named by $TASK_UUID.

    Fetched from the API on first call and cached at module level;
    the result is a UserDict with a set_output() helper bound on.
    """
    global _current_task
    if not _current_task:
        task = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
        task = UserDict.UserDict(task)
        task.set_output = types.MethodType(task_set_output, task)
        _current_task = task
    return _current_task
66
# Memoized job record for this process; populated on first use.
_current_job = None
def current_job():
    """Return the job record named by $JOB_UUID, fetched once from
    the API and cached at module level."""
    global _current_job
    if not _current_job:
        _current_job = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
    return _current_job
75
def api():
    """Return the shared module-level Arvados API client object."""
    return service
78
class JobTask:
    """Placeholder for a job task definition.

    Currently only logs its construction; `parameters` and
    `resource_limits` are not stored on the instance.
    """
    def __init__(self, parameters=None, resource_limits=None):
        # Use None sentinels: the original `parameters=dict()` default
        # was evaluated once at definition time, so the same dict was
        # shared across every call (classic mutable-default bug).
        if parameters is None:
            parameters = {}
        if resource_limits is None:
            resource_limits = {}
        # Parenthesized print is valid in both Python 2 and 3.
        print("init jobtask %s %s" % (parameters, resource_limits))
82
class job_setup:
    """Helpers for fanning a job out into multiple tasks."""
    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True):
        """Queue one new job task per file in this job's input
        collection, then optionally mark the current task done.

        Does nothing unless the current task's sequence number equals
        `if_sequence`.  Each new task gets sequence `if_sequence + 1`
        and a single-file manifest as its 'input' parameter.  When
        `and_end_task` is true, the current task is marked successful
        and the process exits.
        """
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        for stream in CollectionReader(job_input).all_streams():
            for input_file in stream.all_files():
                service.job_tasks().create(job_task=json.dumps({
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': input_file.as_manifest()
                        }
                    })).execute()
        if and_end_task:
            service.job_tasks().update(uuid=current_task()['uuid'],
                                       job_task=json.dumps({'success':True})
                                       ).execute()
            exit(0)
107
class DataReader:
    """Stream the contents of a Keep collection via a `whget -r`
    subprocess.

    Read bytes with read(); always call close() (or use this as a
    context manager) so the subprocess is reaped and its exit status
    checked.
    """
    def __init__(self, data_locator):
        self.data_locator = data_locator
        self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
                                  stdout=subprocess.PIPE,
                                  stdin=None, stderr=subprocess.PIPE,
                                  shell=False, close_fds=True)
    def __enter__(self):
        # Return self so `with DataReader(x) as r:` binds the reader;
        # the original returned None, making the `as` clause useless.
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        # The context-manager protocol passes three exception
        # arguments; the original zero-argument signature raised
        # TypeError on every `with` block exit.
        self.close()
    def read(self, size, **kwargs):
        """Read up to `size` bytes from the subprocess's stdout."""
        return self.p.stdout.read(size, **kwargs)
    def close(self):
        """Close pipes, relay stderr, wait, and raise if whget failed.

        Raises Exception when the subprocess exits non-zero.
        """
        self.p.stdout.close()
        if not self.p.stderr.closed:
            for err in self.p.stderr:
                # Equivalent of the old `print >> sys.stderr, err`
                # (which wrote the line followed by a newline).
                sys.stderr.write(err)
                sys.stderr.write("\n")
            self.p.stderr.close()
        self.p.wait()
        if self.p.returncode != 0:
            raise Exception("whget subprocess exited %d" % self.p.returncode)
130
class StreamFileReader:
    """A read-only view of one file inside a StreamReader.

    `stream` supplies seek()/read(); the file occupies `size` bytes
    starting at byte `pos` of the stream.
    """
    def __init__(self, stream, pos, size, name):
        self._stream = stream
        self._pos = pos        # file's start offset within the stream
        self._size = size      # file length in bytes
        self._name = name
        self._filepos = 0      # current read offset within the file
    def name(self):
        return self._name
    def size(self):
        return self._size
    def stream_name(self):
        return self._stream.name()
    def read(self, size, **kwargs):
        """Read up to `size` bytes, never past the end of this file."""
        self._stream.seek(self._pos + self._filepos)
        data = self._stream.read(min(size, self._size - self._filepos))
        self._filepos += len(data)
        return data
    def readlines(self):
        """Yield the file's lines, each keeping its trailing newline
        (except possibly the last)."""
        self._stream.seek(self._pos + self._filepos)
        data = ''
        while True:
            newdata = self.read(2**10)
            if 0 == len(newdata):
                break
            data += newdata
            # BUG FIX: start-of-line must restart at 0 for each
            # refilled buffer.  Previously `sol` kept its old value
            # after the consumed prefix was trimmed off below, which
            # corrupted any line spanning a 1 KiB read boundary.
            sol = 0
            while True:
                # str.find replaces the Python-2-only string.find().
                eol = data.find("\n", sol)
                if eol < 0:
                    break
                yield data[sol:eol+1]
                sol = eol+1
            data = data[sol:]
        if data != '':
            yield data
    def as_manifest(self):
        """Return a one-stream manifest line covering just this file."""
        # str.join replaces the Python-2-only string.join().
        return " ".join(self._stream.tokens_for_range(self._pos, self._size)) + "\n"
170
class StreamReader:
    """Parse and read one manifest stream.

    `tokens` is the stream's manifest token list: a stream name, then
    data-block locators (32 hex chars plus optional "+hint"s), then
    file tokens of the form "pos:size:name".
    """
    def __init__(self, tokens):
        self._tokens = tokens
        self._current_datablock_data = None
        self._current_datablock_pos = 0
        self._current_datablock_index = -1
        self._pos = 0    # logical read position within the stream

        self._stream_name = None
        self.data_locators = []
        self.files = []

        for tok in self._tokens:
            if self._stream_name == None:
                # The first token is always the stream name.
                self._stream_name = tok
            elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
                self.data_locators += [tok]
            elif re.search(r'^\d+:\d+:\S+', tok):
                pos, size, name = tok.split(':',2)
                self.files += [[int(pos), int(size), name]]
            else:
                raise Exception("Invalid manifest format")
    def tokens_for_range(self, range_start, range_size):
        """Return manifest tokens (stream name, locators, file tokens)
        covering bytes [range_start, range_start+range_size) of this
        stream.  If any locator lacks a "+size" hint we cannot compute
        block offsets, so all remaining locators are returned.
        """
        resp = [self._stream_name]
        return_all_tokens = False
        block_start = 0
        for locator in self.data_locators:
            sizehint = re.search(r'\+(\d+)', locator)
            if not sizehint:
                return_all_tokens = True
            if return_all_tokens:
                resp += [locator]
                # BUG FIX: this was a bare `next` expression — a no-op
                # in Python (a Perl-ism).  Without `continue`, control
                # fell through and crashed on sizehint.group() whenever
                # sizehint was None.
                continue
            # group(0) is e.g. "+123"; int() accepts the leading '+'.
            blocksize = int(sizehint.group(0))
            if range_start + range_size <= block_start:
                break
            if range_start < block_start + blocksize:
                resp += [locator]
            block_start += int(blocksize)
        for f in self.files:
            # Include every file token that overlaps the range.
            if ((f[0] < range_start + range_size)
                and
                (f[0] + f[1] > range_start)):
                resp += ["%d:%d:%s" % (f[0], f[1], f[2])]
        return resp
    def name(self):
        return self._stream_name
    def all_files(self):
        """Yield a StreamFileReader for each file in this stream."""
        for f in self.files:
            pos, size, name = f
            yield StreamFileReader(self, pos, size, name)
    def nextdatablock(self):
        """Advance the current-block cursor to the next data block."""
        if self._current_datablock_index < 0:
            self._current_datablock_pos = 0
            self._current_datablock_index = 0
        else:
            self._current_datablock_pos += self.current_datablock_size()
            self._current_datablock_index += 1
        self._current_datablock_data = None
    def current_datablock_data(self):
        """Fetch (and cache) the bytes of the current data block."""
        if self._current_datablock_data == None:
            self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
        return self._current_datablock_data
    def current_datablock_size(self):
        """Size of the current block: from its "+size" hint when
        present, otherwise by fetching the block and measuring it."""
        if self._current_datablock_index < 0:
            self.nextdatablock()
        sizehint = re.search(r'\+(\d+)', self.data_locators[self._current_datablock_index])
        if sizehint:
            return int(sizehint.group(0))
        return len(self.current_datablock_data())
    def seek(self, pos):
        """Set the position of the next read operation."""
        self._pos = pos
    def really_seek(self):
        """Find and load the appropriate data block, so the byte at
        _pos is in memory.
        """
        if self._pos == self._current_datablock_pos:
            return True
        if (self._current_datablock_pos != None and
            self._pos >= self._current_datablock_pos and
            self._pos <= self._current_datablock_pos + self.current_datablock_size()):
            return True
        if self._pos < self._current_datablock_pos:
            # Seeking backwards: rewind to the first block and scan
            # forward again.
            self._current_datablock_index = -1
            self.nextdatablock()
        while (self._pos > self._current_datablock_pos and
               self._pos > self._current_datablock_pos + self.current_datablock_size()):
            self.nextdatablock()
    def read(self, size):
        """Read no more than size bytes -- but at least one byte,
        unless _pos is already at the end of the stream.
        """
        if size == 0:
            return ''
        self.really_seek()
        while self._pos >= self._current_datablock_pos + self.current_datablock_size():
            self.nextdatablock()
            if self._current_datablock_index >= len(self.data_locators):
                # Past the last block: end of stream.
                return None
        data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
        self._pos += len(data)
        return data
275
class CollectionReader:
    """Read a collection, given either its manifest text or a locator
    for fetching the manifest from Keep."""
    def __init__(self, manifest_locator_or_text):
        # Heuristic: anything that looks like a complete manifest line
        # is treated as manifest text; anything else as a locator.
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        self._streams = None
    def __enter__(self):
        # Return self so `with CollectionReader(x) as cr:` binds the
        # reader; the original returned None.
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        # Standard three-argument signature; the old zero-argument
        # form raised TypeError whenever the `with` block exited.
        pass
    def _populate(self):
        """Fetch the manifest text (if needed) and tokenize streams."""
        if self._streams != None:
            return
        if not self._manifest_text:
            self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            stream_tokens = stream_line.split()
            # Skip blank lines (e.g. the empty string produced after
            # the manifest's trailing newline) so we don't create
            # degenerate token-less streams.
            if stream_tokens:
                self._streams += [stream_tokens]
    def all_streams(self):
        """Return a StreamReader for each stream in the manifest."""
        self._populate()
        resp = []
        for s in self._streams:
            resp += [StreamReader(s)]
        return resp
    def all_files(self):
        """Yield a StreamFileReader for every file in every stream."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f
308
class CollectionWriter:
    """Build a collection manifest incrementally.

    Feed bytes with write(), delimit files/streams with
    start_new_file()/start_new_stream(), then call finish() to store
    the manifest in Keep (or manifest_text() to just render it).
    """
    KEEP_BLOCK_SIZE = 2**26   # max bytes per stored data block (64 MiB)
    def __init__(self):
        self._data_buffer = ''
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
    def __enter__(self):
        # Return self so `with CollectionWriter() as w:` binds the
        # writer; the original returned None.
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        # Standard three-argument signature; the old zero-argument
        # form raised TypeError whenever the `with` block exited.
        self.finish()
    def write(self, newdata):
        """Append bytes to the current file, flushing full blocks."""
        self._data_buffer += newdata
        self._current_stream_length += len(newdata)
        while len(self._data_buffer) >= self.KEEP_BLOCK_SIZE:
            self.flush_data()
    def flush_data(self):
        """Store one (possibly partial) block of buffered data."""
        if self._data_buffer != '':
            self._current_stream_locators += [Keep.put(self._data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = self._data_buffer[self.KEEP_BLOCK_SIZE:]
    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        # BUG FIX: allow None (meaning "unnamed") — previously
        # re.search(None) raised TypeError, so the documented
        # start_new_file() default argument always crashed.
        if newfilename is not None and re.search(r'[ \t\n]', newfilename):
            raise AssertionError("Manifest filenames cannot contain whitespace")
        self._current_file_name = newfilename
    def current_file_name(self):
        return self._current_file_name
    def finish_current_file(self):
        """Record the current file's extent; a no-op if no bytes have
        been written since the last file boundary."""
        if self._current_file_name == None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise Exception("Cannot finish an unnamed file (%d bytes at offset %d in '%s' stream)" % (self._current_stream_length - self._current_file_pos, self._current_file_pos, self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                       self._current_stream_length - self._current_file_pos,
                                       self._current_file_name]]
        self._current_file_pos = self._current_stream_length
    def start_new_stream(self, newstreamname=None):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
    def set_current_stream_name(self, newstreamname):
        # Same None guard as set_current_file_name above.
        if newstreamname is not None and re.search(r'[ \t\n]', newstreamname):
            raise AssertionError("Manifest stream names cannot contain whitespace")
        self._current_stream_name = newstreamname
    def current_stream_name(self):
        return self._current_stream_name
    def finish_current_stream(self):
        """Close out the current stream and reset per-stream state."""
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            # Empty stream: nothing to record.
            pass
        elif self._current_stream_name == None:
            raise Exception("Cannot finish an unnamed stream (%d bytes in %d files)" % (self._current_stream_length, len(self._current_stream_files)))
        else:
            self._finished_streams += [[self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
    def finish(self):
        """Store the rendered manifest in Keep; return its locator."""
        return Keep.put(self.manifest_text())
    def manifest_text(self):
        """Render (and finalize) the manifest text for all streams."""
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            manifest += stream[0]
            if len(stream[1]) == 0:
                # A stream needs at least one locator; use the
                # well-known md5 of the empty string.
                manifest += " d41d8cd98f00b204e9800998ecf8427e+0"
            else:
                for locator in stream[1]:
                    manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
            manifest += "\n"
        return manifest
393
class Keep:
    """Thin client for the Keep content-addressed block store.

    put()/get() shell out to the `whput`/`whget` command-line tools.
    When KEEP_LOCAL_STORE is set in the environment, blocks are
    instead read and written as plain files in that directory
    (a test/development mode).
    """
    @staticmethod
    def put(data):
        """Store `data` as one block; return its locator string.

        Raises Exception if whput exits non-zero.
        """
        if 'KEEP_LOCAL_STORE' in os.environ:
            return Keep.local_store_put(data)
        p = subprocess.Popen(["whput", "-"],
                             stdout=subprocess.PIPE,
                             stdin=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             shell=False, close_fds=True)
        stdoutdata, stderrdata = p.communicate(data)
        if p.returncode != 0:
            raise Exception("whput subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
        # whput prints the new block's locator on stdout.
        return stdoutdata.rstrip()
    @staticmethod
    def get(locator):
        """Fetch the block named by `locator`, verifying its md5
        against the locator before returning it.

        Raises Exception if whget fails or the checksum mismatches.
        """
        if 'KEEP_LOCAL_STORE' in os.environ:
            return Keep.local_store_get(locator)
        p = subprocess.Popen(["whget", locator, "-"],
                             stdout=subprocess.PIPE,
                             stdin=None,
                             stderr=subprocess.PIPE,
                             shell=False, close_fds=True)
        stdoutdata, stderrdata = p.communicate(None)
        if p.returncode != 0:
            raise Exception("whget subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
        m = hashlib.new('md5')
        m.update(stdoutdata)
        try:
            # The locator must *start with* the md5 of the fetched data.
            if locator.index(m.hexdigest()) == 0:
                return stdoutdata
        except ValueError:
            # hexdigest does not occur in the locator at all;
            # fall through to the mismatch exception below.
            pass
        raise Exception("md5 checksum mismatch: md5(get(%s)) == %s" % (locator, m.hexdigest()))
    @staticmethod
    def local_store_put(data):
        """Write `data` into $KEEP_LOCAL_STORE under its md5 name;
        return the "md5+size" locator.

        Writes to a .tmp file first, then renames, so concurrent
        readers never observe a partially written block.
        """
        m = hashlib.new('md5')
        m.update(data)
        md5 = m.hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
                  os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
        return locator
    @staticmethod
    def local_store_get(locator):
        """Read back the block named by `locator` (hints after the
        md5 are ignored) from $KEEP_LOCAL_STORE.
        """
        # NOTE(review): if `locator` has no leading 32+ hex chars,
        # `r` is None and the next line raises AttributeError —
        # presumably locators are pre-validated upstream; confirm.
        r = re.search('^([0-9a-f]{32,})', locator)
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
            return f.read()