import bz2
import fcntl
import hashlib
import httplib
import httplib2
import json
import logging
import os
import re
import string
import subprocess
import sys
import time
import types
import UserDict
import zlib

from apiclient import errors
from apiclient.discovery import build

class CredentialsFromEnv:
    @staticmethod
    def http_request(self, uri, **kwargs):
        from httplib import BadStatusLine
        if 'headers' not in kwargs:
            kwargs['headers'] = {}
        kwargs['headers']['Authorization'] = ('OAuth2 %s'
                                              % os.environ['ARVADOS_API_TOKEN'])
        try:
            return self.orig_http_request(uri, **kwargs)
        except BadStatusLine:
            # This is how httplib tells us that it tried to reuse an
            # existing connection but it was already closed by the
            # server. In that case, yes, we would like to retry.
            # Unfortunately, we are not absolutely certain that the
            # previous call did not succeed, so this is slightly
            # risky.
            return self.orig_http_request(uri, **kwargs)

    def authorize(self, http):
        http.orig_http_request = http.request
        http.request = types.MethodType(self.http_request, http)
        return http

url = ('https://%s/discovery/v1/apis/'
       '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
credentials = CredentialsFromEnv()

# Use system's CA certificates (if we find them) instead of httplib2's.
ca_certs = '/etc/ssl/certs/ca-certificates.crt'
if not os.path.exists(ca_certs):
    ca_certs = None             # use httplib2 default

http = httplib2.Http(ca_certs=ca_certs)
http = credentials.authorize(http)
if re.match(r'(?i)^(true|1|yes)$',
            os.environ.get('ARVADOS_API_HOST_INSECURE', '')):
    http.disable_ssl_certificate_validation = True
service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
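
# service is the module-level Arvados API client built through the
# Google API discovery mechanism; resources hang off it in the usual
# discovery style, e.g. (a sketch, assuming JOB_UUID is set in the
# environment):
#
#   job = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()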

def task_set_output(self, s):
    service.job_tasks().update(uuid=self['uuid'],
                               job_task=json.dumps({'output': s,
                                                    'success': True,
                                                    'progress': 1.0
                                                    })).execute()

_current_task = None
def current_task():
    global _current_task
    if _current_task:
        return _current_task
    t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
    t = UserDict.UserDict(t)
    t.set_output = types.MethodType(task_set_output, t)
    t.tmpdir = os.environ['TASK_WORK']
    _current_task = t
    return t

_current_job = None
def current_job():
    global _current_job
    if _current_job:
        return _current_job
    t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
    t = UserDict.UserDict(t)
    t.tmpdir = os.environ['JOB_WORK']
    _current_job = t
    return t

class JobTask(object):
    def __init__(self, parameters=dict(), runtime_constraints=dict()):
        print "init jobtask %s %s" % (parameters, runtime_constraints)

class job_setup:
    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True):
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            for f in s.all_files():
                task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task_uuid': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input
                        }
                    }
                service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
        if and_end_task:
            service.job_tasks().update(uuid=current_task()['uuid'],
                                       job_task=json.dumps({'success': True})
                                       ).execute()
            exit(0)
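
    # Typical use at the top of a crunch script (a sketch): queue one
    # child task per file in the 'input' collection, then mark the
    # current task done and exit:
    #
    #   job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)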

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True):
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            task_input = s.tokens()
            new_task_attrs = {
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': task_input
                    }
                }
            service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
        if and_end_task:
            service.job_tasks().update(uuid=current_task()['uuid'],
                                       job_task=json.dumps({'success': True})
                                       ).execute()
            exit(0)

class util:
    @staticmethod
    def run_command(execargs, **kwargs):
        if 'stdin' not in kwargs:
            kwargs['stdin'] = subprocess.PIPE
        if 'stdout' not in kwargs:
            kwargs['stdout'] = subprocess.PIPE
        if 'stderr' not in kwargs:
            kwargs['stderr'] = subprocess.PIPE
        p = subprocess.Popen(execargs, close_fds=True, shell=False,
                             **kwargs)
        stdoutdata, stderrdata = p.communicate(None)
        if p.returncode != 0:
            raise Exception("run_command %s exit %d:\n%s" %
                            (execargs, p.returncode, stderrdata))
        return stdoutdata, stderrdata
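
    # Example (a sketch): run a command, capturing its output and
    # raising on a nonzero exit status:
    #
    #   out, err = util.run_command(["ls", "-l"])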

    @staticmethod
    def git_checkout(url, version, path):
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        if not os.path.exists(path):
            util.run_command(["git", "clone", url, path],
                             cwd=os.path.dirname(path))
        util.run_command(["git", "checkout", version],
                         cwd=path)
        return path
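
    # Example (a sketch; the URL, ref and path are hypothetical):
    #
    #   src_dir = util.git_checkout("git://example.com/repo.git",
    #                               "master", "src")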

    @staticmethod
    def tar_extractor(path, decompress_flag):
        return subprocess.Popen(["tar",
                                 "-C", path,
                                 ("-x%sf" % decompress_flag),
                                 "-"],
                                stdout=None,
                                stdin=subprocess.PIPE, stderr=sys.stderr,
                                shell=False, close_fds=True)

    @staticmethod
    def tarball_extract(tarball, path):
        """Retrieve a tarball from Keep and extract it to a local
        directory. Return the absolute path where the tarball was
        extracted. If the top level of the tarball contained just one
        file or directory, return the absolute path of that single
        item.

        tarball -- collection locator
        path -- where to extract the tarball: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == tarball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(tarball).all_files():
                if re.search('\.(tbz|tar.bz2)$', f.name()):
                    p = util.tar_extractor(path, 'j')
                elif re.search('\.(tgz|tar.gz)$', f.name()):
                    p = util.tar_extractor(path, 'z')
                elif re.search('\.tar$', f.name()):
                    p = util.tar_extractor(path, '')
                else:
                    raise Exception("tarball_extract cannot handle filename %s"
                                    % f.name())
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    p.stdin.write(buf)
                p.stdin.close()
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise Exception("tar exited %d" % p.returncode)
            os.symlink(tarball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path
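
    # Example (a sketch; the collection locator is hypothetical):
    #
    #   src_dir = util.tarball_extract(
    #       '0123456789abcdef0123456789abcdef+1234', 'src')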

    @staticmethod
    def zipball_extract(zipball, path):
        """Retrieve a zip archive from Keep and extract it to a local
        directory. Return the absolute path where the archive was
        extracted. If the top level of the archive contained just one
        file or directory, return the absolute path of that single
        item.

        zipball -- collection locator
        path -- where to extract the archive: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == zipball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(zipball).all_files():
                if not re.search('\.zip$', f.name()):
                    raise Exception("zipball_extract cannot handle filename %s"
                                    % f.name())
                zip_filename = os.path.join(path, os.path.basename(f.name()))
                zip_file = open(zip_filename, 'wb')
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    zip_file.write(buf)
                zip_file.close()

                p = subprocess.Popen(["unzip",
                                      "-q", "-o",
                                      "-d", path,
                                      zip_filename],
                                     stdout=None,
                                     stdin=None, stderr=sys.stderr,
                                     shell=False, close_fds=True)
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise Exception("unzip exited %d" % p.returncode)
                os.unlink(zip_filename)
            os.symlink(zipball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path

    @staticmethod
    def collection_extract(collection, path, files=[], decompress=True):
        """Retrieve a collection from Keep and extract it to a local
        directory. Return the absolute path where the collection was
        extracted.

        collection -- collection locator
        path -- where to extract: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == collection:
                already_have_it = True
        except OSError:
            pass

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        files_got = []
        for s in CollectionReader(collection).all_streams():
            stream_name = s.name()
            for f in s.all_files():
                if (files == [] or
                    ((f.name() not in files_got) and
                     (f.name() in files or
                      (decompress and f.decompressed_name() in files)))):
                    outname = f.decompressed_name() if decompress else f.name()
                    files_got += [outname]
                    if os.path.exists(os.path.join(path, stream_name, outname)):
                        continue
                    util.mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                    outfile = open(os.path.join(path, stream_name, outname), 'wb')
                    for buf in (f.readall_decompressed() if decompress
                                else f.readall()):
                        outfile.write(buf)
                    outfile.close()
        if len(files_got) < len(files):
            raise Exception("Wanted files %s but only got %s from %s" %
                            (files, files_got,
                             map(lambda z: z.name(),
                                 list(CollectionReader(collection).all_files()))))
        os.symlink(collection, os.path.join(path, '.locator'))
        lockfile.close()
        return path
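
    # Example (a sketch; the locator and file names are hypothetical):
    # fetch two named files out of a collection, decompressing any
    # .gz/.bz2 members on the way:
    #
    #   ref_dir = util.collection_extract(
    #       '0123456789abcdef0123456789abcdef+1234', 'ref',
    #       files=['chr1.fa', 'chr2.fa'])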

    @staticmethod
    def mkdir_dash_p(path):
        if not os.path.exists(path):
            util.mkdir_dash_p(os.path.dirname(path))
            try:
                os.mkdir(path)
            except OSError:
                if not os.path.exists(path):
                    os.mkdir(path)

    @staticmethod
    def stream_extract(stream, path, files=[], decompress=True):
        """Retrieve a stream from Keep and extract it to a local
        directory. Return the absolute path where the stream was
        extracted.

        stream -- StreamReader object
        path -- where to extract: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)

        files_got = []
        for f in stream.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, outname)):
                    os.unlink(os.path.join(path, outname))
                util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
                outfile = open(os.path.join(path, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
        if len(files_got) < len(files):
            raise Exception("Wanted files %s but only got %s from %s" %
                            (files, files_got, map(lambda z: z.name(),
                                                   list(stream.all_files()))))
        lockfile.close()
        return path

    @staticmethod
    def listdir_recursive(dirname, base=None):
        allfiles = []
        for ent in sorted(os.listdir(dirname)):
            ent_path = os.path.join(dirname, ent)
            ent_base = os.path.join(base, ent) if base else ent
            if os.path.isdir(ent_path):
                allfiles += util.listdir_recursive(ent_path, ent_base)
            else:
                allfiles += [ent_base]
        return allfiles
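
    # Example (a sketch): flatten a directory tree into sorted
    # relative paths, e.g. ['a/b.txt', 'c.txt']:
    #
    #   names = util.listdir_recursive('/tmp/somedir')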

class DataReader:
    def __init__(self, data_locator):
        self.data_locator = data_locator
        self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
                                  stdout=subprocess.PIPE,
                                  stdin=None, stderr=subprocess.PIPE,
                                  shell=False, close_fds=True)
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
    def read(self, size, **kwargs):
        return self.p.stdout.read(size, **kwargs)
    def close(self):
        self.p.stdout.close()
        if not self.p.stderr.closed:
            for err in self.p.stderr:
                print >> sys.stderr, err
        self.p.stderr.close()
        self.p.wait()
        if self.p.returncode != 0:
            raise Exception("whget subprocess exited %d" % self.p.returncode)

class StreamFileReader:
    def __init__(self, stream, pos, size, name):
        self._stream = stream
        self._pos = pos
        self._size = size
        self._name = name
        self._filepos = 0
    def name(self):
        return self._name
    def decompressed_name(self):
        return re.sub('\.(bz2|gz)$', '', self._name)
    def size(self):
        return self._size
    def stream_name(self):
        return self._stream.name()
    def read(self, size, **kwargs):
        self._stream.seek(self._pos + self._filepos)
        data = self._stream.read(min(size, self._size - self._filepos))
        self._filepos += len(data)
        return data
    def readall(self, size=2**20, **kwargs):
        while True:
            data = self.read(size, **kwargs)
            if data == '':
                break
            yield data
    def bunzip2(self, size):
        decompressor = bz2.BZ2Decompressor()
        for chunk in self.readall(size):
            data = decompressor.decompress(chunk)
            if data and data != '':
                yield data
    def gunzip(self, size):
        decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
        for chunk in self.readall(size):
            data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
            if data and data != '':
                yield data
    def readall_decompressed(self, size=2**20):
        self._stream.seek(self._pos + self._filepos)
        if re.search('\.bz2$', self._name):
            return self.bunzip2(size)
        elif re.search('\.gz$', self._name):
            return self.gunzip(size)
        else:
            return self.readall(size)
    def readlines(self, decompress=True):
        if decompress:
            datasource = self.readall_decompressed()
        else:
            self._stream.seek(self._pos + self._filepos)
            datasource = self.readall()
        data = ''
        for newdata in datasource:
            data += newdata
            sol = 0
            while True:
                eol = string.find(data, "\n", sol)
                if eol < 0:
                    break
                yield data[sol:eol+1]
                sol = eol+1
            data = data[sol:]
        if data != '':
            yield data
    def as_manifest(self):
        if self._size == 0:
            return ("%s d41d8cd98f00b204e9800998ecf8427e+0 0:0:%s\n"
                    % (self._stream.name(), self.name()))
        return string.join(self._stream.tokens_for_range(self._pos, self._size),
                           " ") + "\n"

class StreamReader:
    def __init__(self, tokens):
        self._tokens = tokens
        self._current_datablock_data = None
        self._current_datablock_pos = 0
        self._current_datablock_index = -1
        self._pos = 0

        self._stream_name = None
        self.data_locators = []
        self.files = []

        for tok in self._tokens:
            if self._stream_name == None:
                self._stream_name = tok
            elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
                self.data_locators += [tok]
            elif re.search(r'^\d+:\d+:\S+', tok):
                pos, size, name = tok.split(':', 2)
                self.files += [[int(pos), int(size), name]]
            else:
                raise Exception("Invalid manifest format")
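
    # Each stream comes from one manifest line: a stream name, one or
    # more data block locators, then position:size:filename triples,
    # e.g. (a sketch):
    #
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt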
    def tokens(self):
        return self._tokens
    def tokens_for_range(self, range_start, range_size):
        resp = [self._stream_name]
        return_all_tokens = False
        block_start = 0
        token_bytes_skipped = 0
        for locator in self.data_locators:
            sizehint = re.search(r'\+(\d+)', locator)
            if not sizehint:
                return_all_tokens = True
            if return_all_tokens:
                resp += [locator]
                continue
            blocksize = int(sizehint.group(0))
            if range_start + range_size <= block_start:
                break
            if range_start < block_start + blocksize:
                resp += [locator]
            else:
                token_bytes_skipped += blocksize
            block_start += blocksize
        for f in self.files:
            if ((f[0] < range_start + range_size)
                and
                (f[0] + f[1] > range_start)
                and
                f[1] > 0):
                resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
        return resp
    def name(self):
        return self._stream_name
    def all_files(self):
        for pos, size, name in self.files:
            yield StreamFileReader(self, pos, size, name)
    def nextdatablock(self):
        if self._current_datablock_index < 0:
            self._current_datablock_pos = 0
            self._current_datablock_index = 0
        else:
            self._current_datablock_pos += self.current_datablock_size()
            self._current_datablock_index += 1
        self._current_datablock_data = None
    def current_datablock_data(self):
        if self._current_datablock_data == None:
            self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
        return self._current_datablock_data
    def current_datablock_size(self):
        if self._current_datablock_index < 0:
            return 0
        sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
        if sizehint:
            return int(sizehint.group(0))
        return len(self.current_datablock_data())
    def seek(self, pos):
        """Set the position of the next read operation."""
        self._pos = pos
    def really_seek(self):
        """Find and load the appropriate data block, so the byte at
        _pos is in memory.
        """
        if self._pos == self._current_datablock_pos:
            return True
        if (self._current_datablock_pos != None and
            self._pos >= self._current_datablock_pos and
            self._pos <= self._current_datablock_pos + self.current_datablock_size()):
            return True
        if self._pos < self._current_datablock_pos:
            self._current_datablock_index = -1
            self.nextdatablock()
        while (self._pos > self._current_datablock_pos and
               self._pos > self._current_datablock_pos + self.current_datablock_size()):
            self.nextdatablock()
    def read(self, size):
        """Read no more than size bytes -- but at least one byte,
        unless _pos is already at the end of the stream.
        """
        if size == 0:
            return ''
        self.really_seek()
        while self._pos >= self._current_datablock_pos + self.current_datablock_size():
            self.nextdatablock()
            if self._current_datablock_index >= len(self.data_locators):
                return None
        data = self.current_datablock_data()[self._pos - self._current_datablock_pos :
                                             self._pos - self._current_datablock_pos + size]
        self._pos += len(data)
        return data

class CollectionReader:
    def __init__(self, manifest_locator_or_text):
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        self._streams = None
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        pass
    def _populate(self):
        if self._streams != None:
            return
        if not self._manifest_text:
            self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp += [StreamReader(s)]
        return resp
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f
    def manifest_text(self):
        self._populate()
        return self._manifest_text
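
# Example (a sketch; the locator is hypothetical): list every file in
# a stored collection:
#
#   cr = CollectionReader('0123456789abcdef0123456789abcdef+1234')
#   for f in cr.all_files():
#       print f.stream_name(), f.name(), f.size()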

class CollectionWriter:
    KEEP_BLOCK_SIZE = 2**26
    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()
    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self.start_new_stream(stream_name)
        todo = []
        if max_manifest_depth == 0:
            dirents = util.listdir_recursive(path)
        else:
            dirents = sorted(os.listdir(path))
        for dirent in dirents:
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                todo += [[target,
                          os.path.join(stream_name, dirent),
                          max_manifest_depth - 1]]
            else:
                self.start_new_file(dirent)
                with open(target, 'rb') as f:
                    while True:
                        buf = f.read(2**26)
                        if len(buf) == 0:
                            break
                        self.write(buf)
        self.finish_current_stream()
        map(lambda x: self.write_directory_tree(*x), todo)

    def write(self, newdata):
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()
    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        newfilename = re.sub(r' ', '\\\\040', newfilename)
        if re.search(r'[ \t\n]', newfilename):
            raise AssertionError("Manifest filenames cannot contain whitespace")
        self._current_file_name = newfilename
    def current_file_name(self):
        return self._current_file_name
    def finish_current_file(self):
        if self._current_file_name == None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise Exception(
                "Cannot finish an unnamed file (%d bytes at offset %d "
                "in '%s' stream)"
                % (self._current_stream_length - self._current_file_pos,
                   self._current_file_pos, self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length
    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
    def set_current_stream_name(self, newstreamname):
        if re.search(r'[ \t\n]', newstreamname):
            raise AssertionError("Manifest stream names cannot contain whitespace")
        self._current_stream_name = newstreamname
    def current_stream_name(self):
        return self._current_stream_name
    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name == None:
            raise Exception(
                "Cannot finish an unnamed stream (%d bytes in %d files)"
                % (self._current_stream_length,
                   len(self._current_stream_files)))
        else:
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
    def finish(self):
        return Keep.put(self.manifest_text())
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0]
            if len(stream[1]) == 0:
                manifest += " d41d8cd98f00b204e9800998ecf8427e+0"
            else:
                for locator in stream[1]:
                    manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
            manifest += "\n"
        return manifest
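
# Example (a sketch): write one file into the default '.' stream and
# inspect the resulting manifest text:
#
#   cw = CollectionWriter()
#   cw.start_new_file('hello.txt')
#   cw.write('hello\n')
#   print cw.manifest_text()   # ". <block locator> 0:6:hello.txt"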

global_client_object = None

class Keep:
    @staticmethod
    def global_client_object():
        global global_client_object
        if global_client_object == None:
            global_client_object = KeepClient()
        return global_client_object

    @staticmethod
    def get(locator):
        return Keep.global_client_object().get(locator)

    @staticmethod
    def put(data):
        return Keep.global_client_object().put(data)
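
# Example (a sketch; needs reachable Keep services, or KEEP_LOCAL_STORE
# for offline testing):
#
#   locator = Keep.put("foo")   # "acbd18db4cc2f85cedef654fccc4a4d8+3"
#   data = Keep.get(locator)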

class KeepClient(object):
    def __init__(self):
        self.service_roots = None

    def shuffled_service_roots(self, hash):
        if self.service_roots == None:
            keep_disks = api().keep_disks().list().execute()['items']
            roots = (("http%s://%s:%d/" %
                      ('s' if f['service_ssl_flag'] else '',
                       f['service_host'],
                       f['service_port']))
                     for f in keep_disks)
            self.service_roots = sorted(set(roots))
            logging.debug(str(self.service_roots))

        seed = hash
        pool = self.service_roots[:]
        pseq = []
        while len(pool) > 0:
            if len(seed) < 8:
                if len(pseq) < len(hash) / 4: # first time around
                    seed = hash[-4:] + hash
                else:
                    seed += hash
            probe = int(seed[0:8], 16) % len(pool)
            pseq += [pool[probe]]
            pool = pool[:probe] + pool[probe+1:]
            seed = seed[8:]
        logging.debug(str(pseq))
        return pseq

    def get(self, locator):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return KeepClient.local_store_get(locator)
        expect_hash = re.sub(r'\+.*', '', locator)
        for service_root in self.shuffled_service_roots(expect_hash):
            h = httplib2.Http()
            url = service_root + expect_hash
            api_token = os.environ['ARVADOS_API_TOKEN']
            headers = {'Authorization': "OAuth2 %s" % api_token,
                       'Accept': 'application/octet-stream'}
            try:
                resp, content = h.request(url, 'GET', headers=headers)
                if re.match(r'^2\d\d$', resp['status']):
                    m = hashlib.new('md5')
                    m.update(content)
                    md5 = m.hexdigest()
                    if md5 == expect_hash:
                        return content
                    logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
            except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
                logging.info("Request fail: GET %s => %s: %s" %
                             (url, type(e), str(e)))
        raise Exception("Not found: %s" % expect_hash)

    def put(self, data, **kwargs):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return KeepClient.local_store_put(data)
        m = hashlib.new('md5')
        m.update(data)
        data_hash = m.hexdigest()
        have_copies = 0
        want_copies = kwargs.get('copies', 2)
        for service_root in self.shuffled_service_roots(data_hash):
            h = httplib2.Http()
            url = service_root + data_hash
            api_token = os.environ['ARVADOS_API_TOKEN']
            headers = {'Authorization': "OAuth2 %s" % api_token}
            try:
                resp, content = h.request(url, 'PUT',
                                          headers=headers,
                                          body=data)
                if (resp['status'] == '401' and
                    re.match(r'Timestamp verification failed', content)):
                    body = self.sign_for_old_server(data_hash, data)
                    h = httplib2.Http()
                    resp, content = h.request(url, 'PUT',
                                              headers=headers,
                                              body=body)
                if re.match(r'^2\d\d$', resp['status']):
                    have_copies += 1
                    if have_copies == want_copies:
                        return data_hash + '+' + str(len(data))
                else:
                    logging.warning("Request fail: PUT %s => %s %s" %
                                    (url, resp['status'], content))
            except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
                logging.warning("Request fail: PUT %s => %s: %s" %
                                (url, type(e), str(e)))
        raise Exception("Write fail for %s: wanted %d but wrote %d" %
                        (data_hash, want_copies, have_copies))

    def sign_for_old_server(self, data_hash, data):
        return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n"
                 "-----BEGIN PGP SIGNATURE-----\n\n"
                 "-----END PGP SIGNATURE-----\n"
                 % (int(time.time()), data_hash)) + data)

    @staticmethod
    def local_store_put(data):
        m = hashlib.new('md5')
        m.update(data)
        md5 = m.hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
                  os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
        return locator
    @staticmethod
    def local_store_get(locator):
        r = re.search('^([0-9a-f]{32,})', locator)
        if r == None:
            raise Exception("Keep.get: invalid data locator '%s'" % locator)
        if r.group(0) == 'd41d8cd98f00b204e9800998ecf8427e':
            return ''
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
            return f.read()
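
    # For tests: when KEEP_LOCAL_STORE names a directory, the two
    # static methods above bypass the network and store each block as
    # a flat file named by its md5 hash (a sketch):
    #
    #   os.environ['KEEP_LOCAL_STORE'] = '/tmp/keep'
    #   Keep.put("foo")   # writes /tmp/keep/acbd18db4cc2f85cedef654fccc4a4d8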