from apiclient import errors
from apiclient.discovery import build

if 'ARVADOS_DEBUG' in os.environ:
    logging.basicConfig(level=logging.DEBUG)

class CredentialsFromEnv(object):
    @staticmethod
    def http_request(self, uri, **kwargs):
        from httplib import BadStatusLine
        if 'headers' not in kwargs:
            kwargs['headers'] = {}
        kwargs['headers']['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
        try:
            return self.orig_http_request(uri, **kwargs)
        except BadStatusLine:
            # This is how httplib tells us that it tried to reuse an
            # existing connection but it was already closed by the
            # server. In that case, yes, we would like to retry.
            # Unfortunately, we are not absolutely certain that the
            # previous call did not succeed, so this is slightly
            # risky.
            return self.orig_http_request(uri, **kwargs)

    def authorize(self, http):
        http.orig_http_request = http.request
        http.request = types.MethodType(self.http_request, http)
        return http

url = ('https://%s/discovery/v1/apis/'
       '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
credentials = CredentialsFromEnv()

# Use the system's CA certificates (if we find them) instead of httplib2's.
ca_certs = '/etc/ssl/certs/ca-certificates.crt'
if not os.path.exists(ca_certs):
    ca_certs = None             # use httplib2 default

http = httplib2.Http(ca_certs=ca_certs)
http = credentials.authorize(http)
if re.match(r'(?i)^(true|1|yes)$',
            os.environ.get('ARVADOS_API_HOST_INSECURE', '')):
    http.disable_ssl_certificate_validation = True
service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
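
# Example (illustrative sketch, not part of the original file): with
# ARVADOS_API_HOST and ARVADOS_API_TOKEN set, the discovery-based client
# built above can be used like any Google-style API client, e.g.
# (hypothetical UUID shown):
#
#   job = service.jobs().get(uuid='qr1hi-8i9sb-xxxxxxxxxxxxxxx').execute()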

def task_set_output(self, s):
    service.job_tasks().update(uuid=self['uuid'],
                               job_task=json.dumps({'output': s})
                               ).execute()

_current_task = None
def current_task():
    global _current_task
    if _current_task:
        return _current_task
    t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
    t = UserDict.UserDict(t)
    t.set_output = types.MethodType(task_set_output, t)
    t.tmpdir = os.environ['TASK_WORK']
    _current_task = t
    return t

_current_job = None
def current_job():
    global _current_job
    if _current_job:
        return _current_job
    t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
    t = UserDict.UserDict(t)
    t.tmpdir = os.environ['JOB_WORK']
    _current_job = t
    return t

def getjobparam(*args):
    return current_job()['script_parameters'].get(*args)

def api():
    return service
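
# Example (illustrative): inside a crunch script, read a script parameter,
# optionally with a default, assuming the job was submitted with
# script_parameters = {"input": "<collection locator>"}:
#
#   input_locator = getjobparam('input')
#   verbose = getjobparam('verbose', False)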

class JobTask(object):
    def __init__(self, parameters=dict(), runtime_constraints=dict()):
        print "init jobtask %s %s" % (parameters, runtime_constraints)

class job_setup:
    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True):
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            for f in s.all_files():
                task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task_uuid': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input
                        }
                    }
                service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
        if and_end_task:
            service.job_tasks().update(uuid=current_task()['uuid'],
                                       job_task=json.dumps({'success': True})
                                       ).execute()
            exit(0)

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True):
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            task_input = s.tokens()
            new_task_attrs = {
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': task_input
                    }
                }
            service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
        if and_end_task:
            service.job_tasks().update(uuid=current_task()['uuid'],
                                       job_task=json.dumps({'success': True})
                                       ).execute()
            exit(0)
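
# Example (illustrative sketch of a crunch script using the helpers above;
# assumes the ARVADOS_*, TASK_* and JOB_* environment variables are set by
# the job dispatcher, and output_locator is produced by the script):
#
#   job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
#   this_task = current_task()
#   input_manifest = this_task['parameters']['input']
#   # ...process the input, then record the result:
#   this_task.set_output(output_locator)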

class util:
    @staticmethod
    def clear_tmpdir(path=None):
        """
        Ensure the given directory (or the current task's tmpdir if
        none given) exists and is empty.
        """
        if path == None:
            path = current_task().tmpdir
        if os.path.exists(path):
            # Capture stderr so the exception below can report it.
            p = subprocess.Popen(['rm', '-rf', path], stderr=subprocess.PIPE)
            stdout, stderr = p.communicate(None)
            if p.returncode != 0:
                raise Exception('rm -rf %s: %s' % (path, stderr))
        os.mkdir(path)

    @staticmethod
    def run_command(execargs, **kwargs):
        kwargs.setdefault('stdin', subprocess.PIPE)
        kwargs.setdefault('stdout', subprocess.PIPE)
        kwargs.setdefault('stderr', sys.stderr)
        kwargs.setdefault('close_fds', True)
        kwargs.setdefault('shell', False)
        p = subprocess.Popen(execargs, **kwargs)
        stdoutdata, stderrdata = p.communicate(None)
        if p.returncode != 0:
            # Note: stderrdata is None unless the caller overrides the
            # stderr default with subprocess.PIPE.
            raise Exception("run_command %s exit %d:\n%s" %
                            (execargs, p.returncode, stderrdata))
        return stdoutdata, stderrdata
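
    # Example (illustrative): capture stderr as well as stdout by
    # overriding the default:
    #
    #   out, err = util.run_command(['ls', '-l'], stderr=subprocess.PIPE)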

    @staticmethod
    def git_checkout(url, version, path):
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        if not os.path.exists(path):
            util.run_command(["git", "clone", url, path],
                             cwd=os.path.dirname(path))
        util.run_command(["git", "checkout", version],
                         cwd=path)
        return path

    @staticmethod
    def tar_extractor(path, decompress_flag):
        # Return a tar process that extracts into `path`, reading the
        # archive from its stdin.
        return subprocess.Popen(["tar",
                                 "-C", path,
                                 ("-x%sf" % decompress_flag),
                                 "-"],
                                stdout=None,
                                stdin=subprocess.PIPE, stderr=sys.stderr,
                                shell=False, close_fds=True)

    @staticmethod
    def tarball_extract(tarball, path):
        """Retrieve a tarball from Keep and extract it to a local
        directory.  Return the absolute path where the tarball was
        extracted.  If the top level of the tarball contained just one
        file or directory, return the absolute path of that single item.

        tarball -- collection locator
        path -- where to extract the tarball: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == tarball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(tarball).all_files():
                if re.search(r'\.(tbz|tar\.bz2)$', f.name()):
                    p = util.tar_extractor(path, 'j')
                elif re.search(r'\.(tgz|tar\.gz)$', f.name()):
                    p = util.tar_extractor(path, 'z')
                elif re.search(r'\.tar$', f.name()):
                    p = util.tar_extractor(path, '')
                else:
                    raise Exception("tarball_extract cannot handle filename %s"
                                    % f.name())
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    p.stdin.write(buf)
                p.stdin.close()
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise Exception("tar exited %d" % p.returncode)
            os.symlink(tarball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path
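
    # Example (illustrative; tarball_locator is a hypothetical collection
    # locator): extract a source tarball under the job's temporary
    # directory and build it there:
    #
    #   srcdir = util.tarball_extract(tarball_locator, 'src')
    #   util.run_command(['make'], cwd=srcdir)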

    @staticmethod
    def zipball_extract(zipball, path):
        """Retrieve a zip archive from Keep and extract it to a local
        directory.  Return the absolute path where the archive was
        extracted.  If the top level of the archive contained just one
        file or directory, return the absolute path of that single item.

        zipball -- collection locator
        path -- where to extract the archive: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == zipball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(zipball).all_files():
                if not re.search(r'\.zip$', f.name()):
                    raise Exception("zipball_extract cannot handle filename %s"
                                    % f.name())
                zip_filename = os.path.join(path, os.path.basename(f.name()))
                zip_file = open(zip_filename, 'wb')
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    zip_file.write(buf)
                zip_file.close()

                p = subprocess.Popen(["unzip",
                                      "-q", "-o",
                                      "-d", path,
                                      zip_filename],
                                     stdout=None,
                                     stdin=None, stderr=sys.stderr,
                                     shell=False, close_fds=True)
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise Exception("unzip exited %d" % p.returncode)
                os.unlink(zip_filename)
            os.symlink(zipball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path

    @staticmethod
    def collection_extract(collection, path, files=[], decompress=True):
        """Retrieve a collection from Keep and extract it to a local
        directory.  Return the absolute path where the collection was
        extracted.

        collection -- collection locator
        path -- where to extract: absolute, or relative to job tmp
        """
        matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
        if matches:
            collection_hash = matches.group(1)
        else:
            collection_hash = hashlib.md5(collection).hexdigest()
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == collection_hash:
                already_have_it = True
        except OSError:
            pass

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        files_got = []
        for s in CollectionReader(collection).all_streams():
            stream_name = s.name()
            for f in s.all_files():
                if (files == [] or
                    ((f.name() not in files_got) and
                     (f.name() in files or
                      (decompress and f.decompressed_name() in files)))):
                    outname = f.decompressed_name() if decompress else f.name()
                    files_got += [outname]
                    if os.path.exists(os.path.join(path, stream_name, outname)):
                        continue
                    util.mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                    outfile = open(os.path.join(path, stream_name, outname), 'wb')
                    for buf in (f.readall_decompressed() if decompress
                                else f.readall()):
                        outfile.write(buf)
                    outfile.close()
        if len(files_got) < len(files):
            raise Exception("Wanted files %s but only got %s from %s" %
                            (files, files_got,
                             map(lambda z: z.name(),
                                 list(CollectionReader(collection).all_files()))))
        os.symlink(collection_hash, os.path.join(path, '.locator'))

        lockfile.close()
        return path

    @staticmethod
    def mkdir_dash_p(path):
        # like "mkdir -p": create missing parents first, tolerate races
        if not os.path.exists(path):
            util.mkdir_dash_p(os.path.dirname(path))
            try:
                os.mkdir(path)
            except OSError:
                if not os.path.exists(path):
                    raise

    @staticmethod
    def stream_extract(stream, path, files=[], decompress=True):
        """Retrieve a stream from Keep and extract it to a local
        directory.  Return the absolute path where the stream was
        extracted.

        stream -- StreamReader object
        path -- where to extract: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)

        files_got = []
        for f in stream.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, outname)):
                    os.unlink(os.path.join(path, outname))
                util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
                outfile = open(os.path.join(path, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
        if len(files_got) < len(files):
            raise Exception("Wanted files %s but only got %s from %s" %
                            (files, files_got, map(lambda z: z.name(),
                                                   list(stream.all_files()))))
        lockfile.close()
        return path

    @staticmethod
    def listdir_recursive(dirname, base=None):
        allfiles = []
        for ent in sorted(os.listdir(dirname)):
            ent_path = os.path.join(dirname, ent)
            ent_base = os.path.join(base, ent) if base else ent
            if os.path.isdir(ent_path):
                allfiles += util.listdir_recursive(ent_path, ent_base)
            else:
                allfiles += [ent_base]
        return allfiles

class StreamFileReader(object):
    def __init__(self, stream, pos, size, name):
        self._stream = stream
        self._pos = pos
        self._size = size
        self._name = name
        self._filepos = 0
    def name(self):
        return self._name
    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self._name)
    def size(self):
        return self._size
    def stream_name(self):
        return self._stream.name()
    def read(self, size, **kwargs):
        self._stream.seek(self._pos + self._filepos)
        data = self._stream.read(min(size, self._size - self._filepos))
        self._filepos += len(data)
        return data
    def readall(self, size=2**20, **kwargs):
        while True:
            data = self.read(size, **kwargs)
            if data == '':
                break
            yield data
    def bunzip2(self, size):
        decompressor = bz2.BZ2Decompressor()
        for chunk in self.readall(size):
            data = decompressor.decompress(chunk)
            if data:
                yield data
    def gunzip(self, size):
        decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
        for chunk in self.readall(size):
            data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
            if data:
                yield data
    def readall_decompressed(self, size=2**20):
        self._stream.seek(self._pos + self._filepos)
        if re.search(r'\.bz2$', self._name):
            return self.bunzip2(size)
        elif re.search(r'\.gz$', self._name):
            return self.gunzip(size)
        else:
            return self.readall(size)
    def readlines(self, decompress=True):
        if decompress:
            datasource = self.readall_decompressed()
        else:
            self._stream.seek(self._pos + self._filepos)
            datasource = self.readall()
        data = ''
        for newdata in datasource:
            data += newdata
            sol = 0
            while True:
                eol = string.find(data, "\n", sol)
                if eol < 0:
                    break
                yield data[sol:eol+1]
                sol = eol + 1
            data = data[sol:]
        if data != '':
            yield data
    def as_manifest(self):
        if self.size() == 0:
            return ("%s d41d8cd98f00b204e9800998ecf8427e+0 0:0:%s\n"
                    % (self._stream.name(), self.name()))
        return string.join(self._stream.tokens_for_range(self._pos, self._size),
                           " ") + "\n"
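
# Example (illustrative): iterate over the lines of a file stored in a
# collection, decompressing transparently when its name ends in .gz or
# .bz2 (locator is a hypothetical collection locator):
#
#   for f in CollectionReader(locator).all_files():
#       for line in f.readlines():
#           ...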

class StreamReader(object):
    def __init__(self, tokens):
        self._tokens = tokens
        self._current_datablock_data = None
        self._current_datablock_pos = 0
        self._current_datablock_index = -1
        self._pos = 0

        self._stream_name = None
        self.data_locators = []
        self.files = []

        for tok in self._tokens:
            if self._stream_name == None:
                self._stream_name = tok
            elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
                self.data_locators += [tok]
            elif re.search(r'^\d+:\d+:\S+', tok):
                pos, size, name = tok.split(':', 2)
                self.files += [[int(pos), int(size), name]]
            else:
                raise Exception("Invalid manifest format")

    def tokens(self):
        return self._tokens
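
    # A manifest stream line looks like (one stream name, then one or more
    # data block locators, then one or more file tokens):
    #
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
    #
    # Each file token is "position:size:filename", where position is an
    # offset into the concatenation of the stream's data blocks.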
    def tokens_for_range(self, range_start, range_size):
        resp = [self._stream_name]
        return_all_tokens = False
        block_start = 0
        token_bytes_skipped = 0
        for locator in self.data_locators:
            sizehint = re.search(r'\+(\d+)', locator)
            if not sizehint:
                return_all_tokens = True
            if return_all_tokens:
                resp += [locator]
                continue
            blocksize = int(sizehint.group(1))
            if range_start + range_size <= block_start:
                break
            if range_start < block_start + blocksize:
                resp += [locator]
            else:
                token_bytes_skipped += blocksize
            block_start += blocksize
        for f in self.files:
            if ((f[0] < range_start + range_size) and
                (f[0] + f[1] > range_start) and
                f[1] > 0):
                resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
        return resp
    def name(self):
        return self._stream_name
    def all_files(self):
        for f in self.files:
            pos, size, name = f
            yield StreamFileReader(self, pos, size, name)
    def nextdatablock(self):
        if self._current_datablock_index < 0:
            self._current_datablock_pos = 0
            self._current_datablock_index = 0
        else:
            self._current_datablock_pos += self.current_datablock_size()
            self._current_datablock_index += 1
        self._current_datablock_data = None
    def current_datablock_data(self):
        if self._current_datablock_data == None:
            self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
        return self._current_datablock_data
    def current_datablock_size(self):
        if self._current_datablock_index < 0:
            self.nextdatablock()
        sizehint = re.search(r'\+(\d+)', self.data_locators[self._current_datablock_index])
        if sizehint:
            return int(sizehint.group(1))
        return len(self.current_datablock_data())
607 """Set the position of the next read operation."""
609 def really_seek(self):
610 """Find and load the appropriate data block, so the byte at
613 if self._pos == self._current_datablock_pos:
615 if (self._current_datablock_pos != None and
616 self._pos >= self._current_datablock_pos and
617 self._pos <= self._current_datablock_pos + self.current_datablock_size()):
619 if self._pos < self._current_datablock_pos:
620 self._current_datablock_index = -1
622 while (self._pos > self._current_datablock_pos and
623 self._pos > self._current_datablock_pos + self.current_datablock_size()):
625 def read(self, size):
626 """Read no more than size bytes -- but at least one byte,
627 unless _pos is already at the end of the stream.
632 while self._pos >= self._current_datablock_pos + self.current_datablock_size():
634 if self._current_datablock_index >= len(self.data_locators):
636 data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
637 self._pos += len(data)

class CollectionReader(object):
    def __init__(self, manifest_locator_or_text):
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        self._streams = None
    def _populate(self):
        if self._streams != None:
            return
        if not self._manifest_text:
            self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp += [StreamReader(s)]
        return resp
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f
    def manifest_text(self):
        self._populate()
        return self._manifest_text
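
# Example (illustrative; hypothetical locator shown): list the files in a
# collection:
#
#   cr = CollectionReader('acbd18db4cc2f85cedef654fccc4a4d8+3')
#   for f in cr.all_files():
#       print f.stream_name(), f.name(), f.size()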

class CollectionWriter(object):
    KEEP_BLOCK_SIZE = 2**26
    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self.start_new_stream(stream_name)
        todo = []
        if max_manifest_depth == 0:
            dirents = sorted(util.listdir_recursive(path))
        else:
            dirents = sorted(os.listdir(path))
        for dirent in dirents:
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                todo += [[target,
                          os.path.join(stream_name, dirent),
                          max_manifest_depth - 1]]
            else:
                self.start_new_file(dirent)
                with open(target, 'rb') as f:
                    while True:
                        buf = f.read(2**26)
                        if len(buf) == 0:
                            break
                        self.write(buf)
        self.finish_current_stream()
        map(lambda x: self.write_directory_tree(*x), todo)
    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()
    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        newfilename = re.sub(r' ', '\\\\040', newfilename)
        if re.search(r'[ \t\n]', newfilename):
            raise AssertionError("Manifest filenames cannot contain whitespace")
        self._current_file_name = newfilename
    def current_file_name(self):
        return self._current_file_name
    def finish_current_file(self):
        if self._current_file_name == None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise Exception("Cannot finish an unnamed file (%d bytes at offset %d in '%s' stream)" %
                            (self._current_stream_length - self._current_file_pos,
                             self._current_file_pos,
                             self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length
    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
    def set_current_stream_name(self, newstreamname):
        if re.search(r'[ \t\n]', newstreamname):
            raise AssertionError("Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname
    def current_stream_name(self):
        return self._current_stream_name
    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name == None:
            raise Exception("Cannot finish an unnamed stream (%d bytes in %d files)" %
                            (self._current_stream_length,
                             len(self._current_stream_files)))
        else:
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
    def finish(self):
        return Keep.put(self.manifest_text())
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0]
            if len(stream[1]) == 0:
                manifest += " d41d8cd98f00b204e9800998ecf8427e+0"
            else:
                for locator in stream[1]:
                    manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
            manifest += "\n"
        return manifest
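
# Example (illustrative): build a one-file collection and store its
# manifest in Keep:
#
#   cw = CollectionWriter()
#   cw.start_new_file('hello.txt')
#   cw.write('hello world\n')
#   manifest_locator = cw.finish()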

global_client_object = None

class Keep:
    @staticmethod
    def global_client_object():
        global global_client_object
        if global_client_object == None:
            global_client_object = KeepClient()
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
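
# Example (illustrative): store a blob in Keep and read it back; put()
# returns a locator of the form "<md5>+<size>":
#
#   locator = Keep.put('foo')   # 'acbd18db4cc2f85cedef654fccc4a4d8+3'
#   assert Keep.get(locator) == 'foo'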

class KeepClient(object):

    class ThreadLimiter(object):
        """
        Limit the number of threads running at a given time to
        {desired successes} minus {successes reported}. When successes
        reported == desired, wake up the remaining threads and tell
        them to quit.

        Should be used in a "with" block.
        """
        def __init__(self, todo):
            self._todo = todo
            self._done = 0
            self._todo_lock = threading.Semaphore(todo)
            self._done_lock = threading.Lock()
        def __enter__(self):
            self._todo_lock.acquire()
            return self
        def __exit__(self, type, value, traceback):
            self._todo_lock.release()
        def shall_i_proceed(self):
            """
            Return true if the current thread should do stuff. Return
            false if the current thread should just stop.
            """
            with self._done_lock:
                return (self._done < self._todo)
        def increment_done(self):
            """
            Report that the current thread was successful.
            """
            with self._done_lock:
                self._done += 1
        def done(self):
            """
            Return how many successes were reported.
            """
            with self._done_lock:
                return self._done

    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. Call
        increment_done() of the given ThreadLimiter if the write
        succeeds.
        """
        def __init__(self, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.args = kwargs
        def run(self):
            with self.args['thread_limiter'] as limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                logging.debug("KeepWriterThread %s proceeding %s %s" %
                              (str(threading.current_thread()),
                               self.args['data_hash'],
                               self.args['service_root']))
                try:
                    url = self.args['service_root'] + self.args['data_hash']
                    api_token = os.environ['ARVADOS_API_TOKEN']
                    headers = {'Authorization': "OAuth2 %s" % api_token}
                    h = httplib2.Http()
                    resp, content = h.request(url.encode('utf-8'), 'PUT',
                                              headers=headers,
                                              body=self.args['data'])
                    if (resp['status'] == '401' and
                        re.match(r'Timestamp verification failed', content)):
                        body = KeepClient.sign_for_old_server(
                            self.args['data_hash'],
                            self.args['data'])
                        h = httplib2.Http()
                        resp, content = h.request(url.encode('utf-8'), 'PUT',
                                                  headers=headers,
                                                  body=body)
                    if re.match(r'^2\d\d$', resp['status']):
                        logging.debug("KeepWriterThread %s succeeded %s %s" %
                                      (str(threading.current_thread()),
                                       self.args['data_hash'],
                                       self.args['service_root']))
                        return limiter.increment_done()
                    logging.warning("Request fail: PUT %s => %s %s" %
                                    (url, resp['status'], content))
                except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
                    logging.warning("Request fail: PUT %s => %s: %s" %
                                    (url, type(e), str(e)))

    def __init__(self):
        self.lock = threading.Lock()
        self.service_roots = None

    def shuffled_service_roots(self, hash):
        if self.service_roots == None:
            self.lock.acquire()
            keep_disks = api().keep_disks().list().execute()['items']
            roots = (("http%s://%s:%d/" %
                      ('s' if f['service_ssl_flag'] else '',
                       f['service_host'],
                       f['service_port']))
                     for f in keep_disks)
            self.service_roots = sorted(set(roots))
            logging.debug(str(self.service_roots))
            self.lock.release()

        # Build a probe order from the hash itself: consume the hex digest
        # eight digits at a time, replenishing the seed from the hash when
        # it runs low.
        seed = hash
        pool = self.service_roots[:]
        pseq = []
        while len(pool) > 0:
            if len(seed) < 8:
                if len(pseq) < len(hash) / 4: # first time around
                    seed = hash[-4:] + hash
                else:
                    seed += hash
            probe = int(seed[0:8], 16) % len(pool)
            pseq += [pool[probe]]
            pool = pool[:probe] + pool[probe+1:]
            seed = seed[8:]
        logging.debug(str(pseq))
        return pseq
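
    # Example (illustrative): with two service roots and
    # hash = 'acbd18db4cc2f85cedef654fccc4a4d8', the first probe is
    # int('acbd18db', 16) % 2 == 1, so service_roots[1] is tried first;
    # that root is removed from the pool and the next eight hex digits
    # pick the next one.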

    def get(self, locator):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return KeepClient.local_store_get(locator)
        expect_hash = re.sub(r'\+.*', '', locator)
        for service_root in self.shuffled_service_roots(expect_hash):
            h = httplib2.Http()
            url = service_root + expect_hash
            api_token = os.environ['ARVADOS_API_TOKEN']
            headers = {'Authorization': "OAuth2 %s" % api_token,
                       'Accept': 'application/octet-stream'}
            try:
                resp, content = h.request(url.encode('utf-8'), 'GET',
                                          headers=headers)
                if re.match(r'^2\d\d$', resp['status']):
                    m = hashlib.new('md5')
                    m.update(content)
                    md5 = m.hexdigest()
                    if md5 == expect_hash:
                        return content
                    logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
            except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
                logging.info("Request fail: GET %s => %s: %s" %
                             (url, type(e), str(e)))
        raise Exception("Not found: %s" % expect_hash)

    def put(self, data, **kwargs):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return KeepClient.local_store_put(data)
        m = hashlib.new('md5')
        m.update(data)
        data_hash = m.hexdigest()
        have_copies = 0
        want_copies = kwargs.get('copies', 2)
        if not (want_copies > 0):
            return data_hash
        threads = []
        thread_limiter = KeepClient.ThreadLimiter(want_copies)
        for service_root in self.shuffled_service_roots(data_hash):
            t = KeepClient.KeepWriterThread(data=data,
                                            data_hash=data_hash,
                                            service_root=service_root,
                                            thread_limiter=thread_limiter)
            t.start()
            threads += [t]
        for t in threads:
            t.join()
        have_copies = thread_limiter.done()
        if have_copies == want_copies:
            return (data_hash + '+' + str(len(data)))
        raise Exception("Write fail for %s: wanted %d but wrote %d" %
                        (data_hash, want_copies, have_copies))

    @staticmethod
    def sign_for_old_server(data_hash, data):
        return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)

    @staticmethod
    def local_store_put(data):
        m = hashlib.new('md5')
        m.update(data)
        md5 = m.hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
                  os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
        return locator

    @staticmethod
    def local_store_get(locator):
        r = re.search('^([0-9a-f]{32,})', locator)
        if not r:
            raise Exception("Keep.get: invalid data locator '%s'" % locator)
        if r.group(0) == 'd41d8cd98f00b204e9800998ecf8427e':
            return ''
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
            return f.read()
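
# Example (illustrative): KEEP_LOCAL_STORE redirects Keep traffic to a
# local directory, which is handy for testing without a Keep cluster:
#
#   os.environ['KEEP_LOCAL_STORE'] = '/tmp/keep'   # directory must exist
#   locator = Keep.put('foo')
#   assert Keep.get(locator) == 'foo'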