18 from apiclient import errors
19 from apiclient.discovery import build
# OAuth2-from-environment shim for the Google API client's httplib2 transport.
# NOTE(review): this excerpt is fragmentary -- several original lines are
# elided, including the try/except that wraps the first request attempt and
# the tail of authorize() (which presumably returns `http` -- confirm).
21 class CredentialsFromEnv:
# Replacement for http.request: injects an Authorization header built from
# the ARVADOS_API_TOKEN environment variable, then delegates to the original
# request method saved by authorize() below.
23 def http_request(self, uri, **kwargs):
# Python 2 httplib.  BadStatusLine is presumably caught by an elided except
# clause so a request on a stale kept-alive connection can be retried.
24 from httplib import BadStatusLine
25 if 'headers' not in kwargs:
26 kwargs['headers'] = {}
# Token is read from the environment on every call, so a token rotated at
# runtime is picked up automatically.
27 kwargs['headers']['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
29 return self.orig_http_request(uri, **kwargs)
31 # This is how httplib tells us that it tried to reuse an
32 # existing connection but it was already closed by the
33 # server. In that case, yes, we would like to retry.
34 # Unfortunately, we are not absolutely certain that the
35 # previous call did not succeed, so this is slightly
# (comment continues in elided lines) ...risky: the retry below may repeat a
# non-idempotent request that already took effect on the server.
37 return self.orig_http_request(uri, **kwargs)
# Monkey-patch `http` so every request goes through http_request() above;
# the original bound method is stashed as orig_http_request for delegation.
38 def authorize(self, http):
39 http.orig_http_request = http.request
40 http.request = types.MethodType(self.http_request, http)
# Module import-time setup: build an authenticated Arvados API client from
# $ARVADOS_API_HOST / $ARVADOS_API_TOKEN.  These statements run -- and hit
# the network via the discovery document -- as a side effect of import.
# The {api}/{apiVersion} placeholders are filled in by the discovery client,
# not by this %-format (only %s is substituted here).
43 url = ('https://%s/discovery/v1/apis/'
44 '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
45 credentials = CredentialsFromEnv()
46 http = httplib2.Http()
47 http = credentials.authorize(http)
# SECURITY NOTE(review): TLS certificate validation is disabled, so the API
# token could be exposed to a man-in-the-middle.  Presumably a development
# convenience -- should be opt-in via configuration.
48 http.disable_ssl_certificate_validation=True
49 service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
# Helpers that fetch the current task/job API records and wrap them in
# UserDict objects.  NOTE(review): fragmentary excerpt -- the enclosing
# current_task()/current_job() def lines, any caching, and the body/execute
# of the update call are elided.
# Bound onto task objects below as t.set_output: records `s` as the task's
# output via the API (update body elided here).
51 def task_set_output(self,s):
52 service.job_tasks().update(uuid=self['uuid'],
# Fetch the API record for the task named by $TASK_UUID and decorate it with
# a set_output method and this task's scratch directory.
64 t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
65 t = UserDict.UserDict(t)
66 t.set_output = types.MethodType(task_set_output, t)
67 t.tmpdir = os.environ['TASK_WORK']
# Fetch the API record for the job named by $JOB_UUID, likewise decorated
# with the job's scratch directory.
76 t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
77 t = UserDict.UserDict(t)
78 t.tmpdir = os.environ['JOB_WORK']
# Constructor fragment (the enclosing class statement is not visible in this
# excerpt).  Python 2 print statement.  NOTE(review): both parameters use
# mutable dict() defaults -- a shared-default pitfall if ever mutated.
86 def __init__(self, parameters=dict(), resource_limits=dict()):
87 print "init jobtask %s %s" % (parameters, resource_limits)
# Queue one new job task per file in the job's input collection, then
# (per the elided and_end_task branch) mark the current task successful.
# NOTE(review): fragmentary -- the early return for a non-matching sequence,
# the opening of the new_task_attrs literal, and the tails of both API calls
# are elided.
91 def one_task_per_input_file(if_sequence=0, and_end_task=True):
# Only the task at exactly `if_sequence` fans out; others return (elided).
92 if if_sequence != current_task()['sequence']:
94 job_input = current_job()['script_parameters']['input']
95 cr = CollectionReader(job_input)
96 for s in cr.all_streams():
97 for f in s.all_files():
# Each input file becomes a single-file manifest for one child task.
98 task_input = f.as_manifest()
100 'job_uuid': current_job()['uuid'],
101 'created_by_job_task_uuid': current_task()['uuid'],
# Children run at the next sequence number.
102 'sequence': if_sequence + 1,
107 service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
# Mark this (parent) task successful so the children can be scheduled.
109 service.job_tasks().update(uuid=current_task()['uuid'],
110 job_task=json.dumps({'success':True})
# Run a child process and return its (stdout, stderr); raise on nonzero exit.
# NOTE(review): one original line of the Popen call is elided here --
# possibly a **kwargs pass-through, since the **kwargs parameter is otherwise
# unused in the visible code.  Confirm before relying on kwargs.
116 def run_command(execargs, **kwargs):
117 p = subprocess.Popen(execargs, close_fds=True, shell=False,
118 stdin=subprocess.PIPE,
119 stdout=subprocess.PIPE,
120 stderr=subprocess.PIPE,
# communicate() drains both pipes concurrently, avoiding deadlock with the
# three PIPE handles above.
122 stdoutdata, stderrdata = p.communicate(None)
123 if p.returncode != 0:
# stderr is included in the exception so callers get the diagnostic text.
124 raise Exception("run_command %s exit %d:\n%s" %
125 (execargs, p.returncode, stderrdata))
126 return stdoutdata, stderrdata
# Clone (if needed) and check out `version` of a git repository at `path`.
# Relative paths are placed under the current job's scratch directory.
# NOTE(review): truncated -- the checkout call's cwd argument and the
# presumable `return path` are elided from this excerpt.
129 def git_checkout(url, version, path):
# Absolute-path test via regex (os.path.isabs would be the clearer idiom).
130 if not re.search('^/', path):
131 path = os.path.join(current_job().tmpdir, path)
132 if not os.path.exists(path):
133 util.run_command(["git", "clone", url, path],
134 cwd=os.path.dirname(path))
135 util.run_command(["git", "checkout", version],
# NOTE(review): fragmentary -- the docstring tail, tar's stdout/cwd plumbing,
# the loop feeding collection data into tar's stdin, and the lock release are
# all elided from this excerpt.
140 def tarball_extract(tarball, path):
141 """Retrieve a tarball from Keep and extract it to a local
142 directory. Return the absolute path where the tarball was
143 extracted. If the top level of the tarball contained just one
144 file or directory, return the absolute path of that single
147 tarball -- collection locator
148 path -- where to extract the tarball: absolute, or relative to job tmp
# Relative targets live under the job's scratch directory.
150 if not re.search('^/', path):
151 path = os.path.join(current_job().tmpdir, path)
# Exclusive flock on <path>.lock serializes concurrent extraction of the
# same target by multiple processes on this host.
152 lockfile = open(path + '.lock', 'w')
153 fcntl.flock(lockfile, fcntl.LOCK_EX)
# The '.locator' symlink records which tarball is currently extracted here;
# if it already names this tarball, the cached extraction is reused.
158 already_have_it = False
160 if os.readlink(os.path.join(path, '.locator')) == tarball:
161 already_have_it = True
164 if not already_have_it:
166 # emulate "rm -f" (i.e., if the file does not exist, we win)
168 os.unlink(os.path.join(path, '.locator'))
170 if os.path.exists(os.path.join(path, '.locator')):
171 os.unlink(os.path.join(path, '.locator'))
# Stream each file of the collection through tar, picking a decompression
# flag by extension.  NOTE(review): the inner dot in 'tar.bz2' / 'tar.gz' is
# unescaped, so e.g. 'tarXbz2' would also match -- harmless but sloppy.
173 for f in CollectionReader(tarball).all_files():
175 if re.search('\.(tbz|tar.bz2)$', f.name()):
176 decompress_flag = 'j'
177 elif re.search('\.(tgz|tar.gz)$', f.name()):
178 decompress_flag = 'z'
179 p = subprocess.Popen(["tar",
181 ("-x%sf" % decompress_flag),
184 stdin=subprocess.PIPE, stderr=sys.stderr,
185 shell=False, close_fds=True)
193 if p.returncode != 0:
195 raise Exception("tar exited %d" % p.returncode)
# Success: record which tarball now lives at this path.
196 os.symlink(tarball, os.path.join(path, '.locator'))
# Python 2: filter() returns a list here.
197 tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
# Single top-level entry: return its path directly (the elided tail
# presumably returns `path` itself otherwise).
199 if len(tld_extracts) == 1:
200 return os.path.join(path, tld_extracts[0])
# Fetch (some files of) a collection into a local directory, caching via a
# '.locator' symlink exactly like tarball_extract above.  NOTE(review):
# fragmentary -- the docstring tail, the data-copy loop body, the lock
# release, and the return value are elided.  Also note the mutable default
# `files=[]`: harmless while only read, but a known Python pitfall.
204 def collection_extract(collection, path, files=[]):
205 """Retrieve a collection from Keep and extract it to a local
206 directory. Return the absolute path where the collection was
209 collection -- collection locator
210 path -- where to extract: absolute, or relative to job tmp
212 if not re.search('^/', path):
213 path = os.path.join(current_job().tmpdir, path)
# Cross-process mutual exclusion on the extraction target.
214 lockfile = open(path + '.lock', 'w')
215 fcntl.flock(lockfile, fcntl.LOCK_EX)
216 already_have_it = False
# Reuse the cached extraction when '.locator' already names this collection.
222 if os.readlink(os.path.join(path, '.locator')) == collection:
223 already_have_it = True
226 if not already_have_it:
227 # emulate "rm -f" (i.e., if the file does not exist, we win)
229 os.unlink(os.path.join(path, '.locator'))
231 if os.path.exists(os.path.join(path, '.locator')):
232 os.unlink(os.path.join(path, '.locator'))
# Empty `files` means "extract everything"; otherwise only the named files.
234 for f in CollectionReader(collection).all_files():
235 if files == [] or f.name() in files:
236 outfile = open(os.path.join(path, f.name()), 'w')
243 os.symlink(collection, os.path.join(path, '.locator'))
# File-like reader backed by a `whget -r` subprocess streaming a collection
# from Keep.  NOTE(review): the enclosing class statement and the close()
# def line are not visible in this excerpt.
248 def __init__(self, data_locator):
249 self.data_locator = data_locator
250 self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
251 stdout=subprocess.PIPE,
252 stdin=None, stderr=subprocess.PIPE,
253 shell=False, close_fds=True)
# Delegate reads straight to the child's stdout pipe.
258 def read(self, size, **kwargs):
259 return self.p.stdout.read(size, **kwargs)
# (close() def line elided) -- forward the child's stderr to ours, reap it,
# and surface a nonzero exit status as an exception.
261 self.p.stdout.close()
262 if not self.p.stderr.closed:
263 for err in self.p.stderr:
264 print >> sys.stderr, err
265 self.p.stderr.close()
267 if self.p.returncode != 0:
268 raise Exception("whget subprocess exited %d" % self.p.returncode)
# Read-only view of one file embedded in a manifest stream (old-style
# Python 2 class).  NOTE(review): the rest of __init__ -- storing pos, size,
# name, and the read cursor _filepos -- is elided from this excerpt.
270 class StreamFileReader:
271 def __init__(self, stream, pos, size, name):
272 self._stream = stream
279 def decompressed_name(self):
280 return re.sub('\.(bz2|gz)$', '', self._name)
# Name of the stream this file belongs to.
283 def stream_name(self):
284 return self._stream.name()
# Bounded read at the file's current cursor, clamped so it never reads past
# the end of this file's extent within the stream.  NOTE(review): the
# `return data` line is elided from this excerpt.
285 def read(self, size, **kwargs):
286 self._stream.seek(self._pos + self._filepos)
287 data = self._stream.read(min(size, self._size - self._filepos))
288 self._filepos += len(data)
# Generator yielding successive chunks until EOF (loop/yield lines elided).
290 def readall(self, size, **kwargs):
292 data = self.read(size, **kwargs)
# Incremental bzip2 decompression of the file contents; the yield of each
# non-empty decompressed chunk is elided here.
296 def bunzip2(self, size):
297 decompressor = bz2.BZ2Decompressor()
298 for chunk in self.readall(size):
299 data = decompressor.decompress(chunk)
300 if data and data != '':
# Incremental gzip decompression.  16+MAX_WBITS tells zlib to expect a gzip
# header; unconsumed_tail carries input left over from the previous call.
302 def gunzip(self, size):
303 decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
304 for chunk in self.readall(size):
305 data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
306 if data and data != '':
# Generator yielding one line at a time, transparently decompressing .bz2 /
# .gz files unless decompress=False.  NOTE(review): most of the internal
# line-splitting buffer logic is elided from this excerpt.
308 def readlines(self, decompress=True):
309 self._stream.seek(self._pos + self._filepos)
310 if decompress and re.search('\.bz2$', self._name):
311 datasource = self.bunzip2(2**10)
312 elif decompress and re.search('\.gz$', self._name):
313 datasource = self.gunzip(2**10)
315 datasource = self.readall(2**10)
317 for newdata in datasource:
# Python 2 string module; scans for the next newline from `sol` (start of
# line) and yields up to and including it.
321 eol = string.find(data, "\n", sol)
324 yield data[sol:eol+1]
# Render just this file as a one-file manifest.  The first return (guarded
# by an elided zero-size check, presumably) uses the well-known md5 of the
# empty string as the locator for an empty file.
329 def as_manifest(self):
331 return ("%s d41d8cd98f00b204e9800998ecf8427e+0 0:0:%s\n"
332 % (self._stream.name(), self.name()))
333 return string.join(self._stream.tokens_for_range(self._pos, self._size),
# Parse one manifest stream: first token is the stream name, then one or
# more 32-hex-digit data block locators (with optional +size hints), then
# pos:size:name file entries.  NOTE(review): the def lines of the enclosing
# class and some initialization (self.files, self._pos?) are elided; the
# final `raise` presumably sits under an elided `else:`.
337 def __init__(self, tokens):
338 self._tokens = tokens
# Lazily-fetched cache of the current data block (see
# current_datablock_data) and the cursor over data_locators.
339 self._current_datablock_data = None
340 self._current_datablock_pos = 0
341 self._current_datablock_index = -1
344 self._stream_name = None
345 self.data_locators = []
348 for tok in self._tokens:
349 if self._stream_name == None:
350 self._stream_name = tok
351 elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
352 self.data_locators += [tok]
353 elif re.search(r'^\d+:\d+:\S+', tok):
# split(':', 2) keeps colons inside the file name intact.
354 pos, size, name = tok.split(':',2)
355 self.files += [[int(pos), int(size), name]]
357 raise Exception("Invalid manifest format")
# Return the subset of this stream's manifest tokens covering the byte range
# [range_start, range_start+range_size), rebasing file offsets by the bytes
# skipped.  NOTE(review): heavily elided -- the block_start initialization,
# the branches that append locators/continue, and the else for files are all
# missing from this excerpt.
358 def tokens_for_range(self, range_start, range_size):
359 resp = [self._stream_name]
360 return_all_tokens = False
362 token_bytes_skipped = 0
363 for locator in self.data_locators:
# A '+N' size hint on the locator; without one (elided branch) the exact
# block sizes are unknowable, so every token must be returned.
364 sizehint = re.search(r'\+(\d+)', locator)
366 return_all_tokens = True
367 if return_all_tokens:
# group(0) is '+N'; int() accepts the leading '+'.  group(1) would be the
# cleaner choice.
370 blocksize = int(sizehint.group(0))
371 if range_start + range_size <= block_start:
373 if range_start < block_start + blocksize:
376 token_bytes_skipped += blocksize
377 block_start += blocksize
# Include every file entry overlapping the requested range, with offsets
# rebased past the skipped blocks.
379 if ((f[0] < range_start + range_size)
381 (f[0] + f[1] > range_start)
384 resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
# Fragment of name() (its def line is elided).
387 return self._stream_name
# Fragment of all_files() (def and loop header elided): yields one
# StreamFileReader per pos/size/name entry.
391 yield StreamFileReader(self, pos, size, name)
# Advance the block cursor.  The first call moves from the initial -1 index
# to block 0; later calls (elided else branch) advance position by the
# current block's size.  The cached block data is always invalidated.
392 def nextdatablock(self):
393 if self._current_datablock_index < 0:
394 self._current_datablock_pos = 0
395 self._current_datablock_index = 0
397 self._current_datablock_pos += self.current_datablock_size()
398 self._current_datablock_index += 1
399 self._current_datablock_data = None
def current_datablock_data(self):
    """Return the bytes of the current data block, fetching from Keep on
    first access.

    The fetched block is memoized in self._current_datablock_data until
    nextdatablock() invalidates it.
    """
    # `is None` rather than `== None`: identity is the correct idiom for the
    # None sentinel and avoids invoking __eq__ on arbitrary block data.
    if self._current_datablock_data is None:
        self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
    return self._current_datablock_data
# Size of the current data block: 0 before the first block (elided return),
# the '+N' size hint when the locator carries one, else the length of the
# actual fetched data (which forces a Keep fetch).
404 def current_datablock_size(self):
405 if self._current_datablock_index < 0:
407 sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
# group(0) is '+N'; int() tolerates the leading '+'.  An `if sizehint:`
# guard is elided before this line.
409 return int(sizehint.group(0))
410 return len(self.current_datablock_data())
# Fragment of seek() (def line elided): just records the target position.
412 """Set the position of the next read operation."""
# Position the block cursor so _pos falls inside the current block.  Fast
# paths: already there, or within the currently-loaded block.  Seeking
# backward restarts from block -1; then blocks are skipped forward (the
# nextdatablock() calls inside these branches are elided).
414 def really_seek(self):
415 """Find and load the appropriate data block, so the byte at
418 if self._pos == self._current_datablock_pos:
420 if (self._current_datablock_pos != None and
421 self._pos >= self._current_datablock_pos and
422 self._pos <= self._current_datablock_pos + self.current_datablock_size()):
424 if self._pos < self._current_datablock_pos:
425 self._current_datablock_index = -1
427 while (self._pos > self._current_datablock_pos and
428 self._pos > self._current_datablock_pos + self.current_datablock_size()):
# Read up to `size` bytes starting at _pos, never crossing a data block
# boundary (callers loop).  NOTE(review): the docstring tail, the EOF
# return, the nextdatablock() call inside the while, and the final
# `return data` are elided from this excerpt.
430 def read(self, size):
431 """Read no more than size bytes -- but at least one byte,
432 unless _pos is already at the end of the stream.
# Skip whole blocks until the one containing _pos.
437 while self._pos >= self._current_datablock_pos + self.current_datablock_size():
439 if self._current_datablock_index >= len(self.data_locators):
# Slice of the current block starting at the in-block offset of _pos.
441 data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
442 self._pos += len(data)
# Read-side view of a collection, constructed from either a literal
# manifest text or a Keep locator.  NOTE(review): fragmentary -- the else
# keywords, _streams initialization, stream-line filtering, and the tails of
# all_streams()/all_files() are elided.
445 class CollectionReader:
# The regex distinguishes manifest text (stream name, >=1 block locator,
# >=1 file token, trailing newline) from a bare locator string.
446 def __init__(self, manifest_locator_or_text):
447 if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
448 self._manifest_text = manifest_locator_or_text
449 self._manifest_locator = None
451 self._manifest_locator = manifest_locator_or_text
452 self._manifest_text = None
# (def line of the lazy parse method is elided.)  Already parsed: no-op.
459 if self._streams != None:
461 if not self._manifest_text:
462 self._manifest_text = Keep.get(self._manifest_locator)
# Each manifest line becomes one token list destined for StreamReader.
464 for stream_line in self._manifest_text.split("\n"):
465 stream_tokens = stream_line.split()
466 self._streams += [stream_tokens]
467 def all_streams(self):
470 for s in self._streams:
471 resp += [StreamReader(s)]
# Fragment of all_files() (def line elided): flatten streams into files.
474 for s in self.all_streams():
475 for f in s.all_files():
# Incremental builder of a collection manifest: callers write() data and
# delimit files/streams; blocks are flushed to Keep as they fill.
# NOTE(review): the __init__ def line (presumably original line 480) is
# elided between the constant and the attribute assignments.
478 class CollectionWriter:
# Keep's block size: 64 MiB.
479 KEEP_BLOCK_SIZE = 2**26
# _data_buffer holds written chunks awaiting flush; _data_buffer_len caches
# their total length to avoid re-summing on every write.
481 self._data_buffer = []
482 self._data_buffer_len = 0
# Per-stream state: file extents, total bytes, block locators, stream name.
483 self._current_stream_files = []
484 self._current_stream_length = 0
485 self._current_stream_locators = []
486 self._current_stream_name = '.'
# Per-file state: name and starting offset within the current stream.
487 self._current_file_name = None
488 self._current_file_pos = 0
489 self._finished_streams = []
# Append data to the current file; flush a full Keep block whenever the
# buffered length reaches KEEP_BLOCK_SIZE.  NOTE(review): the loop body
# (presumably `self.flush_data()`) is elided from this excerpt.
494 def write(self, newdata):
495 self._data_buffer += [newdata]
496 self._data_buffer_len += len(newdata)
497 self._current_stream_length += len(newdata)
498 while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
def flush_data(self):
    """Write up to one KEEP_BLOCK_SIZE chunk of buffered data to Keep.

    The stored chunk's locator is appended to the current stream's locator
    list; any overflow beyond KEEP_BLOCK_SIZE stays buffered for the next
    flush.  A no-op when the buffer is empty.
    """
    data_buffer = ''.join(self._data_buffer)
    if data_buffer != '':
        self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
        self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
        # Bug fix: keep the cached buffer length in sync with the buffer.
        # write() loops on `while self._data_buffer_len >= KEEP_BLOCK_SIZE`,
        # and without this update _data_buffer_len never decreases, so any
        # write reaching the threshold would flush forever.
        self._data_buffer_len = len(self._data_buffer[0])
# Close out the file currently being written, then begin recording data
# under `newfilename` (or anonymously when None).
505 def start_new_file(self, newfilename=None):
506 self.finish_current_file()
507 self.set_current_file_name(newfilename)
508 def set_current_file_name(self, newfilename):
509 if re.search(r'[ \t\n]', newfilename):
510 raise AssertionError("Manifest filenames cannot contain whitespace")
511 self._current_file_name = newfilename
# Accessor for the name of the file currently being written (None if the
# data written so far is still anonymous).
512 def current_file_name(self):
513 return self._current_file_name
# Record the extent [pos, stream_length) as one manifest file entry.
# Unnamed-and-empty is a no-op (the `return` under the inner if is elided);
# unnamed with data is an error because the bytes could not be attributed.
514 def finish_current_file(self):
515 if self._current_file_name == None:
516 if self._current_file_pos == self._current_stream_length:
518 raise Exception("Cannot finish an unnamed file (%d bytes at offset %d in '%s' stream)" % (self._current_stream_length - self._current_file_pos, self._current_file_pos, self._current_stream_name))
519 self._current_stream_files += [[self._current_file_pos,
520 self._current_stream_length - self._current_file_pos,
521 self._current_file_name]]
# The next file starts where this one ended.
522 self._current_file_pos = self._current_stream_length
# Close out the stream currently being written, then begin a new one under
# `newstreamname` (or unnamed when None).
523 def start_new_stream(self, newstreamname=None):
524 self.finish_current_stream()
525 self.set_current_stream_name(newstreamname)
526 def set_current_stream_name(self, newstreamname):
527 if re.search(r'[ \t\n]', newstreamname):
528 raise AssertionError("Manifest stream names cannot contain whitespace")
529 self._current_stream_name = newstreamname
# Accessor for the name of the stream currently being written.
530 def current_stream_name(self):
531 return self._current_stream_name
# Seal the current stream: close its last file, flush remaining buffered
# data (the flush_data() call is elided), append the stream's [name,
# locators, files] triple to _finished_streams, and reset per-stream state.
# NOTE(review): the `pass` under the empty-stream branch and the `else:`
# before line 540 are elided from this excerpt.
532 def finish_current_stream(self):
533 self.finish_current_file()
535 if len(self._current_stream_files) == 0:
537 elif self._current_stream_name == None:
538 raise Exception("Cannot finish an unnamed stream (%d bytes in %d files)" % (self._current_stream_length, len(self._current_stream_files)))
540 self._finished_streams += [[self._current_stream_name,
541 self._current_stream_locators,
542 self._current_stream_files]]
543 self._current_stream_files = []
544 self._current_stream_length = 0
545 self._current_stream_locators = []
546 self._current_stream_name = None
547 self._current_file_pos = 0
548 self._current_file_name = None
# Fragment of finish() (its def line is elided): store the completed
# manifest text in Keep and return its locator.
550 return Keep.put(self.manifest_text())
# Render all finished streams as manifest text: one line per stream --
# name, block locators, then pos:size:name file tokens.  NOTE(review): the
# `manifest = ''` initialization, the else before the locator loop, the
# per-line newline append, and the return are elided from this excerpt.
551 def manifest_text(self):
552 self.finish_current_stream()
554 for stream in self._finished_streams:
555 manifest += stream[0]
# A stream with no stored blocks still needs a locator token; this is the
# md5-of-empty-string locator for zero bytes.
556 if len(stream[1]) == 0:
557 manifest += " d41d8cd98f00b204e9800998ecf8427e+0"
559 for locator in stream[1]:
560 manifest += " %s" % locator
561 for sfile in stream[2]:
562 manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
# Body of Keep.put (the def line is elided): store `data` and return its
# locator.  With $KEEP_LOCAL_STORE set, data goes to a local directory
# instead of the real Keep service -- used for testing.
569 if 'KEEP_LOCAL_STORE' in os.environ:
570 return Keep.local_store_put(data)
# Otherwise shell out to `whput`, feeding the data on stdin; whput prints
# the resulting locator on stdout.
571 p = subprocess.Popen(["whput", "-"],
572 stdout=subprocess.PIPE,
573 stdin=subprocess.PIPE,
574 stderr=subprocess.PIPE,
575 shell=False, close_fds=True)
576 stdoutdata, stderrdata = p.communicate(data)
577 if p.returncode != 0:
578 raise Exception("whput subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
# Strip the trailing newline from whput's output to get the bare locator.
579 return stdoutdata.rstrip()
# Body of Keep.get (the def line is elided): fetch the block named by
# `locator`, verify its md5, and return the data (return elided).  With
# $KEEP_LOCAL_STORE set, reads from the local test store instead.
582 if 'KEEP_LOCAL_STORE' in os.environ:
583 return Keep.local_store_get(locator)
584 p = subprocess.Popen(["whget", locator, "-"],
585 stdout=subprocess.PIPE,
587 stderr=subprocess.PIPE,
588 shell=False, close_fds=True)
589 stdoutdata, stderrdata = p.communicate(None)
590 if p.returncode != 0:
591 raise Exception("whget subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
# Integrity check: the locator must begin with the md5 of the fetched data.
# (The m.update(stdoutdata) call is elided.)  NOTE(review): str.index
# raises ValueError when the digest is absent, so a mismatch surfaces as
# ValueError rather than the checksum Exception below -- startswith() would
# be the safer test.
592 m = hashlib.new('md5')
595 if locator.index(m.hexdigest()) == 0:
599 raise Exception("md5 checksum mismatch: md5(get(%s)) == %s" % (locator, m.hexdigest()))
# Test-mode storage: write `data` into $KEEP_LOCAL_STORE under its md5 and
# return an 'md5+size' locator (return elided).  NOTE(review): the
# m.update(data) / md5 = m.hexdigest() lines and the f.write(data) inside
# the with-block are elided from this excerpt.
601 def local_store_put(data):
602 m = hashlib.new('md5')
605 locator = '%s+%d' % (md5, len(data))
# Write to a .tmp file first, then rename: readers never observe a
# partially-written block (rename is atomic within a filesystem).
606 with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
608 os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
609 os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
# Test-mode retrieval: read the block named by `locator` from
# $KEEP_LOCAL_STORE.  NOTE(review): the `if r == None:` guard, the
# empty-string return for the empty-block locator, and the final
# `return f.read()` are elided from this excerpt.
612 def local_store_get(locator):
# Accept only a leading run of >=32 hex digits; r.group(0) is the digest
# portion used as the on-disk filename (any '+size' hint is ignored).
613 r = re.search('^([0-9a-f]{32,})', locator)
615 raise Exception("Keep.get: invalid data locator '%s'" % locator)
# md5 of the empty string: short-circuit without touching the filesystem.
616 if r.group(0) == 'd41d8cd98f00b204e9800998ecf8427e':
618 with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f: