import bz2, fcntl, hashlib, json, logging, os, re, string
import subprocess, sys, threading, time, types, zlib
import httplib, httplib2, UserDict

from apiclient import errors
from apiclient.discovery import build

if 'ARVADOS_DEBUG' in os.environ:
    logging.basicConfig(level=logging.DEBUG)

class CredentialsFromEnv(object):
    # http_request is declared static but bound to an httplib2.Http
    # object at runtime via types.MethodType (see authorize below).
    @staticmethod
    def http_request(self, uri, **kwargs):
        from httplib import BadStatusLine
        if 'headers' not in kwargs:
            kwargs['headers'] = {}
        kwargs['headers']['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
        try:
            return self.orig_http_request(uri, **kwargs)
        except BadStatusLine:
            # This is how httplib tells us that it tried to reuse an
            # existing connection but it was already closed by the
            # server. In that case, yes, we would like to retry.
            # Unfortunately, we are not absolutely certain that the
            # previous call did not succeed, so this is slightly
            # risky.
            return self.orig_http_request(uri, **kwargs)
    def authorize(self, http):
        http.orig_http_request = http.request
        http.request = types.MethodType(self.http_request, http)
        return http
url = ('https://%s/discovery/v1/apis/'
       '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
credentials = CredentialsFromEnv()

# Use the system's CA certificates (if we find them) instead of httplib2's.
ca_certs = '/etc/ssl/certs/ca-certificates.crt'
if not os.path.exists(ca_certs):
    ca_certs = None             # use httplib2 default

http = httplib2.Http(ca_certs=ca_certs)
http = credentials.authorize(http)
if re.match(r'(?i)^(true|1|yes)$',
            os.environ.get('ARVADOS_API_HOST_INSECURE', '')):
    http.disable_ssl_certificate_validation = True
service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
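
# A minimal usage sketch: the discovery-built client exposes each Arvados
# resource as a method chain (the uuid below is a made-up placeholder):
#
#   me = service.users().current().execute()
#   job = service.jobs().get(uuid='zzzzz-8i9sb-0123456789abcde').execute()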
def task_set_output(self, s):
    service.job_tasks().update(uuid=self['uuid'],
                               job_task=json.dumps({'output': s,
                                                    'success': True,
                                                    'progress': 1.0})).execute()

def current_task():
    t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
    t = UserDict.UserDict(t)
    t.set_output = types.MethodType(task_set_output, t)
    t.tmpdir = os.environ['TASK_WORK']
    return t

def current_job():
    t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
    t = UserDict.UserDict(t)
    t.tmpdir = os.environ['JOB_WORK']
    return t

def api():
    return service
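
# A minimal usage sketch (assuming the script runs inside a crunch job,
# where the dispatcher sets JOB_UUID, TASK_UUID and TASK_WORK):
#
#   this_job = current_job()
#   this_task = current_task()
#   input_locator = this_job['script_parameters']['input']
#   # ...do work in this_task.tmpdir, then record the result...
#   this_task.set_output(output_locator)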
class JobTask(object):
    def __init__(self, parameters=dict(), runtime_constraints=dict()):
        print "init jobtask %s %s" % (parameters, runtime_constraints)
class job_setup(object):
    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True):
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            for f in s.all_files():
                task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task_uuid': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input
                        }
                    }
                service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
        if and_end_task:
            service.job_tasks().update(uuid=current_task()['uuid'],
                                       job_task=json.dumps({'success': True})
                                       ).execute()
            exit(0)
    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True):
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            task_input = s.tokens()
            new_task_attrs = {
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': task_input
                    }
                }
            service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
        if and_end_task:
            service.job_tasks().update(uuid=current_task()['uuid'],
                                       job_task=json.dumps({'success': True})
                                       ).execute()
            exit(0)
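
# A minimal usage sketch: a crunch script typically calls one of the fan-out
# helpers above at sequence 0, then handles its own single input at
# sequence 1:
#
#   job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
#   this_task = current_task()
#   for f in CollectionReader(this_task['parameters']['input']).all_files():
#       ...process f...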
class util(object):
    @staticmethod
    def run_command(execargs, **kwargs):
        if 'stdin' not in kwargs:
            kwargs['stdin'] = subprocess.PIPE
        if 'stdout' not in kwargs:
            kwargs['stdout'] = subprocess.PIPE
        if 'stderr' not in kwargs:
            kwargs['stderr'] = subprocess.PIPE
        p = subprocess.Popen(execargs, close_fds=True, shell=False,
                             **kwargs)
        stdoutdata, stderrdata = p.communicate(None)
        if p.returncode != 0:
            raise Exception("run_command %s exit %d:\n%s" %
                            (execargs, p.returncode, stderrdata))
        return stdoutdata, stderrdata
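
    # Usage sketch: run_command raises on a nonzero exit status, so callers
    # can use the captured output directly.
    #
    #   stdout, stderr = util.run_command(["ls", "-l", "/tmp"])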
    @staticmethod
    def git_checkout(url, version, path):
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        if not os.path.exists(path):
            util.run_command(["git", "clone", url, path],
                             cwd=os.path.dirname(path))
        util.run_command(["git", "checkout", version],
                         cwd=path)
        return path
    @staticmethod
    def tar_extractor(path, decompress_flag):
        # Return a tar process that extracts whatever is piped to its stdin.
        return subprocess.Popen(["tar",
                                 "-C", path,
                                 ("-x%sf" % decompress_flag),
                                 "-"],
                                stdout=None,
                                stdin=subprocess.PIPE, stderr=sys.stderr,
                                shell=False, close_fds=True)
    @staticmethod
    def tarball_extract(tarball, path):
        """Retrieve a tarball from Keep and extract it to a local
        directory. Return the absolute path where the tarball was
        extracted. If the top level of the tarball contained just one
        file or directory, return the absolute path of that single
        item.

        tarball -- collection locator
        path -- where to extract the tarball: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == tarball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(tarball).all_files():
                if re.search('\.(tbz|tar.bz2)$', f.name()):
                    p = util.tar_extractor(path, 'j')
                elif re.search('\.(tgz|tar.gz)$', f.name()):
                    p = util.tar_extractor(path, 'z')
                elif re.search('\.tar$', f.name()):
                    p = util.tar_extractor(path, '')
                else:
                    raise Exception("tarball_extract cannot handle filename %s"
                                    % f.name())
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    p.stdin.write(buf)
                p.stdin.close()
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise Exception("tar exited %d" % p.returncode)
            os.symlink(tarball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path
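
    # Usage sketch (the 'reference' script parameter is hypothetical): the
    # .lock/.locator bookkeeping above lets concurrent tasks on the same
    # node share one cached extraction.
    #
    #   ref_dir = util.tarball_extract(
    #       tarball=current_job()['script_parameters']['reference'],
    #       path='reference')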
    @staticmethod
    def zipball_extract(zipball, path):
        """Retrieve a zip archive from Keep and extract it to a local
        directory. Return the absolute path where the archive was
        extracted. If the top level of the archive contained just one
        file or directory, return the absolute path of that single
        item.

        zipball -- collection locator
        path -- where to extract the archive: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == zipball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(zipball).all_files():
                if not re.search('\.zip$', f.name()):
                    raise Exception("zipball_extract cannot handle filename %s"
                                    % f.name())
                zip_filename = os.path.join(path, os.path.basename(f.name()))
                zip_file = open(zip_filename, 'wb')
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    zip_file.write(buf)
                zip_file.close()

                p = subprocess.Popen(["unzip",
                                      "-q", "-o",
                                      "-d", path,
                                      zip_filename],
                                     stdout=None,
                                     stdin=None, stderr=sys.stderr,
                                     shell=False, close_fds=True)
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise Exception("unzip exited %d" % p.returncode)
                os.unlink(zip_filename)
            os.symlink(zipball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path
    @staticmethod
    def collection_extract(collection, path, files=[], decompress=True):
        """Retrieve a collection from Keep and extract it to a local
        directory. Return the absolute path where the collection was
        extracted.

        collection -- collection locator
        path -- where to extract: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == collection:
                already_have_it = True
        except OSError:
            pass

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        files_got = []
        for s in CollectionReader(collection).all_streams():
            stream_name = s.name()
            for f in s.all_files():
                if (files == [] or
                    ((f.name() not in files_got) and
                     (f.name() in files or
                      (decompress and f.decompressed_name() in files)))):
                    outname = f.decompressed_name() if decompress else f.name()
                    files_got += [outname]
                    if os.path.exists(os.path.join(path, stream_name, outname)):
                        continue
                    util.mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                    outfile = open(os.path.join(path, stream_name, outname), 'wb')
                    for buf in (f.readall_decompressed() if decompress
                                else f.readall()):
                        outfile.write(buf)
                    outfile.close()
        if len(files_got) < len(files):
            raise Exception("Wanted files %s but only got %s from %s" %
                            (files, files_got,
                             map(lambda z: z.name(),
                                 list(CollectionReader(collection).all_files()))))
        os.symlink(collection, os.path.join(path, '.locator'))
        lockfile.close()
        return path
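
    # Usage sketch (file names are hypothetical): fetch two files from a
    # collection, decompressing any .gz/.bz2 on the way out.
    #
    #   dir = util.collection_extract(
    #       collection=current_job()['script_parameters']['input'],
    #       path='input',
    #       files=['chr1.fa.gz', 'chr2.fa.gz'],
    #       decompress=True)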
    @staticmethod
    def mkdir_dash_p(path):
        # emulate "mkdir -p": create parents as needed, tolerate races
        if not os.path.exists(path):
            util.mkdir_dash_p(os.path.dirname(path))
            try:
                os.mkdir(path)
            except OSError:
                if not os.path.exists(path):
                    raise
    @staticmethod
    def stream_extract(stream, path, files=[], decompress=True):
        """Retrieve a stream from Keep and extract it to a local
        directory. Return the absolute path where the stream was
        extracted.

        stream -- StreamReader object
        path -- where to extract: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)

        files_got = []
        for f in stream.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, outname)):
                    os.unlink(os.path.join(path, outname))
                util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
                outfile = open(os.path.join(path, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
        if len(files_got) < len(files):
            raise Exception("Wanted files %s but only got %s from %s" %
                            (files, files_got, map(lambda z: z.name(),
                                                   list(stream.all_files()))))
        lockfile.close()
        return path
    @staticmethod
    def listdir_recursive(dirname, base=None):
        allfiles = []
        for ent in sorted(os.listdir(dirname)):
            ent_path = os.path.join(dirname, ent)
            ent_base = os.path.join(base, ent) if base else ent
            if os.path.isdir(ent_path):
                allfiles += util.listdir_recursive(ent_path, ent_base)
            else:
                allfiles += [ent_base]
        return allfiles
class StreamFileReader(object):
    def __init__(self, stream, pos, size, name):
        self._stream = stream
        self._pos = pos
        self._size = size
        self._name = name
        self._filepos = 0
    def name(self):
        return self._name
    def decompressed_name(self):
        return re.sub('\.(bz2|gz)$', '', self._name)
    def size(self):
        return self._size
    def stream_name(self):
        return self._stream.name()
    def read(self, size, **kwargs):
        self._stream.seek(self._pos + self._filepos)
        data = self._stream.read(min(size, self._size - self._filepos))
        self._filepos += len(data)
        return data
    def readall(self, size=2**20, **kwargs):
        while True:
            data = self.read(size, **kwargs)
            if data == '':
                break
            yield data
    def bunzip2(self, size):
        decompressor = bz2.BZ2Decompressor()
        for chunk in self.readall(size):
            data = decompressor.decompress(chunk)
            if data and data != '':
                yield data
    def gunzip(self, size):
        decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
        for chunk in self.readall(size):
            data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
            if data and data != '':
                yield data
    def readall_decompressed(self, size=2**20):
        self._stream.seek(self._pos + self._filepos)
        if re.search('\.bz2$', self._name):
            return self.bunzip2(size)
        elif re.search('\.gz$', self._name):
            return self.gunzip(size)
        else:
            return self.readall(size)
    def readlines(self, decompress=True):
        if decompress:
            datasource = self.readall_decompressed()
        else:
            self._stream.seek(self._pos + self._filepos)
            datasource = self.readall()
        data = ''
        for newdata in datasource:
            data += newdata
            sol = 0
            while True:
                eol = string.find(data, "\n", sol)
                if eol < 0:
                    break
                yield data[sol:eol+1]
                sol = eol+1
            data = data[sol:]
        if data != '':
            yield data
    def as_manifest(self):
        if self._size == 0:
            return ("%s d41d8cd98f00b204e9800998ecf8427e+0 0:0:%s\n"
                    % (self._stream.name(), self.name()))
        return string.join(self._stream.tokens_for_range(self._pos, self._size),
                           " ") + "\n"
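
# A minimal usage sketch: iterate over the lines of one file in a
# collection; readlines() transparently decompresses names ending in .gz
# or .bz2.
#
#   for f in CollectionReader(input_locator).all_files():
#       if f.decompressed_name() == 'sample.txt':
#           for line in f.readlines():
#               ...parse line...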
class StreamReader(object):
    def __init__(self, tokens):
        self._tokens = tokens
        self._current_datablock_data = None
        self._current_datablock_pos = 0
        self._current_datablock_index = -1
        self._pos = 0

        self._stream_name = None
        self.data_locators = []
        self.files = []

        for tok in self._tokens:
            if self._stream_name == None:
                self._stream_name = tok
            elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
                self.data_locators += [tok]
            elif re.search(r'^\d+:\d+:\S+', tok):
                pos, size, name = tok.split(':', 2)
                self.files += [[int(pos), int(size), name]]
            else:
                raise Exception("Invalid manifest format")
    def tokens(self):
        return self._tokens
    def tokens_for_range(self, range_start, range_size):
        resp = [self._stream_name]
        return_all_tokens = False
        block_start = 0
        token_bytes_skipped = 0
        for locator in self.data_locators:
            sizehint = re.search(r'\+(\d+)', locator)
            if not sizehint:
                return_all_tokens = True
            if return_all_tokens:
                resp += [locator]
                continue
            blocksize = int(sizehint.group(1))
            if range_start + range_size <= block_start:
                break
            if range_start < block_start + blocksize:
                resp += [locator]
            else:
                token_bytes_skipped += blocksize
            block_start += blocksize
        for f in self.files:
            if ((f[0] < range_start + range_size)
                and
                (f[0] + f[1] > range_start)
                and
                f[1] > 0):
                resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
        return resp
    def name(self):
        return self._stream_name
    def all_files(self):
        for f in self.files:
            pos, size, name = f
            yield StreamFileReader(self, pos, size, name)
    def nextdatablock(self):
        if self._current_datablock_index < 0:
            self._current_datablock_pos = 0
            self._current_datablock_index = 0
        else:
            self._current_datablock_pos += self.current_datablock_size()
            self._current_datablock_index += 1
        self._current_datablock_data = None
    def current_datablock_data(self):
        if self._current_datablock_data == None:
            self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
        return self._current_datablock_data
    def current_datablock_size(self):
        if self._current_datablock_index < 0:
            return 0
        sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
        if sizehint:
            return int(sizehint.group(1))
        return len(self.current_datablock_data())
    def seek(self, pos):
        """Set the position of the next read operation."""
        self._pos = pos
    def really_seek(self):
        """Find and load the appropriate data block, so the byte at
        _pos is in memory.
        """
        if self._pos == self._current_datablock_pos:
            return True
        if (self._current_datablock_pos != None and
            self._pos >= self._current_datablock_pos and
            self._pos <= self._current_datablock_pos + self.current_datablock_size()):
            return True
        if self._pos < self._current_datablock_pos:
            self._current_datablock_index = -1
            self.nextdatablock()
        while (self._pos > self._current_datablock_pos and
               self._pos > self._current_datablock_pos + self.current_datablock_size()):
            self.nextdatablock()
    def read(self, size):
        """Read no more than size bytes -- but at least one byte,
        unless _pos is already at the end of the stream.
        """
        if size == 0:
            return ''
        self.really_seek()
        while self._pos >= self._current_datablock_pos + self.current_datablock_size():
            self.nextdatablock()
            if self._current_datablock_index >= len(self.data_locators):
                return ''
        data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
        self._pos += len(data)
        return data
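
# A minimal usage sketch: a StreamReader is built from the whitespace-split
# tokens of a single manifest line (stream name, block locators, then
# position:size:filename segments).
#
#   tokens = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt".split()
#   s = StreamReader(tokens)
#   [f.name() for f in s.all_files()]    # -> ['empty.txt']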
class CollectionReader(object):
    def __init__(self, manifest_locator_or_text):
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        self._streams = None
    def _populate(self):
        if self._streams != None:
            return
        if not self._manifest_text:
            self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp += [StreamReader(s)]
        return resp
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f
    def manifest_text(self):
        self._populate()
        return self._manifest_text
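
# A minimal usage sketch: CollectionReader accepts either a manifest locator
# or literal manifest text; data blocks are fetched from Keep only when read.
#
#   cr = CollectionReader(collection_locator)
#   for s in cr.all_streams():
#       for f in s.all_files():
#           print s.name(), f.name(), f.size()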
class CollectionWriter(object):
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self.start_new_stream(stream_name)
        todo = []
        if max_manifest_depth == 0:
            dirents = util.listdir_recursive(path)
        else:
            dirents = sorted(os.listdir(path))
        for dirent in dirents:
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                todo += [[target,
                          os.path.join(stream_name, dirent),
                          max_manifest_depth-1]]
            else:
                self.start_new_file(dirent)
                with open(target, 'rb') as f:
                    while True:
                        buf = f.read(2**26)
                        if len(buf) == 0:
                            break
                        self.write(buf)
        self.finish_current_stream()
        map(lambda x: self.write_directory_tree(*x), todo)
    def write(self, newdata):
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()
    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        newfilename = re.sub(r' ', '\\\\040', newfilename)
        if re.search(r'[ \t\n]', newfilename):
            raise AssertionError("Manifest filenames cannot contain whitespace")
        self._current_file_name = newfilename
    def current_file_name(self):
        return self._current_file_name
    def finish_current_file(self):
        if self._current_file_name == None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise Exception("Cannot finish an unnamed file (%d bytes at offset %d in '%s' stream)" % (self._current_stream_length - self._current_file_pos, self._current_file_pos, self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length
    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
    def set_current_stream_name(self, newstreamname):
        if re.search(r'[ \t\n]', newstreamname):
            raise AssertionError("Manifest stream names cannot contain whitespace")
        self._current_stream_name = newstreamname
    def current_stream_name(self):
        return self._current_stream_name
    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name == None:
            raise Exception("Cannot finish an unnamed stream (%d bytes in %d files)" % (self._current_stream_length, len(self._current_stream_files)))
        else:
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
    def finish(self):
        return Keep.put(self.manifest_text())
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0]
            if len(stream[1]) == 0:
                manifest += " d41d8cd98f00b204e9800998ecf8427e+0"
            else:
                for locator in stream[1]:
                    manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
            manifest += "\n"
        return manifest
    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
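
# A minimal usage sketch: buffer writes into files and streams, then store
# the resulting manifest in Keep; finish() returns the manifest locator.
#
#   cw = CollectionWriter()
#   cw.start_new_file('hello.txt')
#   cw.write('hello world\n')
#   locator = cw.finish()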
global_client_object = None

class Keep:
    @staticmethod
    def global_client_object():
        global global_client_object
        if global_client_object == None:
            global_client_object = KeepClient()
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
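
# A minimal usage sketch: put() returns a locator with a size hint appended,
# and get() verifies the MD5 of the returned data before handing it back.
#
#   locator = Keep.put("foo")   # -> "acbd18db4cc2f85cedef654fccc4a4d8+3"
#   data = Keep.get(locator)    # -> "foo"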
class KeepClient(object):
    class ThreadLimiter(object):
        """
        Limit the number of threads running at a given time to
        {desired successes} minus {successes reported}. When successes
        reported == desired, wake up the remaining threads and tell
        them to quit.

        Should be used in a "with" block.
        """
        def __init__(self, todo):
            self._todo = todo
            self._done = 0
            self._todo_lock = threading.Semaphore(todo)
            self._done_lock = threading.Lock()
        def __enter__(self):
            self._todo_lock.acquire()
            return self
        def __exit__(self, type, value, traceback):
            self._todo_lock.release()
        def shall_i_proceed(self):
            """
            Return true if the current thread should do stuff. Return
            false if the current thread should just stop.
            """
            with self._done_lock:
                return (self._done < self._todo)
        def increment_done(self):
            """
            Report that the current thread was successful.
            """
            with self._done_lock:
                self._done += 1
        def done(self):
            """
            Return how many successes were reported.
            """
            with self._done_lock:
                return self._done
    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. Call
        increment_done() of the given ThreadLimiter if the write
        succeeds.
        """
        def __init__(self, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.args = kwargs
        def run(self):
            with self.args['thread_limiter'] as limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                logging.debug("KeepWriterThread %s proceeding %s %s" %
                              (str(threading.current_thread()),
                               self.args['data_hash'],
                               self.args['service_root']))
                h = httplib2.Http()
                url = self.args['service_root'] + self.args['data_hash']
                api_token = os.environ['ARVADOS_API_TOKEN']
                headers = {'Authorization': "OAuth2 %s" % api_token}
                try:
                    resp, content = h.request(url.encode('utf-8'), 'PUT',
                                              headers=headers,
                                              body=self.args['data'])
                    if (resp['status'] == '401' and
                        re.match(r'Timestamp verification failed', content)):
                        body = KeepClient.sign_for_old_server(
                            self.args['data_hash'],
                            self.args['data'])
                        h = httplib2.Http()
                        resp, content = h.request(url.encode('utf-8'), 'PUT',
                                                  headers=headers,
                                                  body=body)
                    if re.match(r'^2\d\d$', resp['status']):
                        logging.debug("KeepWriterThread %s succeeded %s %s" %
                                      (str(threading.current_thread()),
                                       self.args['data_hash'],
                                       self.args['service_root']))
                        return limiter.increment_done()
                    logging.warning("Request fail: PUT %s => %s %s" %
                                    (url, resp['status'], content))
                except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
                    logging.warning("Request fail: PUT %s => %s: %s" %
                                    (url, type(e), str(e)))
    def __init__(self):
        self.lock = threading.Lock()
        self.service_roots = None

    def shuffled_service_roots(self, hash):
        if self.service_roots == None:
            self.lock.acquire()
            keep_disks = api().keep_disks().list().execute()['items']
            roots = (("http%s://%s:%d/" %
                      ('s' if f['service_ssl_flag'] else '',
                       f['service_host'],
                       f['service_port']))
                     for f in keep_disks)
            self.service_roots = sorted(set(roots))
            logging.debug(str(self.service_roots))
            self.lock.release()
        seed = hash
        pool = self.service_roots[:]
        pseq = []
        while len(pool) > 0:
            if len(seed) < 8:
                if len(pseq) < len(hash) / 4: # first time around
                    seed = hash[-4:] + hash
                else:
                    seed += hash
            probe = int(seed[0:8], 16) % len(pool)
            pseq += [pool[probe]]
            pool = pool[:probe] + pool[probe+1:]
            seed = seed[8:]
        logging.debug(str(pseq))
        return pseq
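
    # Worked example: with 4 service roots and hash "a1b2c3d4...", the first
    # probe is int("a1b2c3d4", 16) % 4; that root is removed from the pool,
    # the next 8 hex digits choose among the remaining 3, and so on. The
    # order is a pure function of the hash, so every client visits the same
    # servers in the same order for a given block.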
    def get(self, locator):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return KeepClient.local_store_get(locator)
        expect_hash = re.sub(r'\+.*', '', locator)
        for service_root in self.shuffled_service_roots(expect_hash):
            h = httplib2.Http()
            url = service_root + expect_hash
            api_token = os.environ['ARVADOS_API_TOKEN']
            headers = {'Authorization': "OAuth2 %s" % api_token,
                       'Accept': 'application/octet-stream'}
            try:
                resp, content = h.request(url.encode('utf-8'), 'GET',
                                          headers=headers)
                if re.match(r'^2\d\d$', resp['status']):
                    m = hashlib.new('md5')
                    m.update(content)
                    md5 = m.hexdigest()
                    if md5 == expect_hash:
                        return content
                    logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
            except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
                logging.info("Request fail: GET %s => %s: %s" %
                             (url, type(e), str(e)))
        raise Exception("Not found: %s" % expect_hash)
    def put(self, data, **kwargs):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return KeepClient.local_store_put(data)
        m = hashlib.new('md5')
        m.update(data)
        data_hash = m.hexdigest()
        have_copies = 0
        want_copies = kwargs.get('copies', 2)
        if not (want_copies > 0):
            return data_hash
        threads = []
        thread_limiter = KeepClient.ThreadLimiter(want_copies)
        for service_root in self.shuffled_service_roots(data_hash):
            t = KeepClient.KeepWriterThread(data=data,
                                            data_hash=data_hash,
                                            service_root=service_root,
                                            thread_limiter=thread_limiter)
            t.start()
            threads += [t]
        for t in threads:
            t.join()
        have_copies = thread_limiter.done()
        if have_copies == want_copies:
            return (data_hash + '+' + str(len(data)))
        raise Exception("Write fail for %s: wanted %d but wrote %d" %
                        (data_hash, want_copies, have_copies))
    @staticmethod
    def sign_for_old_server(data_hash, data):
        return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
    @staticmethod
    def local_store_put(data):
        m = hashlib.new('md5')
        m.update(data)
        md5 = m.hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
                  os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
        return locator
    @staticmethod
    def local_store_get(locator):
        r = re.search('^([0-9a-f]{32,})', locator)
        if not r:
            raise Exception("Keep.get: invalid data locator '%s'" % locator)
        if r.group(0) == 'd41d8cd98f00b204e9800998ecf8427e':
            return ''
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
            return f.read()
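
# A minimal usage sketch: with KEEP_LOCAL_STORE pointing at an existing
# scratch directory, put/get round-trip against local files instead of Keep
# servers -- handy for tests.
#
#   os.environ['KEEP_LOCAL_STORE'] = '/tmp/keep-local'
#   locator = Keep.put("test data")
#   assert Keep.get(locator) == "test data"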