18 from apiclient import errors
19 from apiclient.discovery import build
class CredentialsFromEnv:
    """Credentials stand-in that authorizes an httplib2.Http object
    using the ARVADOS_API_TOKEN environment variable instead of a
    real OAuth2 flow.
    """
    def http_request(self, uri, **kwargs):
        # Replacement for http.request, installed by authorize().
        # Injects the API token as an OAuth2 Authorization header.
        from httplib import BadStatusLine
        if 'headers' not in kwargs:
            kwargs['headers'] = {}
        kwargs['headers']['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
        # NOTE(review): a try/except BadStatusLine wrapper around the
        # first request was elided in the extracted source; the block
        # comment below belongs to that except clause.
        return self.orig_http_request(uri, **kwargs)
        # This is how httplib tells us that it tried to reuse an
        # existing connection but it was already closed by the
        # server. In that case, yes, we would like to retry.
        # Unfortunately, we are not absolutely certain that the
        # previous call did not succeed, so this is slightly
        # risky.
        return self.orig_http_request(uri, **kwargs)
    def authorize(self, http):
        # Monkey-patch http: remember the original request method and
        # substitute the header-injecting wrapper above.
        http.orig_http_request = http.request
        http.request = types.MethodType(self.http_request, http)
        # NOTE(review): trailing line(s) elided in extracted source --
        # presumably `return http`, since the caller rebinds its http
        # to the result of authorize().
# Module-level setup: build an Arvados API client from the environment
# (ARVADOS_API_HOST / ARVADOS_API_TOKEN) at import time.
url = ('https://%s/discovery/v1/apis/'
       '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
credentials = CredentialsFromEnv()
http = httplib2.Http()
http = credentials.authorize(http)
# NOTE(review): certificate validation is disabled, so any TLS peer is
# accepted -- confirm this is intended outside development setups.
http.disable_ssl_certificate_validation=True
service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
def task_set_output(self,s):
    # Record s as this task's output and mark the task successful.
    # Bound as a method onto the UserDict returned by current_task().
    # NOTE(review): the rest of the update() call (the job_task
    # payload and the trailing .execute()) was elided in the
    # extracted source.
    service.job_tasks().update(uuid=self['uuid'],
# Fetch the current task record and decorate it -- presumably the body
# of a current_task() accessor whose def line was elided in the
# extracted source; verify against the full file.
t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
t = UserDict.UserDict(t)
t.set_output = types.MethodType(task_set_output, t)
t.tmpdir = os.environ['TASK_WORK']
# Fetch the current job record -- presumably the body of a
# current_job() accessor whose def line was elided in the extracted
# source; verify against the full file.
t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
t = UserDict.UserDict(t)
t.tmpdir = os.environ['JOB_WORK']
def __init__(self, parameters=dict(), resource_limits=dict()):
    # NOTE(review): dict() defaults are evaluated once at definition
    # time and shared across every call -- harmless only while both
    # dicts stay read-only; prefer None sentinels if either is ever
    # mutated.  (Remainder of the body elided in extracted source.)
    print "init jobtask %s %s" % (parameters, resource_limits)
def one_task_per_input_file(if_sequence=0, and_end_task=True):
    """Queue one new job task per file in this job's input collection.

    if_sequence -- only act when the current task's 'sequence' equals
    this value (prevents re-queueing at later sequence steps).
    and_end_task -- per its name, presumably controls whether the
    current task is marked successful afterwards; the guard line was
    elided in the extracted source -- verify against the full file.
    """
    if if_sequence != current_task()['sequence']:
        # [body elided in extracted source -- presumably `return`]
    job_input = current_job()['script_parameters']['input']
    cr = CollectionReader(job_input)
    for s in cr.all_streams():
        for f in s.all_files():
            # Each new task's input is a single-file manifest.
            task_input = f.as_manifest()
            # [line elided -- presumably `new_task_attrs = {`]
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
            # [lines elided -- presumably the 'parameters' entry with
            #  task_input, and the closing braces]
            service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
    # Mark the dispatching task itself as successful.
    service.job_tasks().update(uuid=current_task()['uuid'],
                               job_task=json.dumps({'success':True})
    # [closing `).execute()` elided in extracted source]
def run_command(execargs, **kwargs):
    """Run execargs as a child process (argv list, no shell) and
    return (stdoutdata, stderrdata); raise if the exit code is
    nonzero, including stderr in the message for diagnosability.
    """
    p = subprocess.Popen(execargs, close_fds=True, shell=False,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         # [line elided in extracted source --
                         #  presumably **kwargs plus closing paren]
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise Exception("run_command %s exit %d:\n%s" %
                        (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
def git_checkout(url, version, path):
    """Clone url into path (cloning only if path does not yet exist)
    and check out version there.  A relative path is placed under the
    current job's temp dir.
    """
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        util.run_command(["git", "clone", url, path],
                         cwd=os.path.dirname(path))
    util.run_command(["git", "checkout", version],
                     # [trailing lines elided in extracted source --
                     #  presumably `cwd=path)` and `return path`]
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item; otherwise return the extraction directory itself.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Exclusive lock serializes concurrent extractions to this path.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    # [lines elided in extracted source -- presumably create path if
    #  it does not exist]
    # The '.locator' symlink records which collection is already
    # extracted at path, letting repeat extractions be skipped.
    already_have_it = False
    # [try: elided]
    if os.readlink(os.path.join(path, '.locator')) == tarball:
        already_have_it = True
    # [except clause elided]
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        # [try: elided]
        os.unlink(os.path.join(path, '.locator'))
        # [except clause elided]
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))
        for f in CollectionReader(tarball).all_files():
            # Pick tar's decompression flag from the file extension.
            # [line elided -- presumably decompress_flag = '']
            if re.search('\.(tbz|tar.bz2)$', f.name()):
                decompress_flag = 'j'
            elif re.search('\.(tgz|tar.gz)$', f.name()):
                decompress_flag = 'z'
            # Stream the tarball bytes from Keep into tar's stdin.
            p = subprocess.Popen(["tar",
                                  # [argument line(s) elided]
                                  ("-x%sf" % decompress_flag),
                                  # [argument line(s) elided]
                                  stdin=subprocess.PIPE, stderr=sys.stderr,
                                  shell=False, close_fds=True)
            # [feed loop / p.wait() elided in extracted source]
            if p.returncode != 0:
                # [line elided -- presumably lockfile.close()]
                raise Exception("tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    # Everything except the marker symlink is extracted content.
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    # [line elided -- presumably lockfile.close()]
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    # [line elided -- presumably `return path`]
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp

    NOTE(review): `files=[]` is a shared mutable default argument; it
    appears to be read-only here, but a None sentinel would be safer.
    """
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Exclusive lock serializes concurrent extractions to this path.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    # [lines elided in extracted source -- presumably create path if
    #  missing]
    # '.locator' symlink marks an already-extracted copy (cf.
    # tarball_extract).
    already_have_it = False
    # [try: elided]
    if os.readlink(os.path.join(path, '.locator')) == collection:
        already_have_it = True
    # [except clause and intervening lines elided]
    # emulate "rm -f" (i.e., if the file does not exist, we win)
    # [try: elided]
    os.unlink(os.path.join(path, '.locator'))
    # [except clause elided]
    if os.path.exists(os.path.join(path, '.locator')):
        os.unlink(os.path.join(path, '.locator'))
    for f in CollectionReader(collection).all_files():
        # Extract f when its (possibly decompressed) name was
        # requested and not yet extracted.
        # [opening of the `if (` condition elided]
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                # [body elided -- presumably `continue`]
            outfile = open(os.path.join(path, outname), 'w')
            for buf in (f.readall_decompressed() if decompress
                        # [else-branch and write/close lines elided]
    if len(files_got) < len(files):
        raise Exception("Wanted files %s but only got %s from %s" % (files, files_got, map(lambda z: z.name(), list(CollectionReader(collection).all_files()))))
    os.symlink(collection, os.path.join(path, '.locator'))
    # [trailing lines elided -- presumably lockfile.close() and
    #  `return path`]
def __init__(self, data_locator):
    # Stream collection data by spawning `whget -r <locator> -` and
    # reading its stdout; the locator is kept for reference.
    self.data_locator = data_locator
    self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
                              stdout=subprocess.PIPE,
                              stdin=None, stderr=subprocess.PIPE,
                              shell=False, close_fds=True)
def read(self, size, **kwargs):
    """Read up to `size` bytes from the whget subprocess's stdout."""
    pipe = self.p.stdout
    return pipe.read(size, **kwargs)
# Body of a close()-style method (its def line was elided in the
# extracted source): shut down the whget pipes, forwarding any stderr
# output, and raise if the subprocess failed.
self.p.stdout.close()
if not self.p.stderr.closed:
    # Relay whget's stderr to our own stderr before closing it.
    for err in self.p.stderr:
        print >> sys.stderr, err
self.p.stderr.close()
# [line elided -- presumably self.p.wait() to set returncode]
if self.p.returncode != 0:
    raise Exception("whget subprocess exited %d" % self.p.returncode)
class StreamFileReader:
    """Read-only view of one file within a manifest stream: a
    (pos, size, name) slice of a parent StreamReader.
    """
    def __init__(self, stream, pos, size, name):
        self._stream = stream
        # [remaining attribute assignments (_pos, _size, _name,
        #  _filepos) elided in the extracted source]
def decompressed_name(self):
    """This file's name with any trailing .bz2 or .gz suffix removed."""
    stripped = re.sub('\.(bz2|gz)$', '', self._name)
    return stripped
def stream_name(self):
    """Name of the manifest stream this file belongs to."""
    owner = self._stream
    return owner.name()
def read(self, size, **kwargs):
    # Read up to `size` bytes of this file (bounded by the slice
    # end), advancing the per-file position within the parent stream.
    self._stream.seek(self._pos + self._filepos)
    data = self._stream.read(min(size, self._size - self._filepos))
    self._filepos += len(data)
    # [line elided in extracted source -- presumably `return data`]
def readall(self, size=2**20, **kwargs):
    # Generator: yield successive chunks of up to `size` bytes until
    # the file is exhausted.
    # [loop header elided in extracted source]
        data = self.read(size, **kwargs)
        # [termination check and `yield data` elided]
def bunzip2(self, size):
    # Generator: decompress this file's bz2 content, yielding
    # non-empty chunks as they are produced.
    decompressor = bz2.BZ2Decompressor()
    for chunk in self.readall(size):
        data = decompressor.decompress(chunk)
        if data and data != '':
            # [body elided in extracted source -- presumably
            #  `yield data`]
def gunzip(self, size):
    # Generator: decompress gzip content. wbits=16+MAX_WBITS makes
    # zlib expect a gzip (not raw zlib) header.
    decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
    for chunk in self.readall(size):
        data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
        if data and data != '':
            # [body elided in extracted source -- presumably
            #  `yield data`]
def readall_decompressed(self, size=2**20):
    # Choose a decompressing reader based on the file extension;
    # anything else falls through to plain readall().
    self._stream.seek(self._pos + self._filepos)
    if re.search('\.bz2$', self._name):
        return self.bunzip2(size)
    elif re.search('\.gz$', self._name):
        return self.gunzip(size)
    # [else: line elided in extracted source]
    return self.readall(size)
def readlines(self, decompress=True):
    # Generator: yield complete lines (each including its "\n"),
    # decompressing by default.
    # [branch on `decompress` elided in extracted source: the next
    #  line is the decompress=True arm; the two after it are the
    #  else arm]
    datasource = self.readall_decompressed()
    self._stream.seek(self._pos + self._filepos)
    datasource = self.readall()
    # [line-buffer initialisation elided]
    for newdata in datasource:
        # [data accumulation lines elided]
        eol = string.find(data, "\n", sol)
        # [no-more-newline handling elided]
        yield data[sol:eol+1]
        # [sol advance and trailing partial-line flush elided]
def as_manifest(self):
    # Render this single file as a one-stream manifest fragment.
    # [guard elided in extracted source -- presumably a zero-length
    #  special case, which uses the well-known empty-block locator
    #  (md5 of "") below]
    return ("%s d41d8cd98f00b204e9800998ecf8427e+0 0:0:%s\n"
            % (self._stream.name(), self.name()))
    return string.join(self._stream.tokens_for_range(self._pos, self._size),
    # [continuation of string.join(...) elided in extracted source]
def __init__(self, tokens):
    # Parse one manifest stream line, pre-split into tokens:
    #   <stream name> <data locator>... <pos:size:filename>...
    self._tokens = tokens
    # Lazy block-fetch cursor state.
    self._current_datablock_data = None
    self._current_datablock_pos = 0
    self._current_datablock_index = -1
    # [line elided in extracted source -- presumably self._pos = 0]
    self._stream_name = None
    self.data_locators = []
    # [line elided -- presumably self.files = []]
    for tok in self._tokens:
        if self._stream_name == None:
            # First token is the stream name.
            self._stream_name = tok
        elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
            # md5 data locator, optionally with +size / +hints.
            self.data_locators += [tok]
        elif re.search(r'^\d+:\d+:\S+', tok):
            # File entry: byte offset, length, name.
            pos, size, name = tok.split(':',2)
            self.files += [[int(pos), int(size), name]]
        # [else: elided -- the raise below fires only for
        #  unrecognized tokens]
        raise Exception("Invalid manifest format")
def tokens_for_range(self, range_start, range_size):
    # Build the manifest tokens covering a byte sub-range of this
    # stream: the stream name, the locators overlapping the range,
    # then pos:size:name entries with offsets rebased by the bytes
    # skipped in front of the range.
    resp = [self._stream_name]
    return_all_tokens = False
    # [line elided in extracted source -- presumably block_start = 0]
    token_bytes_skipped = 0
    for locator in self.data_locators:
        sizehint = re.search(r'\+(\d+)', locator)
        # [guard elided -- without a +size hint, block sizes are
        #  unknown, so fall back to returning every token]
        return_all_tokens = True
        if return_all_tokens:
            # [body elided -- presumably append locator and continue]
        blocksize = int(sizehint.group(0))
        if range_start + range_size <= block_start:
            # [body elided -- presumably break: past the range]
        if range_start < block_start + blocksize:
            # [body elided -- presumably append this locator]
        # [else branch elided -- blocks before the range contribute
        #  to token_bytes_skipped]
        token_bytes_skipped += blocksize
        block_start += blocksize
    # [file-entry loop header elided -- presumably
    #  `for f in self.files:`]
    if ((f[0] < range_start + range_size)
        # [boolean operator line elided]
        (f[0] + f[1] > range_start)
        # [remaining condition / colon lines elided]
        resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
    # [line elided -- presumably return resp]
# Fragments of two methods whose def lines were elided in the
# extracted source: a name() accessor returning the stream name, and
# an all_files() generator yielding one StreamFileReader per
# (pos, size, name) file entry.
return self._stream_name
yield StreamFileReader(self, pos, size, name)
def nextdatablock(self):
    # Advance the block cursor to the next data block and drop the
    # cached block contents.
    if self._current_datablock_index < 0:
        # First use: position at the start of block 0.
        self._current_datablock_pos = 0
        self._current_datablock_index = 0
    # [else: elided in extracted source -- the two lines below
    #  advance past the current block]
    self._current_datablock_pos += self.current_datablock_size()
    self._current_datablock_index += 1
    self._current_datablock_data = None
def current_datablock_data(self):
    """Return the bytes of the data block under the cursor, fetching
    from Keep and caching them on first access."""
    cached = self._current_datablock_data
    if cached == None:
        locator = self.data_locators[self._current_datablock_index]
        cached = Keep.get(locator)
        self._current_datablock_data = cached
    return cached
def current_datablock_size(self):
    # Size of the current block: prefer the +size hint embedded in
    # the locator; otherwise fetch the block and measure it.
    if self._current_datablock_index < 0:
        # [body elided in extracted source -- presumably
        #  self.nextdatablock()]
    sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
    # [guard elided -- presumably `if sizehint:`]
    return int(sizehint.group(0))
    return len(self.current_datablock_data())
427 """Set the position of the next read operation."""
429 def really_seek(self):
430 """Find and load the appropriate data block, so the byte at
433 if self._pos == self._current_datablock_pos:
435 if (self._current_datablock_pos != None and
436 self._pos >= self._current_datablock_pos and
437 self._pos <= self._current_datablock_pos + self.current_datablock_size()):
439 if self._pos < self._current_datablock_pos:
440 self._current_datablock_index = -1
442 while (self._pos > self._current_datablock_pos and
443 self._pos > self._current_datablock_pos + self.current_datablock_size()):
def read(self, size):
    """Read no more than size bytes -- but at least one byte,
    unless _pos is already at the end of the stream.
    """
    # [lines elided in extracted source -- presumably an empty-read
    #  fast path and a call to self.really_seek()]
    while self._pos >= self._current_datablock_pos + self.current_datablock_size():
        # Cursor is beyond the current block: advance block by block.
        # [body elided -- presumably self.nextdatablock()]
        if self._current_datablock_index >= len(self.data_locators):
            # [body elided -- presumably return '' at end of stream]
    # Slice the requested bytes out of the cached current block.
    data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
    self._pos += len(data)
    # [line elided -- presumably return data]
class CollectionReader:
    """Read access to a collection, given either a manifest locator
    or literal manifest text.
    """
    def __init__(self, manifest_locator_or_text):
        # Heuristic: input that already looks like a manifest line is
        # treated as literal manifest text; anything else is kept as
        # a locator to fetch from Keep lazily.
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        # [else: elided in extracted source]
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        # [line elided -- presumably self._streams = None]
    # [def line of the lazy parser elided -- the following fragment
    #  fetches the manifest (if needed) and tokenizes each line]
    if self._streams != None:
        # [body elided -- presumably return: already parsed]
    if not self._manifest_text:
        self._manifest_text = Keep.get(self._manifest_locator)
    # [line elided -- presumably self._streams = []]
    for stream_line in self._manifest_text.split("\n"):
        # [guard elided -- presumably skip empty lines]
        stream_tokens = stream_line.split()
        self._streams += [stream_tokens]
    def all_streams(self):
        # Materialize one StreamReader per parsed manifest line.
        # [lines elided -- presumably the lazy parse call and
        #  resp = []]
        for s in self._streams:
            resp += [StreamReader(s)]
        # [line elided -- presumably return resp]
    # [def all_files(self): elided -- generator over every file in
    #  every stream]
    for s in self.all_streams():
        for f in s.all_files():
            # [body elided -- presumably yield f]
    def manifest_text(self):
        # [line elided -- presumably the lazy parse call]
        return self._manifest_text
class CollectionWriter:
    """Incrementally build a collection: buffer written data, store
    it to Keep in KEEP_BLOCK_SIZE chunks, and assemble manifest text.
    """
    KEEP_BLOCK_SIZE = 2**26
    # [def __init__(self): elided in extracted source]
    self._data_buffer = []             # written chunks not yet stored in Keep
    self._data_buffer_len = 0          # total bytes held in _data_buffer
    self._current_stream_files = []    # finished [pos, size, name] entries
    self._current_stream_length = 0    # bytes written to the current stream
    self._current_stream_locators = [] # Keep locators for the current stream
    self._current_stream_name = '.'    # manifest stream name; '.' by default
    self._current_file_name = None     # name of the file being written
    self._current_file_pos = 0         # stream offset where that file began
    self._finished_streams = []        # completed streams for manifest_text()
def write(self, newdata):
    # Append newdata to the buffer and account for it; flush whole
    # Keep blocks as soon as enough data has accumulated.
    self._data_buffer += [newdata]
    self._data_buffer_len += len(newdata)
    self._current_stream_length += len(newdata)
    while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
        # [body elided in extracted source -- presumably
        #  self.flush_data()]
def flush_data(self):
    """Store up to one full Keep block from the write buffer.

    Joins the buffered chunks; if non-empty, puts the first
    KEEP_BLOCK_SIZE bytes in Keep, records the resulting locator for
    the current stream, and keeps any remainder buffered.

    Bug fix: keep _data_buffer_len in sync with _data_buffer. It was
    previously never decremented here, so after a flush the counter
    overstated the buffered bytes and write()'s
    `while self._data_buffer_len >= self.KEEP_BLOCK_SIZE` loop could
    never terminate.
    """
    data_buffer = ''.join(self._data_buffer)
    if data_buffer != '':
        self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
        self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
        # Resynchronize the byte counter with the remaining buffer.
        self._data_buffer_len = len(self._data_buffer[0])
def start_new_file(self, newfilename=None):
    """Close out the file being written (if any), then begin a new
    one named newfilename (may stay None until a name is set)."""
    # Order matters: the outgoing file must be finalized before its
    # name is replaced.
    self.finish_current_file()
    self.set_current_file_name(newfilename)
def set_current_file_name(self, newfilename):
    """Record the name for the file currently being written.

    Raises AssertionError if the name contains a space, tab, or
    newline -- whitespace would corrupt the manifest format.
    """
    has_whitespace = re.search(r'[ \t\n]', newfilename)
    if has_whitespace:
        raise AssertionError("Manifest filenames cannot contain whitespace")
    self._current_file_name = newfilename
def current_file_name(self):
    """Name of the file currently being written (None if unnamed)."""
    return self._current_file_name
def finish_current_file(self):
    # Close out the file currently being written: record its
    # [pos, size, name] entry in the current stream's file list.
    if self._current_file_name == None:
        if self._current_file_pos == self._current_stream_length:
            # Unnamed and zero bytes written: nothing to record.
            # [body elided in extracted source -- presumably return]
        raise Exception("Cannot finish an unnamed file (%d bytes at offset %d in '%s' stream)" % (self._current_stream_length - self._current_file_pos, self._current_file_pos, self._current_stream_name))
    self._current_stream_files += [[self._current_file_pos,
                                    self._current_stream_length - self._current_file_pos,
                                    self._current_file_name]]
    # The next file starts where this one ended.
    self._current_file_pos = self._current_stream_length
def start_new_stream(self, newstreamname=None):
    """Finalize the stream in progress, then begin a new one named
    newstreamname (may stay None until a name is set)."""
    # Finish first so buffered data and file entries are attributed
    # to the outgoing stream before the name changes.
    self.finish_current_stream()
    self.set_current_stream_name(newstreamname)
def set_current_stream_name(self, newstreamname):
    """Record the name for the stream currently being written.

    Raises AssertionError if the name contains a space, tab, or
    newline -- whitespace would corrupt the manifest format.
    """
    if re.search(r'[ \t\n]', newstreamname) is not None:
        raise AssertionError("Manifest stream names cannot contain whitespace")
    self._current_stream_name = newstreamname
def current_stream_name(self):
    """Name of the stream currently being written."""
    return self._current_stream_name
def finish_current_stream(self):
    # Close out the current stream: finalize the open file, record
    # the stream's name/locators/file entries, then reset all
    # per-stream state for the next stream.
    self.finish_current_file()
    # [line elided in extracted source -- presumably
    #  self.flush_data()]
    if len(self._current_stream_files) == 0:
        # Nothing written: no stream entry to record.
        # [body elided -- presumably pass]
    elif self._current_stream_name == None:
        raise Exception("Cannot finish an unnamed stream (%d bytes in %d files)" % (self._current_stream_length, len(self._current_stream_files)))
    # [else: elided -- the record below applies to named, non-empty
    #  streams]
    self._finished_streams += [[self._current_stream_name,
                                self._current_stream_locators,
                                self._current_stream_files]]
    self._current_stream_files = []
    self._current_stream_length = 0
    self._current_stream_locators = []
    self._current_stream_name = None
    self._current_file_pos = 0
    self._current_file_name = None
# Body of a finish()-style method (def line elided in the extracted
# source): store the completed manifest text in Keep and return its
# locator.
return Keep.put(self.manifest_text())
def manifest_text(self):
    # Serialize all finished streams into manifest format: one line
    # per stream -- name, locators, then pos:size:name entries.
    self.finish_current_stream()
    # [line elided in extracted source -- presumably manifest = '']
    for stream in self._finished_streams:
        manifest += stream[0]
        if len(stream[1]) == 0:
            # Stream with no stored blocks: use the well-known
            # empty-block locator (md5 of "") to keep the line
            # well-formed.
            manifest += " d41d8cd98f00b204e9800998ecf8427e+0"
        # [else: elided]
        for locator in stream[1]:
            manifest += " %s" % locator
        for sfile in stream[2]:
            manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
        # [trailing lines elided -- presumably manifest += "\n" per
        #  stream, then `return manifest` after the loop]
# Body of Keep.put(data) (enclosing class/def lines elided in the
# extracted source): store one data block and return its locator.
# KEEP_LOCAL_STORE diverts to an on-disk store for testing.
if 'KEEP_LOCAL_STORE' in os.environ:
    return Keep.local_store_put(data)
# Pipe the data through the external `whput -` helper.
p = subprocess.Popen(["whput", "-"],
                     stdout=subprocess.PIPE,
                     stdin=subprocess.PIPE,
                     stderr=subprocess.PIPE,
                     shell=False, close_fds=True)
stdoutdata, stderrdata = p.communicate(data)
if p.returncode != 0:
    raise Exception("whput subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
# whput prints the new block's locator on stdout.
return stdoutdata.rstrip()
# Body of Keep.get(locator) (def line elided in the extracted
# source): fetch one data block via `whget` and verify its md5
# digest against the locator.
if 'KEEP_LOCAL_STORE' in os.environ:
    return Keep.local_store_get(locator)
p = subprocess.Popen(["whget", locator, "-"],
                     stdout=subprocess.PIPE,
                     # [line elided -- presumably stdin=None]
                     stderr=subprocess.PIPE,
                     shell=False, close_fds=True)
stdoutdata, stderrdata = p.communicate(None)
if p.returncode != 0:
    raise Exception("whget subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
# Integrity check: the locator must start with the md5 of the bytes
# we actually received.
m = hashlib.new('md5')
# [lines elided -- presumably m.update(stdoutdata)]
# NOTE(review): str.index() raises ValueError (not a clean mismatch
# path) when the digest is absent from the locator entirely.
if locator.index(m.hexdigest()) == 0:
    # [body elided -- presumably return stdoutdata]
raise Exception("md5 checksum mismatch: md5(get(%s)) == %s" % (locator, m.hexdigest()))
def local_store_put(data):
    # Testing-mode put: write the block under $KEEP_LOCAL_STORE,
    # named by its md5 digest, using a .tmp file + rename so the
    # final name appears atomically.
    m = hashlib.new('md5')
    # [lines elided in extracted source -- presumably m.update(data)
    #  and md5 = m.hexdigest()]
    locator = '%s+%d' % (md5, len(data))
    with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
        # [body elided -- presumably f.write(data)]
    os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
              os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
    # [line elided -- presumably return locator]
def local_store_get(locator):
    # Testing-mode get: read the block file named by the locator's
    # leading md5 digest from $KEEP_LOCAL_STORE.
    r = re.search('^([0-9a-f]{32,})', locator)
    # [guard elided in extracted source -- presumably `if not r:`
    #  before the raise below]
    raise Exception("Keep.get: invalid data locator '%s'" % locator)
    if r.group(0) == 'd41d8cd98f00b204e9800998ecf8427e':
        # Well-known empty-block digest: no file to read.
        # [body elided -- presumably return '']
    with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
        # [body elided -- presumably return f.read()]