15 from apiclient import errors
16 from apiclient.discovery import build
# Credentials shim: injects ARVADOS_API_TOKEN from the environment as an
# OAuth2 Authorization header on every HTTP request made through the wrapped
# httplib2 object.
# NOTE(review): this excerpt carries embedded original line numbers and is
# missing lines (original 19, 25, 27, 33) -- it is not runnable as-is.
# Comments below describe only what the visible lines show.
18 class CredentialsFromEnv:
# Replacement for http.request: force an Authorization header, then delegate
# to the saved original request method.
20 def http_request(self, uri, **kwargs):
21 from httplib import BadStatusLine
22 if 'headers' not in kwargs:
23 kwargs['headers'] = {}
24 kwargs['headers']['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
# NOTE(review): the try/except BadStatusLine structure wrapping these two
# calls appears to be lost from the excerpt (original 25, 27, 33 missing);
# the visible comment block explains the retry rationale.
26 return self.orig_http_request(uri, **kwargs)
28 # This is how httplib tells us that it tried to reuse an
29 # existing connection but it was already closed by the
30 # server. In that case, yes, we would like to retry.
31 # Unfortunately, we are not absolutely certain that the
32 # previous call did not succeed, so this is slightly
# Retry the request once after a stale keep-alive connection is detected.
34 return self.orig_http_request(uri, **kwargs)
# Monkey-patch the given httplib2.Http instance so its request() goes through
# http_request() above; keeps the original bound as orig_http_request.
# NOTE(review): the caller assigns the result of authorize(), so a
# "return http" line (original ~38) is presumably missing here -- confirm.
35 def authorize(self, http):
36 http.orig_http_request = http.request
37 http.request = types.MethodType(self.http_request, http)
# Module-level setup: build an "arvados" v1 API client via a discovery URL
# pointed at ARVADOS_API_HOST, authorized with the token-injecting
# CredentialsFromEnv above.
40 url = ('https://%s/discovery/v1/apis/'
41 '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
42 credentials = CredentialsFromEnv()
43 http = httplib2.Http()
44 http = credentials.authorize(http)
# SECURITY NOTE(review): SSL certificate validation is disabled here, which
# permits man-in-the-middle interception; visible as-is in the original.
45 http.disable_ssl_certificate_validation=True
46 service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
# Helper attached to the task dict below: record this task's output via the
# job_tasks update API. NOTE(review): the update() argument list is
# truncated in this excerpt (original lines 50-60 are missing).
48 def task_set_output(self,s):
49 service.job_tasks().update(uuid=self['uuid'],
# Fetch the task record named by the TASK_UUID env var, wrap it in a
# UserDict, and attach task_set_output as a bound set_output method.
# NOTE(review): the enclosing "def current_task()" line appears to be
# missing from this excerpt -- confirm against the full file.
61 t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
62 t = UserDict.UserDict(t)
63 t.set_output = types.MethodType(task_set_output, t)
# Fetch the job record named by the JOB_UUID env var.
# NOTE(review): the enclosing "def current_job()" line is missing from this
# excerpt -- confirm against the full file.
72 t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
# Constructor fragment of a job-task class whose class statement is missing
# from this excerpt. Python 2 print statement.
# NOTE(review): dict() defaults are evaluated once and shared across calls
# (mutable-default pitfall) -- worth fixing in the full file.
80 def __init__(self, parameters=dict(), resource_limits=dict()):
81 print "init jobtask %s %s" % (parameters, resource_limits)
# Queue one successor job task per input file: when the current task is at
# sequence if_sequence, read the job's 'input' collection and create a task
# at sequence if_sequence + 1 for every file in it; when and_end_task is
# set, mark the current task as successfully finished.
# NOTE(review): several lines are missing from this excerpt (original 87,
# 93, 97-100, 102, 105-108), including the early return, the new_task_attrs
# dict header, and the tail of the final update() call.
85 def one_task_per_input_file(if_sequence=0, and_end_task=True):
86 if if_sequence != current_task()['sequence']:
88 job_input = current_job()['script_parameters']['input']
89 cr = CollectionReader(job_input)
90 for s in cr.all_streams():
91 for f in s.all_files():
# Each new task's input is the single file rendered as a manifest fragment.
92 task_input = f.as_manifest()
94 'job_uuid': current_job()['uuid'],
95 'created_by_job_task': current_task()['uuid'],
96 'sequence': if_sequence + 1,
101 service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
# Close out the current task as a success.
103 service.job_tasks().update(uuid=current_task()['uuid'],
104 job_task=json.dumps({'success':True})
# Reader that streams a Keep data locator through an external
# "whget -r <locator> -" subprocess. NOTE(review): the enclosing class
# statement (original ~line 108) is missing from this excerpt.
109 def __init__(self, data_locator):
110 self.data_locator = data_locator
111 self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
112 stdout=subprocess.PIPE,
113 stdin=None, stderr=subprocess.PIPE,
114 shell=False, close_fds=True)
# Read up to size bytes directly from the subprocess's stdout pipe.
119 def read(self, size, **kwargs):
120 return self.p.stdout.read(size, **kwargs)
# Teardown: close stdout, drain and echo any stderr output, then raise if
# the subprocess exited nonzero. NOTE(review): the enclosing def (original
# ~121) and a wait()/poll() establishing returncode (original ~127) are
# missing from this excerpt -- confirm returncode is set before the check.
122 self.p.stdout.close()
123 if not self.p.stderr.closed:
124 for err in self.p.stderr:
125 print >> sys.stderr, err
126 self.p.stderr.close()
128 if self.p.returncode != 0:
129 raise Exception("whget subprocess exited %d" % self.p.returncode)
# File-like view onto the byte range [pos, pos + size) of a parent stream.
# NOTE(review): interior lines are missing from this excerpt (original
# 134-141, 148-149, 151-153, 156-158, 160-161, 163-166, 169-170).
131 class StreamFileReader:
132 def __init__(self, stream, pos, size, name):
133 self._stream = stream
# Name of the parent stream this file belongs to.
142 def stream_name(self):
143 return self._stream.name()
# Read up to size bytes at the current file offset, clamped so reads never
# run past this file's slice of the stream; advances _filepos by the number
# of bytes actually read.
144 def read(self, size, **kwargs):
145 self._stream.seek(self._pos + self._filepos)
146 data = self._stream.read(min(size, self._size - self._filepos))
147 self._filepos += len(data)
# Line-iteration fragment: refills a buffer in 1 KiB chunks and yields each
# complete "\n"-terminated line. NOTE(review): the generator's def line and
# its buffer-management statements are missing from this excerpt.
150 self._stream.seek(self._pos + self._filepos)
154 newdata = self.read(2**10)
155 if 0 == len(newdata):
159 eol = string.find(data, "\n", sol)
162 yield data[sol:eol+1]
# Render this single file as a manifest fragment built from the parent
# stream's tokens for our byte range.
167 def as_manifest(self):
168 return string.join(self._stream.tokens_for_range(self._pos, self._size),
# Parse one manifest stream line (pre-split into tokens): the first token is
# the stream name, tokens of 32+ hex digits (optionally with "+hint"
# suffixes) are data block locators, and "pos:size:name" tokens are file
# entries; anything else is a manifest format error.
# NOTE(review): the enclosing class statement and some initialization lines
# (original 171, 177-178, 181-182, 191) are missing from this excerpt --
# self.files is appended to but its initialization is not visible.
172 def __init__(self, tokens):
173 self._tokens = tokens
# Cache of the currently loaded data block and position bookkeeping; index
# -1 means "before the first block".
174 self._current_datablock_data = None
175 self._current_datablock_pos = 0
176 self._current_datablock_index = -1
179 self._stream_name = None
180 self.data_locators = []
183 for tok in self._tokens:
184 if self._stream_name == None:
185 self._stream_name = tok
186 elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
187 self.data_locators += [tok]
188 elif re.search(r'^\d+:\d+:\S+', tok):
# split(':', 2) keeps any further colons inside the file name.
189 pos, size, name = tok.split(':',2)
190 self.files += [[int(pos), int(size), name]]
192 raise Exception("Invalid manifest format")
# Return the subset of this stream's manifest tokens needed to describe the
# byte range [range_start, range_start + range_size): the stream name, the
# locators whose blocks overlap the range (all locators when any lacks a
# "+size" hint), and the overlapping file tokens.
# NOTE(review): several lines are missing from this excerpt (original 196,
# 200, 203-204, 207, 209, 211, 213, 216-217, 219-221), including the
# block_start initialization, the file loop header, and the return.
193 def tokens_for_range(self, range_start, range_size):
194 resp = [self._stream_name]
195 return_all_tokens = False
197 token_bytes_skipped = 0
198 for locator in self.data_locators:
# The size hint is the "+NNN" suffix embedded in the locator, if present.
199 sizehint = re.search(r'\+(\d+)', locator)
201 return_all_tokens = True
202 if return_all_tokens:
# NOTE(review): group(0) includes the leading '+' ("+123"); int() accepts
# that form, so this still yields the block size.
205 blocksize = int(sizehint.group(0))
206 if range_start + range_size <= block_start:
208 if range_start < block_start + blocksize:
210 block_start += int(blocksize)
# Emit file tokens whose [start, start+len) interval overlaps the range.
212 if ((f[0] < range_start + range_size)
214 (f[0] + f[1] > range_start)):
215 resp += ["%d:%d:%s" % (f[0], f[1], f[2])]
# Accessor for the stream name; its def line (original 217) is missing here.
218 return self._stream_name
# Yields a StreamFileReader per file entry; the enclosing def/loop lines
# (original 219-221) are missing from this excerpt.
222 yield StreamFileReader(self, pos, size, name)
# Advance to the next data block: from the "before first block" state (-1),
# move to block 0 at position 0; otherwise add the current block's size to
# the position and bump the index. The cached block bytes are dropped either
# way. NOTE(review): the else line (original 227) is missing from this
# excerpt.
223 def nextdatablock(self):
224 if self._current_datablock_index < 0:
225 self._current_datablock_pos = 0
226 self._current_datablock_index = 0
228 self._current_datablock_pos += self.current_datablock_size()
229 self._current_datablock_index += 1
230 self._current_datablock_data = None
# Lazily fetch (and cache) the bytes of the current data block from Keep.
231 def current_datablock_data(self):
232 if self._current_datablock_data == None:
233 self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
234 return self._current_datablock_data
# Size of the current data block: prefer the "+NNN" size hint embedded in
# its locator; otherwise fetch the block and measure its length.
# NOTE(review): original lines 237 and 239 (the index < 0 body and the
# sizehint truth test) are missing from this excerpt.
235 def current_datablock_size(self):
236 if self._current_datablock_index < 0:
238 sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
# group(0) is "+NNN"; int() accepts the leading '+'.
240 return int(sizehint.group(0))
241 return len(self.current_datablock_data())
# seek() records the target position; really_seek() then walks the block
# list until the cached block contains self._pos. NOTE(review): the def
# seek line and several statements (original 242, 244, 247-248, 250, 254,
# 257, 260) are missing from this excerpt, including the loop body.
243 """Set the position of the next read operation."""
245 def really_seek(self):
246 """Find and load the appropriate data block, so the byte at
# Fast path: already positioned exactly at the cached block's start.
249 if self._pos == self._current_datablock_pos:
# Target already falls inside the cached block: nothing to do.
251 if (self._current_datablock_pos != None and
252 self._pos >= self._current_datablock_pos and
253 self._pos <= self._current_datablock_pos + self.current_datablock_size()):
# Target is behind us: rewind to the "before first block" state (-1).
255 if self._pos < self._current_datablock_pos:
256 self._current_datablock_index = -1
# Walk forward block by block until _pos falls inside the current one.
258 while (self._pos > self._current_datablock_pos and
259 self._pos > self._current_datablock_pos + self.current_datablock_size()):
# Read up to size bytes from the current position (at most to the end of
# the current data block). NOTE(review): several lines are missing from
# this excerpt (original 264-267, 269, 271, 274-275), including the seek
# handling, the loop body, the end-of-stream return, and the final return.
261 def read(self, size):
262 """Read no more than size bytes -- but at least one byte,
263 unless _pos is already at the end of the stream.
# Skip past any blocks that end at or before _pos; stop at end of stream.
268 while self._pos >= self._current_datablock_pos + self.current_datablock_size():
270 if self._current_datablock_index >= len(self.data_locators):
# Slice the requested range out of the cached block and advance _pos by the
# number of bytes actually obtained.
272 data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
273 self._pos += len(data)
# Reads a Keep collection given either literal manifest text or a manifest
# locator (resolved through Keep.get on first use).
# NOTE(review): interior lines are missing from this excerpt (original 281,
# 284-289, 291, 294, 299-300, 303-304, 307-308).
276 class CollectionReader:
# Heuristic: input that already looks like manifest text (stream name,
# hex locators, pos:size:name tokens, newline) is used verbatim; anything
# else is treated as a locator to fetch later.
277 def __init__(self, manifest_locator_or_text):
278 if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
279 self._manifest_text = manifest_locator_or_text
280 self._manifest_locator = None
# NOTE(review): the else line (original 281) is missing from this excerpt.
282 self._manifest_locator = manifest_locator_or_text
283 self._manifest_text = None
# Populate _streams from the manifest text (fetching by locator first when
# needed); each stream is stored as its raw token list. NOTE(review): the
# enclosing def line (original ~289) and lines 291, 294 are missing.
290 if self._streams != None:
292 if not self._manifest_text:
293 self._manifest_text = Keep.get(self._manifest_locator)
295 for stream_line in self._manifest_text.split("\n"):
296 stream_tokens = stream_line.split()
297 self._streams += [stream_tokens]
# Build one StreamReader per parsed stream. NOTE(review): lines 299-300 and
# 303 (populate call, resp init, return) are missing from this excerpt.
298 def all_streams(self):
301 for s in self._streams:
302 resp += [StreamReader(s)]
# Yield every file of every stream; the def all_files line (original ~304)
# and the yield (original ~307) are missing from this excerpt.
305 for s in self.all_streams():
306 for f in s.all_files():
# Incrementally builds a collection: buffered writes are flushed to Keep in
# KEEP_BLOCK_SIZE (64 MiB) chunks, files and streams are tracked as
# (pos, size, name) entries, and the result is rendered as manifest text.
# NOTE(review): interior lines are missing from this excerpt (original 311,
# 320-323, 328, 345, 362, 364, 367, 377, 381, 386, 391-396).
309 class CollectionWriter:
310 KEEP_BLOCK_SIZE = 2**26
# Constructor state; the def __init__ line (original ~311) is missing here.
312 self._data_buffer = ''
313 self._current_stream_files = []
314 self._current_stream_length = 0
315 self._current_stream_locators = []
316 self._current_stream_name = '.'
317 self._current_file_name = None
318 self._current_file_pos = 0
319 self._finished_streams = []
# Append data to the buffer; flush whole Keep-sized blocks as they fill.
# NOTE(review): the loop body (presumably flush_data(), original 328) is
# missing from this excerpt.
324 def write(self, newdata):
325 self._data_buffer += newdata
326 self._current_stream_length += len(newdata)
327 while len(self._data_buffer) >= self.KEEP_BLOCK_SIZE:
# Store up to one Keep block of buffered data and record its locator.
329 def flush_data(self):
330 if self._data_buffer != '':
331 self._current_stream_locators += [Keep.put(self._data_buffer[0:self.KEEP_BLOCK_SIZE])]
332 self._data_buffer = self._data_buffer[self.KEEP_BLOCK_SIZE:]
# Close the current file (if any) and begin a new one.
333 def start_new_file(self, newfilename=None):
334 self.finish_current_file()
335 self.set_current_file_name(newfilename)
# Manifest entries are space-separated tokens, so whitespace in file names
# would corrupt the manifest.
336 def set_current_file_name(self, newfilename):
337 if re.search(r'[ \t\n]', newfilename):
338 raise AssertionError("Manifest filenames cannot contain whitespace")
339 self._current_file_name = newfilename
340 def current_file_name(self):
341 return self._current_file_name
# Close out the current file: record its (pos, size, name) entry against
# the stream. An unnamed file is only tolerated when it holds zero bytes.
# NOTE(review): the zero-byte early return (original 345) is missing here.
342 def finish_current_file(self):
343 if self._current_file_name == None:
344 if self._current_file_pos == self._current_stream_length:
346 raise Exception("Cannot finish an unnamed file (%d bytes at offset %d in '%s' stream)" % (self._current_stream_length - self._current_file_pos, self._current_file_pos, self._current_stream_name))
347 self._current_stream_files += [[self._current_file_pos,
348 self._current_stream_length - self._current_file_pos,
349 self._current_file_name]]
350 self._current_file_pos = self._current_stream_length
# Close the current stream (if any) and begin a new one.
351 def start_new_stream(self, newstreamname=None):
352 self.finish_current_stream()
353 self.set_current_stream_name(newstreamname)
# Same whitespace restriction as file names, for the same manifest reason.
354 def set_current_stream_name(self, newstreamname):
355 if re.search(r'[ \t\n]', newstreamname):
356 raise AssertionError("Manifest stream names cannot contain whitespace")
357 self._current_stream_name = newstreamname
358 def current_stream_name(self):
359 return self._current_stream_name
# Close out the current stream: finish the open file, then archive the
# (name, locators, files) triple and reset per-stream state. NOTE(review):
# original 362, 364, 367 (flush call, empty-stream branch body, else) are
# missing from this excerpt.
360 def finish_current_stream(self):
361 self.finish_current_file()
363 if len(self._current_stream_files) == 0:
365 elif self._current_stream_name == None:
366 raise Exception("Cannot finish an unnamed stream (%d bytes in %d files)" % (self._current_stream_length, len(self._current_stream_files)))
368 self._finished_streams += [[self._current_stream_name,
369 self._current_stream_locators,
370 self._current_stream_files]]
371 self._current_stream_files = []
372 self._current_stream_length = 0
373 self._current_stream_locators = []
374 self._current_stream_name = None
375 self._current_file_pos = 0
376 self._current_file_name = None
# Store the finished manifest in Keep and return its locator; the enclosing
# def line (original ~377) is missing from this excerpt.
378 return Keep.put(self.manifest_text())
# Render all finished streams as manifest text: stream name, then locators
# (or the canonical empty-block locator d41d8...+0 when a stream has none),
# then file tokens. NOTE(review): the manifest initialization, else line,
# per-stream newline, and return (original 381, 386, 391-396) are missing
# from this excerpt.
379 def manifest_text(self):
380 self.finish_current_stream()
382 for stream in self._finished_streams:
383 manifest += stream[0]
384 if len(stream[1]) == 0:
385 manifest += " d41d8cd98f00b204e9800998ecf8427e+0"
387 for locator in stream[1]:
388 manifest += " %s" % locator
389 for sfile in stream[2]:
390 manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
# Keep.put fragment: honor KEEP_LOCAL_STORE (test mode), otherwise pipe the
# data through an external "whput -" subprocess and return the locator it
# prints (stripped of trailing whitespace). NOTE(review): the enclosing
# class statement and the def put line are missing from this excerpt.
397 if 'KEEP_LOCAL_STORE' in os.environ:
398 return Keep.local_store_put(data)
399 p = subprocess.Popen(["whput", "-"],
400 stdout=subprocess.PIPE,
401 stdin=subprocess.PIPE,
402 stderr=subprocess.PIPE,
403 shell=False, close_fds=True)
404 stdoutdata, stderrdata = p.communicate(data)
405 if p.returncode != 0:
406 raise Exception("whput subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
407 return stdoutdata.rstrip()
# Keep.get fragment: fetch a block via "whget <locator> -", then verify
# that the locator begins with the md5 of the returned bytes before
# returning them. NOTE(review): the def get line, the stdin argument
# (original 414), the m.update(...) call (original 421-422), and the
# success-path return (original 424-426) are missing from this excerpt.
410 if 'KEEP_LOCAL_STORE' in os.environ:
411 return Keep.local_store_get(locator)
412 p = subprocess.Popen(["whget", locator, "-"],
413 stdout=subprocess.PIPE,
415 stderr=subprocess.PIPE,
416 shell=False, close_fds=True)
417 stdoutdata, stderrdata = p.communicate(None)
418 if p.returncode != 0:
419 raise Exception("whget subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
420 m = hashlib.new('md5')
# locator.index(...) == 0 tests that the locator starts with the digest;
# NOTE(review): index() raises ValueError when absent, so the mismatch
# Exception below is only reached via lines missing from this excerpt.
423 if locator.index(m.hexdigest()) == 0:
427 raise Exception("md5 checksum mismatch: md5(get(%s)) == %s" % (locator, m.hexdigest()))
# Test-mode put: write data into KEEP_LOCAL_STORE under its md5 hex digest,
# via a ".tmp" file followed by an atomic rename. NOTE(review): the hash
# update, "md5 = m.hexdigest()" assignment, f.write(data), and the
# "return locator" (original 431-432, 435, 438-439) are missing from this
# excerpt.
429 def local_store_put(data):
430 m = hashlib.new('md5')
# locator is "<md5>+<length>".
433 locator = '%s+%d' % (md5, len(data))
434 with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
436 os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
437 os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
# Test-mode get: read back the file named by the locator's leading hex hash
# (32+ hex digits) from KEEP_LOCAL_STORE. NOTE(review): the body of the
# with block (presumably "return f.read()", original ~443) is missing from
# this excerpt; also r is used without checking for a failed match.
440 def local_store_get(locator):
441 r = re.search('^([0-9a-f]{32,})', locator)
442 with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f: