from apiclient import errors
from apiclient.discovery import build

class CredentialsFromEnv:
    @staticmethod
    def http_request(self, uri, **kwargs):
        from httplib import BadStatusLine
        if 'headers' not in kwargs:
            kwargs['headers'] = {}
        kwargs['headers']['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
        try:
            return self.orig_http_request(uri, **kwargs)
        except BadStatusLine:
            # This is how httplib tells us that it tried to reuse an
            # existing connection but it was already closed by the
            # server. In that case, yes, we would like to retry.
            # Unfortunately, we are not absolutely certain that the
            # previous call did not succeed, so this is slightly
            # risky.
            return self.orig_http_request(uri, **kwargs)
    def authorize(self, http):
        http.orig_http_request = http.request
        http.request = types.MethodType(self.http_request, http)
        return http

url = ('https://%s/discovery/v1/apis/'
       '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
credentials = CredentialsFromEnv()
http = httplib2.Http()
http = credentials.authorize(http)
http.disable_ssl_certificate_validation = True
service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
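
# The API host and token are read from the environment at import time.
# A minimal (hypothetical) shell setup before running a script that
# imports this module:
#
#   export ARVADOS_API_HOST=api.arvados.example.com
#   export ARVADOS_API_TOKEN=xxxxxxxxxxxxxxxxxxxxx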

def task_set_output(self, s):
    service.job_tasks().update(uuid=self['uuid'],
                               job_task=json.dumps({'output': s,
                                                    'success': True})
                               ).execute()

def current_task():
    t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
    t = UserDict.UserDict(t)
    t.set_output = types.MethodType(task_set_output, t)
    t.tmpdir = os.environ['TASK_WORK']
    return t

def current_job():
    t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
    t = UserDict.UserDict(t)
    t.tmpdir = os.environ['JOB_WORK']
    return t
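
# Example (hypothetical usage inside a crunch script, assuming the job
# dispatcher has set JOB_UUID, TASK_UUID, JOB_WORK and TASK_WORK):
#
#   this_job = current_job()
#   this_task = current_task()
#   input_locator = this_job['script_parameters']['input']
#   output_locator = Keep.put("some computed output\n")
#   this_task.set_output(output_locator)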

class JobTask:
    def __init__(self, parameters=dict(), resource_limits=dict()):
        print "init jobtask %s %s" % (parameters, resource_limits)

def one_task_per_input_file(if_sequence=0, and_end_task=True):
    if if_sequence != current_task()['sequence']:
        return
    job_input = current_job()['script_parameters']['input']
    cr = CollectionReader(job_input)
    for s in cr.all_streams():
        for f in s.all_files():
            task_input = f.as_manifest()
            new_task_attrs = {
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': task_input
                    }
                }
            service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
    if and_end_task:
        service.job_tasks().update(uuid=current_task()['uuid'],
                                   job_task=json.dumps({'success': True})
                                   ).execute()
        sys.exit(0)
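
# Example (hypothetical crunch script): at sequence 0, queue one new
# task per input file and exit; each queued task then finds its own
# single-file manifest in its 'input' parameter:
#
#   one_task_per_input_file(if_sequence=0, and_end_task=True)
#   task_input = current_task()['parameters']['input']
#   for f in CollectionReader(task_input).all_files():
#       print f.name()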

def one_task_per_input_stream(if_sequence=0, and_end_task=True):
    if if_sequence != current_task()['sequence']:
        return
    job_input = current_job()['script_parameters']['input']
    cr = CollectionReader(job_input)
    for s in cr.all_streams():
        task_input = s.tokens()
        new_task_attrs = {
            'job_uuid': current_job()['uuid'],
            'created_by_job_task_uuid': current_task()['uuid'],
            'sequence': if_sequence + 1,
            'parameters': {
                'input': task_input
                }
            }
        service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
    if and_end_task:
        service.job_tasks().update(uuid=current_task()['uuid'],
                                   job_task=json.dumps({'success': True})
                                   ).execute()
        sys.exit(0)

class util:
    @staticmethod
    def run_command(execargs, **kwargs):
        if 'stdin' not in kwargs:
            kwargs['stdin'] = subprocess.PIPE
        if 'stdout' not in kwargs:
            kwargs['stdout'] = subprocess.PIPE
        if 'stderr' not in kwargs:
            kwargs['stderr'] = subprocess.PIPE
        p = subprocess.Popen(execargs, close_fds=True, shell=False,
                             **kwargs)
        stdoutdata, stderrdata = p.communicate(None)
        if p.returncode != 0:
            raise Exception("run_command %s exit %d:\n%s" %
                            (execargs, p.returncode, stderrdata))
        return stdoutdata, stderrdata
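
    # Example: run a command and capture its stdout; a non-zero exit
    # status raises an exception (the arguments here are hypothetical):
    #
    #   out, err = util.run_command(["ls", "-l", "/tmp"])
    #   print out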

    @staticmethod
    def git_checkout(url, version, path):
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        if not os.path.exists(path):
            util.run_command(["git", "clone", url, path],
                             cwd=os.path.dirname(path))
        util.run_command(["git", "checkout", version],
                         cwd=path)
        return path
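
    # Example (hypothetical repository URL and commit):
    #
    #   src_path = util.git_checkout("git://git.example.com/proj.git",
    #                                "0123abcd", "proj-checkout")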

    @staticmethod
    def tar_extractor(path, decompress_flag):
        # Extract into path, reading the archive from stdin.
        return subprocess.Popen(["tar",
                                 "-C", path,
                                 ("-x%sf" % decompress_flag),
                                 "-"],
                                stdout=None,
                                stdin=subprocess.PIPE, stderr=sys.stderr,
                                shell=False, close_fds=True)

    @staticmethod
    def tarball_extract(tarball, path):
        """Retrieve a tarball from Keep and extract it to a local
        directory.  Return the absolute path where the tarball was
        extracted.  If the top level of the tarball contained just one
        file or directory, return the absolute path of that single item.

        tarball -- collection locator
        path -- where to extract the tarball: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == tarball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(tarball).all_files():
                if re.search('\.(tbz|tar.bz2)$', f.name()):
                    p = util.tar_extractor(path, 'j')
                elif re.search('\.(tgz|tar.gz)$', f.name()):
                    p = util.tar_extractor(path, 'z')
                elif re.search('\.tar$', f.name()):
                    p = util.tar_extractor(path, '')
                else:
                    raise Exception("tarball_extract cannot handle filename %s"
                                    % f.name())
                # pipe the file data from Keep into tar's stdin
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    p.stdin.write(buf)
                p.stdin.close()
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise Exception("tar exited %d" % p.returncode)
            os.symlink(tarball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path
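
    # Example (hypothetical locator): extract a collection containing a
    # single .tar.gz into the job's temporary directory, where
    # tarball_locator is the Keep locator of that collection:
    #
    #   src_dir = util.tarball_extract(tarball_locator, "src-tmp")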

    @staticmethod
    def zipball_extract(zipball, path):
        """Retrieve a zip archive from Keep and extract it to a local
        directory.  Return the absolute path where the archive was
        extracted.  If the top level of the archive contained just one
        file or directory, return the absolute path of that single item.

        zipball -- collection locator
        path -- where to extract the archive: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == zipball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(zipball).all_files():
                if not re.search('\.zip$', f.name()):
                    raise Exception("zipball_extract cannot handle filename %s"
                                    % f.name())
                zip_filename = os.path.join(path, os.path.basename(f.name()))
                zip_file = open(zip_filename, 'wb')
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    zip_file.write(buf)
                zip_file.close()

                # -q quiet, -o overwrite, -d extract into path
                p = subprocess.Popen(["unzip",
                                      "-q", "-o",
                                      "-d", path,
                                      zip_filename],
                                     stdout=None,
                                     stdin=None, stderr=sys.stderr,
                                     shell=False, close_fds=True)
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise Exception("unzip exited %d" % p.returncode)
                os.unlink(zip_filename)
            os.symlink(zipball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path

    @staticmethod
    def collection_extract(collection, path, files=[], decompress=True):
        """Retrieve a collection from Keep and extract it to a local
        directory.  Return the absolute path where the collection was
        extracted.

        collection -- collection locator
        path -- where to extract: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == collection:
                already_have_it = True
        except OSError:
            pass

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        files_got = []
        for f in CollectionReader(collection).all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, outname)):
                    continue
                util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
                outfile = open(os.path.join(path, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
        if len(files_got) < len(files):
            raise Exception("Wanted files %s but only got %s from %s" %
                            (files, files_got,
                             map(lambda z: z.name(),
                                 list(CollectionReader(collection).all_files()))))
        os.symlink(collection, os.path.join(path, '.locator'))
        lockfile.close()
        return path
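
    # Example (hypothetical locator and filenames): fetch two reference
    # files into "refs" under the job tmpdir, decompressing .gz/.bz2:
    #
    #   refs_dir = util.collection_extract(ref_locator, "refs",
    #                                      files=["chr1.fa", "chr2.fa"],
    #                                      decompress=True)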

    @staticmethod
    def mkdir_dash_p(path):
        if not os.path.exists(path):
            util.mkdir_dash_p(os.path.dirname(path))
            try:
                os.mkdir(path)
            except OSError:
                # ignore the race where another process created path first
                if not os.path.exists(path):
                    raise

    @staticmethod
    def stream_extract(stream, path, files=[], decompress=True):
        """Retrieve a stream from Keep and extract it to a local
        directory.  Return the absolute path where the stream was
        extracted.

        stream -- StreamReader object
        path -- where to extract: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)

        files_got = []
        for f in stream.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, outname)):
                    continue
                util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
                outfile = open(os.path.join(path, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
        if len(files_got) < len(files):
            raise Exception("Wanted files %s but only got %s from %s" %
                            (files, files_got,
                             map(lambda z: z.name(),
                                 list(stream.all_files()))))
        lockfile.close()
        return path

    @staticmethod
    def listdir_recursive(dirname, base=None):
        allfiles = []
        for ent in sorted(os.listdir(dirname)):
            ent_path = os.path.join(dirname, ent)
            ent_base = os.path.join(base, ent) if base else ent
            if os.path.isdir(ent_path):
                allfiles += util.listdir_recursive(ent_path, ent_base)
            else:
                allfiles += [ent_base]
        return allfiles
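
# Example: list every file under a directory tree, with paths relative
# to the given root:
#
#   for name in util.listdir_recursive("/tmp/job-output"):
#       print name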

class DataReader:
    def __init__(self, data_locator):
        self.data_locator = data_locator
        self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
                                  stdout=subprocess.PIPE,
                                  stdin=None, stderr=subprocess.PIPE,
                                  shell=False, close_fds=True)
    def read(self, size, **kwargs):
        return self.p.stdout.read(size, **kwargs)
    def close(self):
        self.p.stdout.close()
        if not self.p.stderr.closed:
            for err in self.p.stderr:
                print >> sys.stderr, err
            self.p.stderr.close()
        # wait so that returncode is populated before we check it
        self.p.wait()
        if self.p.returncode != 0:
            raise Exception("whget subprocess exited %d" % self.p.returncode)

class StreamFileReader:
    def __init__(self, stream, pos, size, name):
        self._stream = stream
        self._pos = pos
        self._size = size
        self._name = name
        self._filepos = 0
    def name(self):
        return self._name
    def decompressed_name(self):
        return re.sub('\.(bz2|gz)$', '', self._name)
    def size(self):
        return self._size
    def stream_name(self):
        return self._stream.name()
    def read(self, size, **kwargs):
        self._stream.seek(self._pos + self._filepos)
        data = self._stream.read(min(size, self._size - self._filepos))
        self._filepos += len(data)
        return data
    def readall(self, size=2**20, **kwargs):
        while True:
            data = self.read(size, **kwargs)
            if data == '':
                break
            yield data
    def bunzip2(self, size):
        decompressor = bz2.BZ2Decompressor()
        for chunk in self.readall(size):
            data = decompressor.decompress(chunk)
            if data and data != '':
                yield data
    def gunzip(self, size):
        decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
        for chunk in self.readall(size):
            data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
            if data and data != '':
                yield data
    def readall_decompressed(self, size=2**20):
        self._stream.seek(self._pos + self._filepos)
        if re.search('\.bz2$', self._name):
            return self.bunzip2(size)
        elif re.search('\.gz$', self._name):
            return self.gunzip(size)
        else:
            return self.readall(size)
    def readlines(self, decompress=True):
        if decompress:
            datasource = self.readall_decompressed()
        else:
            self._stream.seek(self._pos + self._filepos)
            datasource = self.readall()
        data = ''
        for newdata in datasource:
            data += newdata
            sol = 0
            while True:
                eol = string.find(data, "\n", sol)
                if eol < 0:
                    break
                yield data[sol:eol+1]
                sol = eol + 1
            data = data[sol:]
        if data != '':
            yield data
    def as_manifest(self):
        if self.size() == 0:
            return ("%s d41d8cd98f00b204e9800998ecf8427e+0 0:0:%s\n"
                    % (self._stream.name(), self.name()))
        return string.join(self._stream.tokens_for_range(self._pos, self._size),
                           " ") + "\n"
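
# Example: given a StreamFileReader f (e.g. from StreamReader.all_files()),
# read its whole content, decompressing a .gz/.bz2 name transparently:
#
#   content = ''.join(f.readall_decompressed())
#
# or re-encode just this one file as a single-file manifest:
#
#   print f.as_manifest()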

class StreamReader:
    def __init__(self, tokens):
        self._tokens = tokens
        self._current_datablock_data = None
        self._current_datablock_pos = 0
        self._current_datablock_index = -1
        self._pos = 0

        self._stream_name = None
        self.data_locators = []
        self.files = []

        for tok in self._tokens:
            if self._stream_name == None:
                self._stream_name = tok
            elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
                self.data_locators += [tok]
            elif re.search(r'^\d+:\d+:\S+', tok):
                pos, size, name = tok.split(':', 2)
                self.files += [[int(pos), int(size), name]]
            else:
                raise Exception("Invalid manifest format")
    def tokens(self):
        return self._tokens
    def tokens_for_range(self, range_start, range_size):
        resp = [self._stream_name]
        return_all_tokens = False
        block_start = 0
        token_bytes_skipped = 0
        for locator in self.data_locators:
            sizehint = re.search(r'\+(\d+)', locator)
            if not sizehint:
                # no size hint: give up on skipping and return everything
                return_all_tokens = True
            if return_all_tokens:
                resp += [locator]
                continue
            blocksize = int(sizehint.group(1))
            if range_start + range_size <= block_start:
                break
            if range_start < block_start + blocksize:
                resp += [locator]
            else:
                token_bytes_skipped += blocksize
            block_start += blocksize
        for f in self.files:
            if ((f[0] < range_start + range_size) and
                (f[0] + f[1] > range_start) and
                f[1] > 0):
                resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
        return resp
    def name(self):
        return self._stream_name
    def all_files(self):
        for f in self.files:
            pos, size, name = f
            yield StreamFileReader(self, pos, size, name)
    def nextdatablock(self):
        if self._current_datablock_index < 0:
            self._current_datablock_pos = 0
            self._current_datablock_index = 0
        else:
            self._current_datablock_pos += self.current_datablock_size()
            self._current_datablock_index += 1
        self._current_datablock_data = None
    def current_datablock_data(self):
        if self._current_datablock_data == None:
            self._current_datablock_data = Keep.get(
                self.data_locators[self._current_datablock_index])
        return self._current_datablock_data
    def current_datablock_size(self):
        if self._current_datablock_index < 0:
            return 0
        sizehint = re.search('\+(\d+)',
                             self.data_locators[self._current_datablock_index])
        if sizehint:
            return int(sizehint.group(1))
        return len(self.current_datablock_data())
593 """Set the position of the next read operation."""
595 def really_seek(self):
596 """Find and load the appropriate data block, so the byte at
599 if self._pos == self._current_datablock_pos:
601 if (self._current_datablock_pos != None and
602 self._pos >= self._current_datablock_pos and
603 self._pos <= self._current_datablock_pos + self.current_datablock_size()):
605 if self._pos < self._current_datablock_pos:
606 self._current_datablock_index = -1
608 while (self._pos > self._current_datablock_pos and
609 self._pos > self._current_datablock_pos + self.current_datablock_size()):
611 def read(self, size):
612 """Read no more than size bytes -- but at least one byte,
613 unless _pos is already at the end of the stream.
618 while self._pos >= self._current_datablock_pos + self.current_datablock_size():
620 if self._current_datablock_index >= len(self.data_locators):
622 data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
623 self._pos += len(data)

class CollectionReader:
    def __init__(self, manifest_locator_or_text):
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        self._streams = None
    def _populate(self):
        if self._streams != None:
            return
        if not self._manifest_text:
            self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            stream_tokens = stream_line.split()
            if stream_tokens:
                # skip blank lines (e.g., the manifest's trailing newline)
                self._streams += [stream_tokens]
    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp += [StreamReader(s)]
        return resp
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f
    def manifest_text(self):
        self._populate()
        return self._manifest_text
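
# Example (hypothetical locator): print a catalog of a collection:
#
#   cr = CollectionReader(manifest_locator)
#   for f in cr.all_files():
#       print f.stream_name(), f.name(), f.size()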

class CollectionWriter:
    KEEP_BLOCK_SIZE = 2**26
    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
    def write(self, newdata):
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()
    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        if re.search(r'[ \t\n]', newfilename):
            raise AssertionError("Manifest filenames cannot contain whitespace")
        self._current_file_name = newfilename
    def current_file_name(self):
        return self._current_file_name
    def finish_current_file(self):
        if self._current_file_name == None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise Exception("Cannot finish an unnamed file "
                            "(%d bytes at offset %d in '%s' stream)" %
                            (self._current_stream_length - self._current_file_pos,
                             self._current_file_pos,
                             self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length
    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
    def set_current_stream_name(self, newstreamname):
        if re.search(r'[ \t\n]', newstreamname):
            raise AssertionError("Manifest stream names cannot contain whitespace")
        self._current_stream_name = newstreamname
    def current_stream_name(self):
        return self._current_stream_name
    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name == None:
            raise Exception("Cannot finish an unnamed stream "
                            "(%d bytes in %d files)" %
                            (self._current_stream_length,
                             len(self._current_stream_files)))
        else:
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
    def finish(self):
        return Keep.put(self.manifest_text())
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0]
            if len(stream[1]) == 0:
                manifest += " d41d8cd98f00b204e9800998ecf8427e+0"
            else:
                for locator in stream[1]:
                    manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
            manifest += "\n"
        return manifest
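
# Example: write two small files into one stream and store the manifest:
#
#   cw = CollectionWriter()
#   cw.start_new_file("hello.txt")
#   cw.write("hello\n")
#   cw.start_new_file("world.txt")
#   cw.write("world\n")
#   print cw.manifest_text()
#   locator = cw.finish()       # stores the manifest itself in Keep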

class Keep:
    @staticmethod
    def put(data):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return Keep.local_store_put(data)
        p = subprocess.Popen(["whput", "-"],
                             stdout=subprocess.PIPE,
                             stdin=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             shell=False, close_fds=True)
        stdoutdata, stderrdata = p.communicate(data)
        if p.returncode != 0:
            raise Exception("whput subprocess exited %d - stderr:\n%s" %
                            (p.returncode, stderrdata))
        return stdoutdata.rstrip()
    @staticmethod
    def get(locator):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return Keep.local_store_get(locator)
        p = subprocess.Popen(["whget", locator, "-"],
                             stdout=subprocess.PIPE,
                             stdin=None,
                             stderr=subprocess.PIPE,
                             shell=False, close_fds=True)
        stdoutdata, stderrdata = p.communicate(None)
        if p.returncode != 0:
            raise Exception("whget subprocess exited %d - stderr:\n%s" %
                            (p.returncode, stderrdata))
        m = hashlib.new('md5')
        m.update(stdoutdata)
        try:
            if locator.index(m.hexdigest()) == 0:
                return stdoutdata
        except ValueError:
            pass
        raise Exception("md5 checksum mismatch: md5(get(%s)) == %s" %
                        (locator, m.hexdigest()))
    @staticmethod
    def local_store_put(data):
        m = hashlib.new('md5')
        m.update(data)
        md5 = m.hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
                  os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
        return locator
    @staticmethod
    def local_store_get(locator):
        r = re.search('^([0-9a-f]{32,})', locator)
        if not r:
            raise Exception("Keep.get: invalid data locator '%s'" % locator)
        if r.group(0) == 'd41d8cd98f00b204e9800998ecf8427e':
            return ''
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
            return f.read()
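
    # Example: with KEEP_LOCAL_STORE pointing at an existing, writable
    # directory, put/get round-trip without any Keep service:
    #
    #   os.environ['KEEP_LOCAL_STORE'] = '/tmp/keep-local-store'
    #   loc = Keep.put("foo")   # "acbd18db4cc2f85cedef654fccc4a4d8+3"
    #   assert Keep.get(loc) == "foo"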