18 from apiclient import errors
19 from apiclient.discovery import build
# Credentials shim: injects the ARVADOS_API_TOKEN from the environment as an
# OAuth2 Authorization header on every HTTP request made through an
# httplib2.Http object, instead of using a real oauth2client flow.
# NOTE(review): this extraction is missing some original lines (e.g. 28, 30,
# 36) — the retry path referred to by the comment below is only partially
# visible here.
21 class CredentialsFromEnv:
23 def http_request(self, uri, **kwargs):
# BadStatusLine is used (in the missing lines) to detect a stale reused
# connection -- presumably caught around the first request attempt.
24 from httplib import BadStatusLine
25 if 'headers' not in kwargs:
26 kwargs['headers'] = {}
# Adds the token from the environment to every outgoing request.
27 kwargs['headers']['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
29 return self.orig_http_request(uri, **kwargs)
31 # This is how httplib tells us that it tried to reuse an
32 # existing connection but it was already closed by the
33 # server. In that case, yes, we would like to retry.
34 # Unfortunately, we are not absolutely certain that the
35 # previous call did not succeed, so this is slightly
# Retry once after a BadStatusLine; may duplicate a non-idempotent request
# if the first attempt actually reached the server (per the comment above).
37 return self.orig_http_request(uri, **kwargs)
# Monkey-patches the given httplib2.Http instance: saves its original
# request method and replaces it with http_request (bound to the http
# object) so the Authorization header is always added.
38 def authorize(self, http):
39 http.orig_http_request = http.request
40 http.request = types.MethodType(self.http_request, http)
# NOTE(review): the return statement (presumably `return http`) is not
# visible in this extraction; the caller at the top level assigns the
# result back to `http`, so confirm authorize returns the object.
# Module-level setup: build a discovery-based "arvados" v1 API client using
# the host and token taken from the environment. Runs at import time.
43 url = ('https://%s/discovery/v1/apis/'
44 '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
45 credentials = CredentialsFromEnv()
46 http = httplib2.Http()
47 http = credentials.authorize(http)
# NOTE(review): SSL certificate validation is disabled here -- this permits
# man-in-the-middle attacks and should be gated on a config/env flag rather
# than hard-coded.
48 http.disable_ssl_certificate_validation=True
49 service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
# Record a task's output in the API server. Bound onto the current-task
# UserDict as a method (see the `t.set_output = types.MethodType(...)` call
# below), so `self` is the task dict and s is the output locator/string.
51 def task_set_output(self,s):
# NOTE(review): the update() call is truncated in this extraction -- the
# body argument (presumably json.dumps({'output': s, ...})) and .execute()
# are on missing lines.
52 service.job_tasks().update(uuid=self['uuid'],
# Fragment of a current_task()-style accessor (the def line is missing from
# this extraction): fetches the job task named by TASK_UUID, wraps it in a
# UserDict, attaches set_output as a bound method, and records the task
# work directory.
64 t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
65 t = UserDict.UserDict(t)
66 t.set_output = types.MethodType(task_set_output, t)
67 t.tmpdir = os.environ['TASK_WORK']
# NOTE(review): the return/caching lines are not visible here.
# Fragment of a current_job()-style accessor (the def line is missing from
# this extraction): fetches the job named by JOB_UUID, wraps it in a
# UserDict, and records the job work directory.
76 t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
77 t = UserDict.UserDict(t)
78 t.tmpdir = os.environ['JOB_WORK']
# Constructor fragment of a job-task helper class (class header missing from
# this extraction). Only logs its arguments as far as visible here.
# NOTE(review): `parameters=dict()` / `resource_limits=dict()` are mutable
# default arguments -- the same dicts are shared across calls. Harmless if
# never mutated, but should be `None` sentinels; confirm against full file.
86 def __init__(self, parameters=dict(), resource_limits=dict()):
87 print "init jobtask %s %s" % (parameters, resource_limits)
# Job-setup helper: if this is the queue-sequence step given by if_sequence,
# create one new job task (sequence if_sequence+1) per file in the job's
# input collection, then (if and_end_task) mark the current task successful.
# NOTE(review): several lines are missing from this extraction, including
# the early return under the sequence check, the new_task_attrs dict
# opening, and the exit after ending the task.
91 def one_task_per_input_file(if_sequence=0, and_end_task=True):
92 if if_sequence != current_task()['sequence']:
94 job_input = current_job()['script_parameters']['input']
95 cr = CollectionReader(job_input)
96 for s in cr.all_streams():
97 for f in s.all_files():
# Each new task's input is the single-file manifest for f.
98 task_input = f.as_manifest()
100 'job_uuid': current_job()['uuid'],
101 'created_by_job_task_uuid': current_task()['uuid'],
102 'sequence': if_sequence + 1,
107 service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
# Mark the dispatching task itself as finished so the worker moves on.
109 service.job_tasks().update(uuid=current_task()['uuid'],
110 job_task=json.dumps({'success':True})
# Run a command with pipes on stdin/stdout/stderr by default; return
# (stdoutdata, stderrdata). Raises Exception (with stderr attached) on a
# nonzero exit status.
116 def run_command(execargs, **kwargs):
117 if 'stdin' not in kwargs:
118 kwargs['stdin'] = subprocess.PIPE
119 if 'stdout' not in kwargs:
120 kwargs['stdout'] = subprocess.PIPE
121 if 'stderr' not in kwargs:
122 kwargs['stderr'] = subprocess.PIPE
# shell=False with an argv list avoids shell-injection issues.
# NOTE(review): the continuation line passing **kwargs to Popen (original
# line 124) is missing from this extraction.
123 p = subprocess.Popen(execargs, close_fds=True, shell=False,
125 stdoutdata, stderrdata = p.communicate(None)
126 if p.returncode != 0:
127 raise Exception("run_command %s exit %d:\n%s" %
128 (execargs, p.returncode, stderrdata))
129 return stdoutdata, stderrdata
# Clone (if needed) and check out `version` of a git repo into `path`.
# A relative path is taken relative to the current job's tmpdir.
132 def git_checkout(url, version, path):
# '^/' search is an absolute-path test (pre-os.path.isabs idiom).
133 if not re.search('^/', path):
134 path = os.path.join(current_job().tmpdir, path)
135 if not os.path.exists(path):
136 util.run_command(["git", "clone", url, path],
137 cwd=os.path.dirname(path))
# NOTE(review): the cwd argument and the `return path` tail of this
# function are on lines missing from this extraction.
138 util.run_command(["git", "checkout", version],
# Start a `tar -x` subprocess that reads the archive from its stdin and
# extracts into `path`; decompress_flag is '', 'j' (bzip2) or 'z' (gzip)
# as chosen by the caller (see tarball_extract). Returns the Popen object;
# the caller is expected to feed p.stdin and wait.
143 def tar_extractor(path, decompress_flag):
144 return subprocess.Popen(["tar",
146 ("-x%sf" % decompress_flag),
# NOTE(review): intervening argv lines (e.g. -C path, "-") are missing
# from this extraction.
149 stdin=subprocess.PIPE, stderr=sys.stderr,
150 shell=False, close_fds=True)
153 def tarball_extract(tarball, path):
154 """Retrieve a tarball from Keep and extract it to a local
155 directory. Return the absolute path where the tarball was
156 extracted. If the top level of the tarball contained just one
157 file or directory, return the absolute path of that single
160 tarball -- collection locator
161 path -- where to extract the tarball: absolute, or relative to job tmp
# Exclusive lockfile serializes concurrent extractions of the same path.
# NOTE(review): no visible unlock/close of lockfile in this extraction --
# presumably released on lines missing here (or at process exit).
163 if not re.search('^/', path):
164 path = os.path.join(current_job().tmpdir, path)
165 lockfile = open(path + '.lock', 'w')
166 fcntl.flock(lockfile, fcntl.LOCK_EX)
# A '.locator' symlink inside path caches which collection was extracted;
# if it already matches, skip the extraction entirely.
171 already_have_it = False
173 if os.readlink(os.path.join(path, '.locator')) == tarball:
174 already_have_it = True
177 if not already_have_it:
179 # emulate "rm -f" (i.e., if the file does not exist, we win)
181 os.unlink(os.path.join(path, '.locator'))
183 if os.path.exists(os.path.join(path, '.locator')):
184 os.unlink(os.path.join(path, '.locator'))
# Pick the tar decompression flag from the file extension.
186 for f in CollectionReader(tarball).all_files():
187 if re.search('\.(tbz|tar.bz2)$', f.name()):
188 p = tar_extractor(path, 'j')
189 elif re.search('\.(tgz|tar.gz)$', f.name()):
190 p = tar_extractor(path, 'z')
191 elif re.search('\.tar$', f.name()):
192 p = tar_extractor(path, '')
194 raise Exception("tarball_extract cannot handle filename %s"
# NOTE(review): the loop feeding file data into p.stdin and the wait()
# call are on lines missing from this extraction.
203 if p.returncode != 0:
205 raise Exception("tar exited %d" % p.returncode)
206 os.symlink(tarball, os.path.join(path, '.locator'))
# If the archive held exactly one top-level entry, return its path
# instead of the extraction directory.
207 tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
209 if len(tld_extracts) == 1:
210 return os.path.join(path, tld_extracts[0])
214 def zipball_extract(zipball, path):
215 """Retrieve a zip archive from Keep and extract it to a local
216 directory. Return the absolute path where the archive was
217 extracted. If the top level of the archive contained just one
218 file or directory, return the absolute path of that single
221 zipball -- collection locator
222 path -- where to extract the archive: absolute, or relative to job tmp
# Same lock + '.locator' caching protocol as tarball_extract; see the
# notes there. unzip cannot read from a pipe, so the zip is first written
# to a temp file inside path.
224 if not re.search('^/', path):
225 path = os.path.join(current_job().tmpdir, path)
226 lockfile = open(path + '.lock', 'w')
227 fcntl.flock(lockfile, fcntl.LOCK_EX)
232 already_have_it = False
234 if os.readlink(os.path.join(path, '.locator')) == zipball:
235 already_have_it = True
238 if not already_have_it:
240 # emulate "rm -f" (i.e., if the file does not exist, we win)
242 os.unlink(os.path.join(path, '.locator'))
244 if os.path.exists(os.path.join(path, '.locator')):
245 os.unlink(os.path.join(path, '.locator'))
247 for f in CollectionReader(zipball).all_files():
248 if not re.search('\.zip$', f.name()):
249 raise Exception("zipball_extract cannot handle filename %s"
251 zip_filename = os.path.join(path, os.path.basename(f.name()))
252 zip_file = open(zip_filename, 'wb')
# NOTE(review): the loop copying collection data into zip_file, its
# close(), and most of the unzip argv are on lines missing from this
# extraction.
260 p = subprocess.Popen(["unzip",
265 stdin=None, stderr=sys.stderr,
266 shell=False, close_fds=True)
268 if p.returncode != 0:
270 raise Exception("unzip exited %d" % p.returncode)
271 os.unlink(zip_filename)
272 os.symlink(zipball, os.path.join(path, '.locator'))
273 tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
275 if len(tld_extracts) == 1:
276 return os.path.join(path, tld_extracts[0])
# NOTE(review): `files=[]` is a mutable default argument; the visible code
# only reads it (`in files`, len(files)), so behavior is unaffected, but a
# None sentinel would be safer. Confirm no mutation on the missing lines.
280 def collection_extract(collection, path, files=[], decompress=True):
281 """Retrieve a collection from Keep and extract it to a local
282 directory. Return the absolute path where the collection was
285 collection -- collection locator
286 path -- where to extract: absolute, or relative to job tmp
# Same lock + '.locator' caching protocol as tarball_extract.
288 if not re.search('^/', path):
289 path = os.path.join(current_job().tmpdir, path)
290 lockfile = open(path + '.lock', 'w')
291 fcntl.flock(lockfile, fcntl.LOCK_EX)
296 already_have_it = False
298 if os.readlink(os.path.join(path, '.locator')) == collection:
299 already_have_it = True
303 # emulate "rm -f" (i.e., if the file does not exist, we win)
306 os.unlink(os.path.join(path, '.locator'))
308 if os.path.exists(os.path.join(path, '.locator')):
309 os.unlink(os.path.join(path, '.locator'))
# Copy out each requested file (or all files if `files` is empty --
# NOTE(review): the `files == []` branch of this condition is on a
# missing line; the visible clause handles the explicit-list case).
311 for f in CollectionReader(collection).all_files():
313 ((f.name() not in files_got) and
314 (f.name() in files or
315 (decompress and f.decompressed_name() in files)))):
316 outname = f.decompressed_name() if decompress else f.name()
317 files_got += [outname]
318 if os.path.exists(os.path.join(path, outname)):
320 outfile = open(os.path.join(path, outname), 'w')
321 for buf in (f.readall_decompressed() if decompress
# Sanity check: every requested file must have been produced.
325 if len(files_got) < len(files):
326 raise Exception("Wanted files %s but only got %s from %s" % (files, files_got, map(lambda z: z.name(), list(CollectionReader(collection).all_files()))))
327 os.symlink(collection, os.path.join(path, '.locator'))
# Interior of a streaming data-reader class (class header missing from this
# extraction): wraps a `whget -r <locator> -` subprocess and exposes its
# stdout as a file-like read() interface.
333 def __init__(self, data_locator):
334 self.data_locator = data_locator
335 self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
336 stdout=subprocess.PIPE,
337 stdin=None, stderr=subprocess.PIPE,
338 shell=False, close_fds=True)
# Delegate reads straight to the subprocess pipe.
343 def read(self, size, **kwargs):
344 return self.p.stdout.read(size, **kwargs)
# Teardown fragment (the enclosing def line, presumably close(), is
# missing): drain and echo stderr, then fail if whget exited nonzero.
346 self.p.stdout.close()
347 if not self.p.stderr.closed:
348 for err in self.p.stderr:
349 print >> sys.stderr, err
350 self.p.stderr.close()
352 if self.p.returncode != 0:
353 raise Exception("whget subprocess exited %d" % self.p.returncode)
# A read-only view of one file within a manifest stream: the file occupies
# `size` bytes starting at `pos` within the parent StreamReader.
355 class StreamFileReader:
356 def __init__(self, stream, pos, size, name):
357 self._stream = stream
# NOTE(review): assignments of _pos/_size/_name/_filepos are on lines
# missing from this extraction; the methods below read all of them.
# Name with any trailing .bz2/.gz compression suffix stripped.
364 def decompressed_name(self):
365 return re.sub('\.(bz2|gz)$', '', self._name)
368 def stream_name(self):
369 return self._stream.name()
# Read up to `size` bytes from the current file position; seeks the
# parent stream to this file's absolute offset first, since the stream
# is shared between files.
370 def read(self, size, **kwargs):
371 self._stream.seek(self._pos + self._filepos)
372 data = self._stream.read(min(size, self._size - self._filepos))
373 self._filepos += len(data)
# Generator yielding chunks of up to `size` bytes until EOF.
375 def readall(self, size=2**20, **kwargs):
377 data = self.read(size, **kwargs)
# Generator: bzip2-decompress the chunk stream, yielding nonempty output.
381 def bunzip2(self, size):
382 decompressor = bz2.BZ2Decompressor()
383 for chunk in self.readall(size):
384 data = decompressor.decompress(chunk)
385 if data and data != '':
# Generator: gzip-decompress (wbits 16+MAX_WBITS = gzip container).
387 def gunzip(self, size):
388 decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
389 for chunk in self.readall(size):
390 data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
391 if data and data != '':
# Dispatch on the filename suffix to pick the right decompressor.
393 def readall_decompressed(self, size=2**20):
394 self._stream.seek(self._pos + self._filepos)
395 if re.search('\.bz2$', self._name):
396 return self.bunzip2(size)
397 elif re.search('\.gz$', self._name):
398 return self.gunzip(size)
400 return self.readall(size)
# Generator yielding one "\n"-terminated line at a time, buffering
# across chunk boundaries (buffer-management lines missing here).
401 def readlines(self, decompress=True):
403 datasource = self.readall_decompressed()
405 self._stream.seek(self._pos + self._filepos)
406 datasource = self.readall()
408 for newdata in datasource:
412 eol = string.find(data, "\n", sol)
415 yield data[sol:eol+1]
# Render this single file as a one-stream manifest fragment; the first
# return handles the empty-file case using the well-known md5 of the
# empty string as the block locator.
420 def as_manifest(self):
422 return ("%s d41d8cd98f00b204e9800998ecf8427e+0 0:0:%s\n"
423 % (self._stream.name(), self.name()))
424 return string.join(self._stream.tokens_for_range(self._pos, self._size),
# Interior of a StreamReader-style class (class header missing from this
# extraction). Parses one manifest stream line -- a stream name, data-block
# locators, and pos:size:filename file tokens -- and provides seek/read
# over the concatenated data blocks.
428 def __init__(self, tokens):
429 self._tokens = tokens
430 self._current_datablock_data = None
431 self._current_datablock_pos = 0
432 self._current_datablock_index = -1
435 self._stream_name = None
436 self.data_locators = []
# Token classification: first token is the stream name, 32-hex-digit
# tokens (with optional +hints) are block locators, pos:size:name
# tokens describe files.
439 for tok in self._tokens:
440 if self._stream_name == None:
441 self._stream_name = tok
442 elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
443 self.data_locators += [tok]
444 elif re.search(r'^\d+:\d+:\S+', tok):
445 pos, size, name = tok.split(':',2)
446 self.files += [[int(pos), int(size), name]]
448 raise Exception("Invalid manifest format")
# Return the subset of manifest tokens covering [range_start,
# range_start+range_size), with file offsets rebased past any skipped
# leading blocks. A locator without a +size hint forces returning all
# tokens, since block boundaries can't be computed.
449 def tokens_for_range(self, range_start, range_size):
450 resp = [self._stream_name]
451 return_all_tokens = False
453 token_bytes_skipped = 0
454 for locator in self.data_locators:
455 sizehint = re.search(r'\+(\d+)', locator)
457 return_all_tokens = True
458 if return_all_tokens:
# NOTE(review): group(0) includes the '+' -- int('+123') parses in
# Python, but group(1) would be the cleaner capture. Confirm intent.
461 blocksize = int(sizehint.group(0))
462 if range_start + range_size <= block_start:
464 if range_start < block_start + blocksize:
467 token_bytes_skipped += blocksize
468 block_start += blocksize
# Include file tokens overlapping the range (condition partly on
# missing lines), shifting offsets by the skipped byte count.
470 if ((f[0] < range_start + range_size)
472 (f[0] + f[1] > range_start)
475 resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
478 return self._stream_name
# Yield a StreamFileReader for each file token in this stream.
482 yield StreamFileReader(self, pos, size, name)
# Advance to the next data block, invalidating the cached block data.
483 def nextdatablock(self):
484 if self._current_datablock_index < 0:
485 self._current_datablock_pos = 0
486 self._current_datablock_index = 0
488 self._current_datablock_pos += self.current_datablock_size()
489 self._current_datablock_index += 1
490 self._current_datablock_data = None
# Lazily fetch (and cache) the current block's bytes from Keep.
491 def current_datablock_data(self):
492 if self._current_datablock_data == None:
493 self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
494 return self._current_datablock_data
# Block size from the +size locator hint when present; otherwise the
# block must be fetched to measure it.
495 def current_datablock_size(self):
496 if self._current_datablock_index < 0:
498 sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
500 return int(sizehint.group(0))
501 return len(self.current_datablock_data())
503 """Set the position of the next read operation."""
505 def really_seek(self):
506 """Find and load the appropriate data block, so the byte at
# Fast paths: already positioned, or _pos falls inside the current
# block; otherwise rewind (if seeking backwards) and walk forward.
509 if self._pos == self._current_datablock_pos:
511 if (self._current_datablock_pos != None and
512 self._pos >= self._current_datablock_pos and
513 self._pos <= self._current_datablock_pos + self.current_datablock_size()):
515 if self._pos < self._current_datablock_pos:
516 self._current_datablock_index = -1
518 while (self._pos > self._current_datablock_pos and
519 self._pos > self._current_datablock_pos + self.current_datablock_size()):
521 def read(self, size):
522 """Read no more than size bytes -- but at least one byte,
523 unless _pos is already at the end of the stream.
# Skip past exhausted blocks, stop at end of stream, then slice the
# requested bytes out of the cached current block.
528 while self._pos >= self._current_datablock_pos + self.current_datablock_size():
530 if self._current_datablock_index >= len(self.data_locators):
532 data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
533 self._pos += len(data)
# Read access to a Keep collection, constructed from either a literal
# manifest text or a collection locator (distinguished by regex sniffing).
536 class CollectionReader:
537 def __init__(self, manifest_locator_or_text):
# If the argument matches the manifest-line grammar (stream name,
# hex block locators, pos:size:name tokens, trailing newline) treat
# it as inline manifest text, else as a locator to fetch from Keep.
538 if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
539 self._manifest_text = manifest_locator_or_text
540 self._manifest_locator = None
542 self._manifest_locator = manifest_locator_or_text
543 self._manifest_text = None
# Lazy parse (enclosing def line missing from this extraction --
# presumably a _populate()-style helper): fetch the manifest text from
# Keep if needed and split it into per-stream token lists, cached in
# self._streams.
550 if self._streams != None:
552 if not self._manifest_text:
553 self._manifest_text = Keep.get(self._manifest_locator)
555 for stream_line in self._manifest_text.split("\n"):
556 stream_tokens = stream_line.split()
557 self._streams += [stream_tokens]
# One StreamReader per parsed stream line.
558 def all_streams(self):
561 for s in self._streams:
562 resp += [StreamReader(s)]
# Fragment of all_files(): yields every file of every stream.
565 for s in self.all_streams():
566 for f in s.all_files():
568 def manifest_text(self):
570 return self._manifest_text
# Incrementally builds a Keep collection: buffers written data, flushes it
# to Keep in 64 MiB blocks, and tracks per-stream file boundaries so
# manifest_text() can emit the final manifest.
572 class CollectionWriter:
573 KEEP_BLOCK_SIZE = 2**26
# Constructor body (def __init__ line missing from this extraction):
# initializes the data buffer and current-stream/file bookkeeping.
# The default stream name is '.' (the collection root).
575 self._data_buffer = []
576 self._data_buffer_len = 0
577 self._current_stream_files = []
578 self._current_stream_length = 0
579 self._current_stream_locators = []
580 self._current_stream_name = '.'
581 self._current_file_name = None
582 self._current_file_pos = 0
583 self._finished_streams = []
# Append data to the buffer; flush full Keep blocks as they accumulate.
588 def write(self, newdata):
589 self._data_buffer += [newdata]
590 self._data_buffer_len += len(newdata)
591 self._current_stream_length += len(newdata)
592 while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
# Write up to one Keep block from the buffer and record its locator.
# NOTE(review): the visible code does not update _data_buffer_len
# after re-slicing the buffer -- presumably done on a missing line.
594 def flush_data(self):
595 data_buffer = ''.join(self._data_buffer)
596 if data_buffer != '':
597 self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
598 self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
599 def start_new_file(self, newfilename=None):
600 self.finish_current_file()
601 self.set_current_file_name(newfilename)
# Manifest file tokens are whitespace-delimited, so names may not
# contain whitespace.
602 def set_current_file_name(self, newfilename):
603 if re.search(r'[ \t\n]', newfilename):
604 raise AssertionError("Manifest filenames cannot contain whitespace")
605 self._current_file_name = newfilename
606 def current_file_name(self):
607 return self._current_file_name
# Close out the in-progress file: record its [pos, length, name] within
# the current stream. Zero-length unnamed data is silently allowed;
# nonzero unnamed data is an error.
608 def finish_current_file(self):
609 if self._current_file_name == None:
610 if self._current_file_pos == self._current_stream_length:
612 raise Exception("Cannot finish an unnamed file (%d bytes at offset %d in '%s' stream)" % (self._current_stream_length - self._current_file_pos, self._current_file_pos, self._current_stream_name))
613 self._current_stream_files += [[self._current_file_pos,
614 self._current_stream_length - self._current_file_pos,
615 self._current_file_name]]
616 self._current_file_pos = self._current_stream_length
617 def start_new_stream(self, newstreamname=None):
618 self.finish_current_stream()
619 self.set_current_stream_name(newstreamname)
620 def set_current_stream_name(self, newstreamname):
621 if re.search(r'[ \t\n]', newstreamname):
622 raise AssertionError("Manifest stream names cannot contain whitespace")
623 self._current_stream_name = newstreamname
624 def current_stream_name(self):
625 return self._current_stream_name
# Close out the stream: finish the open file, flush buffered data, and
# move the stream's [name, locators, files] into _finished_streams,
# then reset per-stream state.
626 def finish_current_stream(self):
627 self.finish_current_file()
629 if len(self._current_stream_files) == 0:
631 elif self._current_stream_name == None:
632 raise Exception("Cannot finish an unnamed stream (%d bytes in %d files)" % (self._current_stream_length, len(self._current_stream_files)))
634 self._finished_streams += [[self._current_stream_name,
635 self._current_stream_locators,
636 self._current_stream_files]]
637 self._current_stream_files = []
638 self._current_stream_length = 0
639 self._current_stream_locators = []
640 self._current_stream_name = None
641 self._current_file_pos = 0
642 self._current_file_name = None
# finish() fragment: store the manifest itself in Keep and return its
# locator.
644 return Keep.put(self.manifest_text())
# Serialize all finished streams as manifest text; a stream with no
# data blocks gets the canonical empty-block locator.
645 def manifest_text(self):
646 self.finish_current_stream()
648 for stream in self._finished_streams:
649 manifest += stream[0]
650 if len(stream[1]) == 0:
651 manifest += " d41d8cd98f00b204e9800998ecf8427e+0"
653 for locator in stream[1]:
654 manifest += " %s" % locator
655 for sfile in stream[2]:
656 manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
# Interior of a Keep client class (class header and several def lines
# missing from this extraction): put/get shell out to whput/whget, with a
# KEEP_LOCAL_STORE environment override that reads/writes a local directory
# instead (used for testing).
# put(data) fragment: store data, return its locator from whput's stdout.
663 if 'KEEP_LOCAL_STORE' in os.environ:
664 return Keep.local_store_put(data)
665 p = subprocess.Popen(["whput", "-"],
666 stdout=subprocess.PIPE,
667 stdin=subprocess.PIPE,
668 stderr=subprocess.PIPE,
669 shell=False, close_fds=True)
670 stdoutdata, stderrdata = p.communicate(data)
671 if p.returncode != 0:
672 raise Exception("whput subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
673 return stdoutdata.rstrip()
# get(locator) fragment: fetch data, then verify its md5 against the
# locator prefix before returning (return on a missing line).
676 if 'KEEP_LOCAL_STORE' in os.environ:
677 return Keep.local_store_get(locator)
678 p = subprocess.Popen(["whget", locator, "-"],
679 stdout=subprocess.PIPE,
681 stderr=subprocess.PIPE,
682 shell=False, close_fds=True)
683 stdoutdata, stderrdata = p.communicate(None)
684 if p.returncode != 0:
685 raise Exception("whget subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
686 m = hashlib.new('md5')
# locator.index raises ValueError when the digest is absent entirely;
# == 0 requires it at the start. NOTE(review): confirm intended handling
# of the ValueError case in the full file.
689 if locator.index(m.hexdigest()) == 0:
693 raise Exception("md5 checksum mismatch: md5(get(%s)) == %s" % (locator, m.hexdigest()))
# local_store_put: write to KEEP_LOCAL_STORE under the content md5,
# via a .tmp file + rename for atomicity; return "md5+size" locator.
695 def local_store_put(data):
696 m = hashlib.new('md5')
699 locator = '%s+%d' % (md5, len(data))
700 with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
702 os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
703 os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
# local_store_get: validate the locator shape, special-case the
# empty-content digest, then read the stored file.
706 def local_store_get(locator):
707 r = re.search('^([0-9a-f]{32,})', locator)
709 raise Exception("Keep.get: invalid data locator '%s'" % locator)
710 if r.group(0) == 'd41d8cd98f00b204e9800998ecf8427e':
712 with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f: