18 from apiclient import errors
19 from apiclient.discovery import build
# Credentials object that injects the Arvados API token from the
# environment into every outgoing HTTP request, instead of using a real
# OAuth2 flow.
class CredentialsFromEnv:
    def http_request(self, uri, **kwargs):
        # Wrapper installed in place of http.request by authorize().
        # NOTE(review): lines are missing from this listing (source line
        # numbers jump 27->29 and 35->37); the two identical request calls
        # below were almost certainly wrapped in try/except BadStatusLine —
        # confirm against the original file.
        from httplib import BadStatusLine  # Python 2 stdlib
        if 'headers' not in kwargs:
            kwargs['headers'] = {}
        # Attach the API token to every request.
        kwargs['headers']['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
        return self.orig_http_request(uri, **kwargs)
        # This is how httplib tells us that it tried to reuse an
        # existing connection but it was already closed by the
        # server. In that case, yes, we would like to retry.
        # Unfortunately, we are not absolutely certain that the
        # previous call did not succeed, so this is slightly
        return self.orig_http_request(uri, **kwargs)
    def authorize(self, http):
        # Monkey-patch the Http object: keep the original request method
        # and substitute our token-injecting wrapper.
        # NOTE(review): the trailing "return http" line appears to be
        # missing from this listing (callers assign the result).
        http.orig_http_request = http.request
        http.request = types.MethodType(self.http_request, http)
# Module-level setup: build an "arvados" v1 API client whose discovery URL
# and credentials come from the ARVADOS_API_HOST / ARVADOS_API_TOKEN
# environment variables.
url = ('https://%s/discovery/v1/apis/'
       '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
credentials = CredentialsFromEnv()
http = httplib2.Http()
http = credentials.authorize(http)
# NOTE(review): TLS certificate validation is disabled here — acceptable
# only for trusted/dev clusters; flag for production use.
http.disable_ssl_certificate_validation=True
service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
# Bound as a method on the current-task UserDict: records a task's output
# via the API.
# NOTE(review): this listing is truncated (source line numbers jump
# 52->64); the update() call below is missing its remaining arguments and
# .execute(), and the two blocks that follow were presumably the bodies of
# memoized current_task()/current_job() helpers whose def lines are absent.
def task_set_output(self,s):
    service.job_tasks().update(uuid=self['uuid'],
# Fetch the current task record (TASK_UUID from the environment), wrap it
# in a UserDict, and attach set_output plus the task work directory.
t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
t = UserDict.UserDict(t)
t.set_output = types.MethodType(task_set_output, t)
t.tmpdir = os.environ['TASK_WORK']
# Same for the current job record (JOB_UUID / JOB_WORK).
t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
t = UserDict.UserDict(t)
t.tmpdir = os.environ['JOB_WORK']
# Constructor fragment — the enclosing class header is not visible in this
# listing. Logs the parameters it was given.
# NOTE(review): dict() as a default argument is the mutable-default
# antipattern; harmless here (the dicts are only printed) but worth fixing
# in the original file.
def __init__(self, parameters=dict(), resource_limits=dict()):
    print "init jobtask %s %s" % (parameters, resource_limits)
# Queue one new job task per file in the job's input collection, then
# (presumably, when and_end_task is true) mark the current task finished.
# NOTE(review): truncated listing — the early-return under the sequence
# check, the opening of the new_task_attrs dict literal, and the
# and_end_task guard are missing (source numbers jump 92->94, 98->100,
# 102->107, 107->109).
def one_task_per_input_file(if_sequence=0, and_end_task=True):
    # Only the task at the expected sequence number fans out new tasks.
    if if_sequence != current_task()['sequence']:
    job_input = current_job()['script_parameters']['input']
    cr = CollectionReader(job_input)
    for s in cr.all_streams():
        for f in s.all_files():
            # Each file becomes the manifest-text input of one new task.
            task_input = f.as_manifest()
            'job_uuid': current_job()['uuid'],
            'created_by_job_task_uuid': current_task()['uuid'],
            'sequence': if_sequence + 1,
            service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
    # Mark the dispatching task itself as successful.
    service.job_tasks().update(uuid=current_task()['uuid'],
                               job_task=json.dumps({'success':True})
# Queue one new job task per stream (rather than per file) in the job's
# input collection. Mirrors one_task_per_input_file.
# NOTE(review): same truncation as its sibling — early return, dict
# literal opening, and and_end_task guard lines are absent from this
# listing.
def one_task_per_input_stream(if_sequence=0, and_end_task=True):
    if if_sequence != current_task()['sequence']:
    job_input = current_job()['script_parameters']['input']
    cr = CollectionReader(job_input)
    for s in cr.all_streams():
        # The stream's manifest tokens become the new task's input.
        task_input = s.tokens()
        'job_uuid': current_job()['uuid'],
        'created_by_job_task_uuid': current_task()['uuid'],
        'sequence': if_sequence + 1,
        service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
    service.job_tasks().update(uuid=current_task()['uuid'],
                               job_task=json.dumps({'success':True})
# Run a command (argv list, shell=False) with piped stdin/stdout/stderr by
# default; raise on nonzero exit; return (stdoutdata, stderrdata).
# NOTE(review): the Popen call is truncated — the line passing **kwargs
# and closing the parenthesis (source line 147) is missing from this
# listing.
def run_command(execargs, **kwargs):
    # Default all three standard streams to pipes unless the caller
    # overrides them.
    if 'stdin' not in kwargs:
        kwargs['stdin'] = subprocess.PIPE
    if 'stdout' not in kwargs:
        kwargs['stdout'] = subprocess.PIPE
    if 'stderr' not in kwargs:
        kwargs['stderr'] = subprocess.PIPE
    p = subprocess.Popen(execargs, close_fds=True, shell=False,
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise Exception("run_command %s exit %d:\n%s" %
                        (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
# Clone (if needed) and check out a specific version of a git repository
# under the job's temp directory when path is relative.
# NOTE(review): the second run_command call is truncated (its cwd argument
# and the function's return line are missing from this listing).
def git_checkout(url, version, path):
    # Relative paths are anchored at the job temp dir.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        util.run_command(["git", "clone", url, path],
                         cwd=os.path.dirname(path))
    util.run_command(["git", "checkout", version],
# Start a tar subprocess that extracts from its stdin; decompress_flag is
# 'j' (bzip2), 'z' (gzip), or '' (plain tar). Returns the Popen object so
# the caller can stream data into p.stdin.
# NOTE(review): the argv list is truncated — the lines between "tar" and
# the -x flag and the closing of the list (presumably "-C", path, and the
# stdin placeholder) are missing from this listing.
def tar_extractor(path, decompress_flag):
    return subprocess.Popen(["tar",
                             ("-x%sf" % decompress_flag),
                             stdin=subprocess.PIPE, stderr=sys.stderr,
                             shell=False, close_fds=True)
# NOTE(review): truncated listing — the docstring terminator, the
# try/except blocks around os.readlink/os.unlink, the streaming of file
# data into the tar subprocess, and the lock-release/else-return lines are
# all missing (source numbers jump repeatedly, e.g. 189->194, 217->226,
# 230->232).
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize extraction into this path across concurrent tasks.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    already_have_it = False
    # The '.locator' symlink records which collection was extracted here.
    if os.readlink(os.path.join(path, '.locator')) == tarball:
        already_have_it = True
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        os.unlink(os.path.join(path, '.locator'))
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))
        # Choose the tar decompression flag from the file extension.
        for f in CollectionReader(tarball).all_files():
            if re.search('\.(tbz|tar.bz2)$', f.name()):
                p = util.tar_extractor(path, 'j')
            elif re.search('\.(tgz|tar.gz)$', f.name()):
                p = util.tar_extractor(path, 'z')
            elif re.search('\.tar$', f.name()):
                p = util.tar_extractor(path, '')
            raise Exception("tarball_extract cannot handle filename %s"
        if p.returncode != 0:
            raise Exception("tar exited %d" % p.returncode)
        # Remember what we extracted so the next call can skip the work.
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
# NOTE(review): truncated listing — docstring terminator, try/except
# blocks, the loop writing zip data to disk, the unzip argv lines, and the
# lock-release/else-return lines are missing (source numbers jump e.g.
# 250->255, 275->283, 289->291).
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize extraction into this path across concurrent tasks.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    already_have_it = False
    # '.locator' symlink records which collection is already extracted.
    if os.readlink(os.path.join(path, '.locator')) == zipball:
        already_have_it = True
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        os.unlink(os.path.join(path, '.locator'))
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))
        for f in CollectionReader(zipball).all_files():
            # Only .zip members are supported.
            if not re.search('\.zip$', f.name()):
                raise Exception("zipball_extract cannot handle filename %s"
            # unzip cannot read from a pipe, so spool to a local file first.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            p = subprocess.Popen(["unzip",
                                  stdin=None, stderr=sys.stderr,
                                  shell=False, close_fds=True)
            if p.returncode != 0:
                raise Exception("unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
# NOTE(review): truncated listing — docstring terminator, try/except
# blocks, files_got initialization, the surrounding "if (files == [] or"
# condition opener, the write/close of each output file, and the
# lock-release/return lines are missing (source numbers jump e.g.
# 314->319, 334->336, 345->349).
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize extraction into this path across concurrent tasks.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    already_have_it = False
    if os.readlink(os.path.join(path, '.locator')) == collection:
        already_have_it = True
    # emulate "rm -f" (i.e., if the file does not exist, we win)
    os.unlink(os.path.join(path, '.locator'))
    if os.path.exists(os.path.join(path, '.locator')):
        os.unlink(os.path.join(path, '.locator'))
    for f in CollectionReader(collection).all_files():
        # Extract a file when it was requested (by raw or decompressed
        # name) and has not already been produced.
        ((f.name() not in files_got) and
         (f.name() in files or
          (decompress and f.decompressed_name() in files)))):
        outname = f.decompressed_name() if decompress else f.name()
        files_got += [outname]
        if os.path.exists(os.path.join(path, outname)):
        util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
        outfile = open(os.path.join(path, outname), 'wb')
        for buf in (f.readall_decompressed() if decompress
    # Fail loudly if any explicitly requested file was absent.
    if len(files_got) < len(files):
        raise Exception("Wanted files %s but only got %s from %s" % (files, files_got, map(lambda z: z.name(), list(CollectionReader(collection).all_files()))))
    os.symlink(collection, os.path.join(path, '.locator'))
# Recursive equivalent of "mkdir -p": create parents first, then the leaf.
# NOTE(review): truncated listing — the try/os.mkdir/except OSError lines
# between the recursive call and the final existence re-check are missing
# (source numbers jump 359->363).
def mkdir_dash_p(path):
    if not os.path.exists(path):
        # Ensure the parent exists before creating this directory.
        util.mkdir_dash_p(os.path.dirname(path))
        if not os.path.exists(path):
# NOTE(review): truncated listing — docstring terminator, files_got
# initialization, the "if (files == [] or" condition opener, the output
# write/close lines, the lock release, and the return are missing (source
# numbers jump e.g. 378->385, 385->387, 396->400).
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize extraction into this path across concurrent tasks.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    for f in stream.all_files():
        # Extract a file when requested (by raw or decompressed name) and
        # not already produced.
        ((f.name() not in files_got) and
         (f.name() in files or
          (decompress and f.decompressed_name() in files)))):
        outname = f.decompressed_name() if decompress else f.name()
        files_got += [outname]
        if os.path.exists(os.path.join(path, outname)):
            os.unlink(os.path.join(path, outname))
        util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
        outfile = open(os.path.join(path, outname), 'wb')
        for buf in (f.readall_decompressed() if decompress
    # Fail loudly if any explicitly requested file was absent.
    if len(files_got) < len(files):
        raise Exception("Wanted files %s but only got %s from %s" %
                        (files, files_got, map(lambda z: z.name(),
                                               list(stream.all_files()))))
# Return a sorted, recursive listing of regular entries under dirname,
# with paths expressed relative to base (or to dirname when base is None).
# NOTE(review): truncated listing — the "allfiles = []" initialization,
# the "else:" before the leaf append, and the return line are missing
# (source numbers jump 408->410, 414->416).
def listdir_recursive(dirname, base=None):
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path):
            # Descend into subdirectories, accumulating relative names.
            allfiles += util.listdir_recursive(ent_path, ent_base)
            allfiles += [ent_base]
# Method fragments of a whget-backed reader class — the class header is
# not visible in this listing, and the def line of the cleanup method
# (before self.p.stdout.close(), source line 432) is also missing.
def __init__(self, data_locator):
    # Stream the collection contents recursively via the whget CLI;
    # reads come from the subprocess's stdout pipe.
    self.data_locator = data_locator
    self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
                              stdout=subprocess.PIPE,
                              stdin=None, stderr=subprocess.PIPE,
                              shell=False, close_fds=True)
def read(self, size, **kwargs):
    # Delegate directly to the subprocess pipe.
    return self.p.stdout.read(size, **kwargs)
    # --- cleanup-method body (its def line is missing from this listing):
    # close stdout, relay any stderr output, then fail if whget failed.
    self.p.stdout.close()
    if not self.p.stderr.closed:
        for err in self.p.stderr:
            print >> sys.stderr, err
    self.p.stderr.close()
    if self.p.returncode != 0:
        raise Exception("whget subprocess exited %d" % self.p.returncode)
# A file-like view over a byte range [pos, pos+size) of a StreamReader.
# NOTE(review): heavily truncated listing — the remaining __init__
# assignments, the name()/size() accessors, several return/yield lines,
# the readall loop body, and the readlines line-splitting state are
# missing (source numbers jump throughout, e.g. 444->451, 459->462,
# 464->468, 502->507).
class StreamFileReader:
    def __init__(self, stream, pos, size, name):
        self._stream = stream
    def decompressed_name(self):
        # Name with a trailing .bz2/.gz extension stripped.
        return re.sub('\.(bz2|gz)$', '', self._name)
    def stream_name(self):
        return self._stream.name()
    def read(self, size, **kwargs):
        # Seek the underlying stream to this file's current offset, then
        # read without running past the end of the file's range.
        self._stream.seek(self._pos + self._filepos)
        data = self._stream.read(min(size, self._size - self._filepos))
        self._filepos += len(data)
    def readall(self, size=2**20, **kwargs):
        data = self.read(size, **kwargs)
    def bunzip2(self, size):
        # Generator: decompress bzip2 chunks as they are read.
        decompressor = bz2.BZ2Decompressor()
        for chunk in self.readall(size):
            data = decompressor.decompress(chunk)
            if data and data != '':
    def gunzip(self, size):
        # 16+MAX_WBITS tells zlib to expect a gzip header/trailer.
        decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
        for chunk in self.readall(size):
            data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
            if data and data != '':
    def readall_decompressed(self, size=2**20):
        # Choose the decompressor from the file extension; fall through to
        # raw reads for uncompressed files.
        self._stream.seek(self._pos + self._filepos)
        if re.search('\.bz2$', self._name):
            return self.bunzip2(size)
        elif re.search('\.gz$', self._name):
            return self.gunzip(size)
        return self.readall(size)
    def readlines(self, decompress=True):
        datasource = self.readall_decompressed()
        self._stream.seek(self._pos + self._filepos)
        datasource = self.readall()
        # Re-chunk the data stream into newline-terminated lines.
        for newdata in datasource:
            eol = string.find(data, "\n", sol)
            yield data[sol:eol+1]
    def as_manifest(self):
        # Zero-length files are represented with the canonical empty-block
        # locator (md5 of the empty string).
        return ("%s d41d8cd98f00b204e9800998ecf8427e+0 0:0:%s\n"
                % (self._stream.name(), self.name()))
        return string.join(self._stream.tokens_for_range(self._pos, self._size),
# Interior of a stream-reader class (its class header, before this
# __init__, is not visible in this listing). Parses one manifest stream
# line (name, block locators, file position tokens) and supports random
# access across the concatenated data blocks.
# NOTE(review): heavily truncated — initialization of self.files and
# self._pos, the sizehint/return_all_tokens guards, the block-selection
# branches of tokens_for_range, the name()/all_files() headers, and the
# seek() def line are all missing (source numbers jump throughout).
def __init__(self, tokens):
    self._tokens = tokens
    # Lazily-fetched current data block and the read cursor within the
    # concatenated block sequence.
    self._current_datablock_data = None
    self._current_datablock_pos = 0
    self._current_datablock_index = -1
    self._stream_name = None
    self.data_locators = []
    # First token is the stream name; 32-hex-digit tokens are block
    # locators; pos:size:name tokens describe files.
    for tok in self._tokens:
        if self._stream_name == None:
            self._stream_name = tok
        elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
            self.data_locators += [tok]
        elif re.search(r'^\d+:\d+:\S+', tok):
            pos, size, name = tok.split(':',2)
            self.files += [[int(pos), int(size), name]]
            raise Exception("Invalid manifest format")
def tokens_for_range(self, range_start, range_size):
    # Build a minimal manifest token list covering the given byte range;
    # file offsets are rebased by the bytes skipped in omitted blocks.
    resp = [self._stream_name]
    return_all_tokens = False
    token_bytes_skipped = 0
    for locator in self.data_locators:
        # Block size comes from the +NNN size hint in the locator.
        sizehint = re.search(r'\+(\d+)', locator)
        return_all_tokens = True
        if return_all_tokens:
            blocksize = int(sizehint.group(0))
        if range_start + range_size <= block_start:
        if range_start < block_start + blocksize:
        token_bytes_skipped += blocksize
        block_start += blocksize
        # Include a file token when its extent overlaps the range.
        if ((f[0] < range_start + range_size)
            (f[0] + f[1] > range_start)
            resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
    return self._stream_name
    yield StreamFileReader(self, pos, size, name)
def nextdatablock(self):
    # Advance the cursor to the start of the next data block.
    if self._current_datablock_index < 0:
        self._current_datablock_pos = 0
        self._current_datablock_index = 0
        self._current_datablock_pos += self.current_datablock_size()
        self._current_datablock_index += 1
    self._current_datablock_data = None
def current_datablock_data(self):
    # Fetch (and cache) the current block's bytes from Keep on demand.
    if self._current_datablock_data == None:
        self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
    return self._current_datablock_data
def current_datablock_size(self):
    # Prefer the +NNN size hint in the locator; otherwise fetch the block
    # and measure it.
    if self._current_datablock_index < 0:
    sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
    return int(sizehint.group(0))
    return len(self.current_datablock_data())
    """Set the position of the next read operation."""
def really_seek(self):
    """Find and load the appropriate data block, so the byte at
    """
    if self._pos == self._current_datablock_pos:
    if (self._current_datablock_pos != None and
        self._pos >= self._current_datablock_pos and
        self._pos <= self._current_datablock_pos + self.current_datablock_size()):
    # Seeking backwards restarts the scan from the first block.
    if self._pos < self._current_datablock_pos:
        self._current_datablock_index = -1
    while (self._pos > self._current_datablock_pos and
           self._pos > self._current_datablock_pos + self.current_datablock_size()):
def read(self, size):
    """Read no more than size bytes -- but at least one byte,
    unless _pos is already at the end of the stream.
    """
    # Skip forward over exhausted blocks until _pos lands in the current
    # block, or we run out of blocks.
    while self._pos >= self._current_datablock_pos + self.current_datablock_size():
    if self._current_datablock_index >= len(self.data_locators):
    data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
    self._pos += len(data)
# Reads a collection given either a manifest locator or literal manifest
# text (distinguished by a manifest-shaped regex match).
# NOTE(review): truncated listing — the else branch of __init__, the
# _populate-style method header, the _streams initialization, the
# all_files()/resp headers, and several return lines are missing (source
# numbers jump e.g. 630->632, 633->640, 653->656).
class CollectionReader:
    def __init__(self, manifest_locator_or_text):
        # Manifest text starts with a stream name followed by block
        # locators and file tokens; anything else is treated as a locator.
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        # Lazy population: fetch the manifest from Keep only when needed,
        # and parse one stream token-list per manifest line.
        if self._streams != None:
        if not self._manifest_text:
            self._manifest_text = Keep.get(self._manifest_locator)
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
    def all_streams(self):
        for s in self._streams:
            resp += [StreamReader(s)]
        # Iterate every file of every stream.
        for s in self.all_streams():
            for f in s.all_files():
    def manifest_text(self):
        return self._manifest_text
# Incrementally builds a collection manifest: buffered writes are flushed
# to Keep in KEEP_BLOCK_SIZE chunks, and file/stream boundaries are
# recorded as manifest tokens.
# NOTE(review): truncated listing — the "def __init__(self):" line
# (source 665), context-manager/del methods (675-678), the flush call in
# write(), the finish()/manifest assembly headers, and the final manifest
# join/return lines are missing (numbers jump e.g. 664->666, 683->685,
# 734->736, 750->757).
class CollectionWriter:
    # Flush to Keep whenever this many buffered bytes accumulate (64 MiB).
    KEEP_BLOCK_SIZE = 2**26
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
    def write(self, newdata):
        # Buffer the data and account for it in both the buffer and the
        # current stream length; flush full Keep blocks as they fill.
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
    def flush_data(self):
        # Store one block's worth of buffered data in Keep; keep any
        # remainder buffered for the next block.
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        # Manifest tokens are whitespace-delimited, so names must not
        # contain whitespace.
        if re.search(r'[ \t\n]', newfilename):
            raise AssertionError("Manifest filenames cannot contain whitespace")
        self._current_file_name = newfilename
    def current_file_name(self):
        return self._current_file_name
    def finish_current_file(self):
        # Record the finished file's (offset, length, name) in the current
        # stream; refuse to finish a nonempty unnamed file.
        if self._current_file_name == None:
            if self._current_file_pos == self._current_stream_length:
            raise Exception("Cannot finish an unnamed file (%d bytes at offset %d in '%s' stream)" % (self._current_stream_length - self._current_file_pos, self._current_file_pos, self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length
    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
    def set_current_stream_name(self, newstreamname):
        if re.search(r'[ \t\n]', newstreamname):
            raise AssertionError("Manifest stream names cannot contain whitespace")
        self._current_stream_name = newstreamname
    def current_stream_name(self):
        return self._current_stream_name
    def finish_current_stream(self):
        # Close out the in-progress file, then archive the stream's
        # (name, locators, files) triple and reset per-stream state.
        self.finish_current_file()
        if len(self._current_stream_files) == 0:
        elif self._current_stream_name == None:
            raise Exception("Cannot finish an unnamed stream (%d bytes in %d files)" % (self._current_stream_length, len(self._current_stream_files)))
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
        # Store the finished manifest itself in Keep; returns its locator.
        return Keep.put(self.manifest_text())
    def manifest_text(self):
        # Assemble the manifest: one line per finished stream, with the
        # canonical empty-block locator standing in for empty streams.
        self.finish_current_stream()
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
            manifest += stream[0]
            if len(stream[1]) == 0:
                manifest += " d41d8cd98f00b204e9800998ecf8427e+0"
                for locator in stream[1]:
                    manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
# Storage-backend method fragments (the enclosing class header and the
# put()/get() def lines are not visible in this listing). When
# KEEP_LOCAL_STORE is set, blocks are stored as files in that directory
# instead of going through the whput/whget CLI tools.
# NOTE(review): further lines are missing — the md5 update/locator
# assembly in get(), the md5-of-data lines in local_store_put, and the
# return/read lines at the end (source numbers jump e.g. 780->783,
# 790->793, 804->806).
# --- put(data) body:
if 'KEEP_LOCAL_STORE' in os.environ:
    return Keep.local_store_put(data)
# Pipe the data into whput; its stdout is the new block locator.
p = subprocess.Popen(["whput", "-"],
                     stdout=subprocess.PIPE,
                     stdin=subprocess.PIPE,
                     stderr=subprocess.PIPE,
                     shell=False, close_fds=True)
stdoutdata, stderrdata = p.communicate(data)
if p.returncode != 0:
    raise Exception("whput subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
return stdoutdata.rstrip()
# --- get(locator) body:
if 'KEEP_LOCAL_STORE' in os.environ:
    return Keep.local_store_get(locator)
p = subprocess.Popen(["whget", locator, "-"],
                     stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE,
                     shell=False, close_fds=True)
stdoutdata, stderrdata = p.communicate(None)
if p.returncode != 0:
    raise Exception("whget subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
# Verify the fetched data against the md5 embedded in the locator.
m = hashlib.new('md5')
if locator.index(m.hexdigest()) == 0:
raise Exception("md5 checksum mismatch: md5(get(%s)) == %s" % (locator, m.hexdigest()))
def local_store_put(data):
    # Write to a .tmp file first, then rename for an atomic publish.
    m = hashlib.new('md5')
    locator = '%s+%d' % (md5, len(data))
    with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
    os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
              os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
def local_store_get(locator):
    # The leading 32+ hex digits of the locator name the stored file; the
    # canonical empty-block hash short-circuits to empty content.
    r = re.search('^([0-9a-f]{32,})', locator)
    raise Exception("Keep.get: invalid data locator '%s'" % locator)
    if r.group(0) == 'd41d8cd98f00b204e9800998ecf8427e':
    with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f: