import bz2
import fcntl
import hashlib
import httplib
import httplib2
import json
import logging
import os
import re
import string
import subprocess
import sys
import time
import types
import UserDict
import zlib

from apiclient import errors
from apiclient.discovery import build

if 'ARVADOS_DEBUG' in os.environ:
    logging.basicConfig(level=logging.DEBUG)

class CredentialsFromEnv:
    @staticmethod
    def http_request(self, uri, **kwargs):
        from httplib import BadStatusLine
        if 'headers' not in kwargs:
            kwargs['headers'] = {}
        kwargs['headers']['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
        try:
            return self.orig_http_request(uri, **kwargs)
        except BadStatusLine:
            # This is how httplib tells us that it tried to reuse an
            # existing connection but it was already closed by the
            # server. In that case, yes, we would like to retry.
            # Unfortunately, we are not absolutely certain that the
            # previous call did not succeed, so this is slightly
            # risky.
            return self.orig_http_request(uri, **kwargs)
    def authorize(self, http):
        http.orig_http_request = http.request
        http.request = types.MethodType(self.http_request, http)
        return http

url = ('https://%s/discovery/v1/apis/'
       '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
credentials = CredentialsFromEnv()

# Use system's CA certificates (if we find them) instead of httplib2's
ca_certs = '/etc/ssl/certs/ca-certificates.crt'
if not os.path.exists(ca_certs):
    ca_certs = None             # use httplib2 default

http = httplib2.Http(ca_certs=ca_certs)
http = credentials.authorize(http)
if re.match(r'(?i)^(true|1|yes)$',
            os.environ.get('ARVADOS_API_HOST_INSECURE', '')):
    http.disable_ssl_certificate_validation = True
service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
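
# Example (a sketch, not part of the original file): with ARVADOS_API_HOST and
# ARVADOS_API_TOKEN set in the environment, `service` behaves like any other
# apiclient discovery-built service object:
#
#   me = service.users().current().execute()
#   print me['uuid']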

def task_set_output(self, s):
    service.job_tasks().update(uuid=self['uuid'],
                               job_task=json.dumps({'output': s})
                               ).execute()

def current_task():
    t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
    t = UserDict.UserDict(t)
    t.set_output = types.MethodType(task_set_output, t)
    t.tmpdir = os.environ['TASK_WORK']
    return t

def current_job():
    t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
    t = UserDict.UserDict(t)
    t.tmpdir = os.environ['JOB_WORK']
    return t

def api():
    return service

class JobTask:
    def __init__(self, parameters=dict(), runtime_constraints=dict()):
        print "init jobtask %s %s" % (parameters, runtime_constraints)

class job_setup:
    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True):
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            for f in s.all_files():
                task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task_uuid': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input
                        }
                    }
                service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
        if and_end_task:
            service.job_tasks().update(uuid=current_task()['uuid'],
                                       job_task=json.dumps({'success': True})
                                       ).execute()
            exit(0)
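
    # Example crunch script (a sketch; the processing steps are assumptions):
    # queue one task per input file, then let each task handle its own piece:
    #
    #   import arvados
    #   arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
    #   this_task = arvados.current_task()
    #   input_manifest = this_task['parameters']['input']
    #   ... do work on that one file, then record the result ...
    #   this_task.set_output(output_locator)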

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True):
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            task_input = s.tokens()
            new_task_attrs = {
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': task_input
                    }
                }
            service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
        if and_end_task:
            service.job_tasks().update(uuid=current_task()['uuid'],
                                       job_task=json.dumps({'success': True})
                                       ).execute()
            exit(0)
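
    # The stream variant works the same way (a sketch): each new task receives
    # one whole stream, as manifest tokens, in its 'input' parameter:
    #
    #   arvados.job_setup.one_task_per_input_stream(if_sequence=0)
    #   tokens = arvados.current_task()['parameters']['input']
    #   stream = StreamReader(tokens)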

class util:
    @staticmethod
    def run_command(execargs, **kwargs):
        if 'stdin' not in kwargs:
            kwargs['stdin'] = subprocess.PIPE
        if 'stdout' not in kwargs:
            kwargs['stdout'] = subprocess.PIPE
        if 'stderr' not in kwargs:
            kwargs['stderr'] = subprocess.PIPE
        p = subprocess.Popen(execargs, close_fds=True, shell=False,
                             **kwargs)
        stdoutdata, stderrdata = p.communicate(None)
        if p.returncode != 0:
            raise Exception("run_command %s exit %d:\n%s" %
                            (execargs, p.returncode, stderrdata))
        return stdoutdata, stderrdata
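
    # Example (sketch): run a command and capture its output, raising on a
    # non-zero exit status:
    #
    #   out, err = util.run_command(["git", "--version"])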

    @staticmethod
    def git_checkout(url, version, path):
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        if not os.path.exists(path):
            util.run_command(["git", "clone", url, path],
                             cwd=os.path.dirname(path))
        util.run_command(["git", "checkout", version],
                         cwd=path)
        return path
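
    # Example (sketch; the repository URL is hypothetical): check out a
    # specific version inside the job's tmpdir:
    #
    #   src = util.git_checkout("https://example.com/repo.git", "v1.0", "src")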

    @staticmethod
    def tar_extractor(path, decompress_flag):
        return subprocess.Popen(["tar",
                                 "-C", path,
                                 ("-x%sf" % decompress_flag),
                                 "-"],
                                stdout=None,
                                stdin=subprocess.PIPE, stderr=sys.stderr,
                                shell=False, close_fds=True)

    @staticmethod
    def tarball_extract(tarball, path):
        """Retrieve a tarball from Keep and extract it to a local
        directory. Return the absolute path where the tarball was
        extracted. If the top level of the tarball contained just one
        file or directory, return the absolute path of that single
        item.

        tarball -- collection locator
        path -- where to extract the tarball: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == tarball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(tarball).all_files():
                if re.search('\.(tbz|tar.bz2)$', f.name()):
                    p = util.tar_extractor(path, 'j')
                elif re.search('\.(tgz|tar.gz)$', f.name()):
                    p = util.tar_extractor(path, 'z')
                elif re.search('\.tar$', f.name()):
                    p = util.tar_extractor(path, '')
                else:
                    raise Exception("tarball_extract cannot handle filename %s"
                                    % f.name())
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    p.stdin.write(buf)
                p.stdin.close()
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise Exception("tar exited %d" % p.returncode)
            os.symlink(tarball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path
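
    # Example (sketch): fetch and unpack a tarball collection once per job;
    # repeat calls return the cached extraction thanks to the .locator symlink:
    #
    #   bindir = util.tarball_extract(tarball_locator, "tools")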

    @staticmethod
    def zipball_extract(zipball, path):
        """Retrieve a zip archive from Keep and extract it to a local
        directory. Return the absolute path where the archive was
        extracted. If the top level of the archive contained just one
        file or directory, return the absolute path of that single
        item.

        zipball -- collection locator
        path -- where to extract the archive: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == zipball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(zipball).all_files():
                if not re.search('\.zip$', f.name()):
                    raise Exception("zipball_extract cannot handle filename %s"
                                    % f.name())
                zip_filename = os.path.join(path, os.path.basename(f.name()))
                zip_file = open(zip_filename, 'wb')
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    zip_file.write(buf)
                zip_file.close()

                p = subprocess.Popen(["unzip",
                                      "-q", "-o",
                                      "-d", path,
                                      zip_filename],
                                     stdout=None,
                                     stdin=None, stderr=sys.stderr,
                                     shell=False, close_fds=True)
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise Exception("unzip exited %d" % p.returncode)
                os.unlink(zip_filename)
            os.symlink(zipball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path

    @staticmethod
    def collection_extract(collection, path, files=[], decompress=True):
        """Retrieve a collection from Keep and extract it to a local
        directory. Return the absolute path where the collection was
        extracted.

        collection -- collection locator
        path -- where to extract: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == collection:
                already_have_it = True
        except OSError:
            pass

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        files_got = []
        for s in CollectionReader(collection).all_streams():
            stream_name = s.name()
            for f in s.all_files():
                if (files == [] or
                    ((f.name() not in files_got) and
                     (f.name() in files or
                      (decompress and f.decompressed_name() in files)))):
                    outname = f.decompressed_name() if decompress else f.name()
                    files_got += [outname]
                    if os.path.exists(os.path.join(path, stream_name, outname)):
                        continue
                    util.mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                    outfile = open(os.path.join(path, stream_name, outname), 'wb')
                    for buf in (f.readall_decompressed() if decompress
                                else f.readall()):
                        outfile.write(buf)
                    outfile.close()
        if len(files_got) < len(files):
            raise Exception("Wanted files %s but only got %s from %s" %
                            (files, files_got,
                             map(lambda z: z.name(),
                                 list(CollectionReader(collection).all_files()))))
        os.symlink(collection, os.path.join(path, '.locator'))
        lockfile.close()
        return path
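
    # Example (sketch; file names are hypothetical): extract only two named
    # files, decompressing *.gz on the way out:
    #
    #   dir = util.collection_extract(locator, "refs",
    #                                 files=["genome.fa", "genome.fa.fai"],
    #                                 decompress=True)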

    @staticmethod
    def mkdir_dash_p(path):
        if not os.path.exists(path):
            util.mkdir_dash_p(os.path.dirname(path))
            try:
                os.mkdir(path)
            except OSError:
                if not os.path.exists(path):
                    raise

    @staticmethod
    def stream_extract(stream, path, files=[], decompress=True):
        """Retrieve a stream from Keep and extract it to a local
        directory. Return the absolute path where the stream was
        extracted.

        stream -- StreamReader object
        path -- where to extract: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)

        files_got = []
        for f in stream.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, outname)):
                    os.unlink(os.path.join(path, outname))
                util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
                outfile = open(os.path.join(path, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
        if len(files_got) < len(files):
            raise Exception("Wanted files %s but only got %s from %s" %
                            (files, files_got, map(lambda z: z.name(),
                                                   list(stream.all_files()))))
        lockfile.close()
        return path

    @staticmethod
    def listdir_recursive(dirname, base=None):
        allfiles = []
        for ent in sorted(os.listdir(dirname)):
            ent_path = os.path.join(dirname, ent)
            ent_base = os.path.join(base, ent) if base else ent
            if os.path.isdir(ent_path):
                allfiles += util.listdir_recursive(ent_path, ent_base)
            else:
                allfiles += [ent_base]
        return allfiles
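
    # Example (sketch): list every file below a directory, as paths relative
    # to that directory:
    #
    #   for relpath in util.listdir_recursive('/etc/ssl'):
    #       print relpath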

class StreamFileReader:
    def __init__(self, stream, pos, size, name):
        self._stream = stream
        self._pos = pos
        self._size = size
        self._name = name
        self._filepos = 0
    def name(self):
        return self._name
    def decompressed_name(self):
        return re.sub('\.(bz2|gz)$', '', self._name)
    def size(self):
        return self._size
    def stream_name(self):
        return self._stream.name()
    def read(self, size, **kwargs):
        self._stream.seek(self._pos + self._filepos)
        data = self._stream.read(min(size, self._size - self._filepos))
        self._filepos += len(data)
        return data
    def readall(self, size=2**20, **kwargs):
        while True:
            data = self.read(size, **kwargs)
            if data == '':
                break
            yield data
    def bunzip2(self, size):
        decompressor = bz2.BZ2Decompressor()
        for chunk in self.readall(size):
            data = decompressor.decompress(chunk)
            if data and data != '':
                yield data
    def gunzip(self, size):
        decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
        for chunk in self.readall(size):
            data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
            if data and data != '':
                yield data
    def readall_decompressed(self, size=2**20):
        self._stream.seek(self._pos + self._filepos)
        if re.search('\.bz2$', self._name):
            return self.bunzip2(size)
        elif re.search('\.gz$', self._name):
            return self.gunzip(size)
        else:
            return self.readall(size)
    def readlines(self, decompress=True):
        if decompress:
            datasource = self.readall_decompressed()
        else:
            self._stream.seek(self._pos + self._filepos)
            datasource = self.readall()
        data = ''
        for newdata in datasource:
            data += newdata
            sol = 0
            while True:
                eol = string.find(data, "\n", sol)
                if eol < 0:
                    break
                yield data[sol:eol+1]
                sol = eol + 1
            data = data[sol:]
        if data != '':
            yield data
    def as_manifest(self):
        if self.size() == 0:
            return ("%s d41d8cd98f00b204e9800998ecf8427e+0 0:0:%s\n"
                    % (self._stream.name(), self.name()))
        return string.join(self._stream.tokens_for_range(self._pos, self._size),
                           " ") + "\n"
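
    # as_manifest() yields a single-stream manifest line in the usual Keep
    # format; for example, an empty file "foo" in stream "." comes out as:
    #
    #   . d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo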

class StreamReader:
    def __init__(self, tokens):
        self._tokens = tokens
        self._current_datablock_data = None
        self._current_datablock_pos = 0
        self._current_datablock_index = -1
        self._pos = 0

        self._stream_name = None
        self.data_locators = []
        self.files = []

        for tok in self._tokens:
            if self._stream_name == None:
                self._stream_name = tok
            elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
                self.data_locators += [tok]
            elif re.search(r'^\d+:\d+:\S+', tok):
                pos, size, name = tok.split(':', 2)
                self.files += [[int(pos), int(size), name]]
            else:
                raise Exception("Invalid manifest format")
    def tokens(self):
        return self._tokens
    def tokens_for_range(self, range_start, range_size):
        resp = [self._stream_name]
        return_all_tokens = False
        block_start = 0
        token_bytes_skipped = 0
        for locator in self.data_locators:
            sizehint = re.search(r'\+(\d+)', locator)
            if not sizehint:
                return_all_tokens = True
            if return_all_tokens:
                resp += [locator]
                continue
            blocksize = int(sizehint.group(0))
            if range_start + range_size <= block_start:
                break
            if range_start < block_start + blocksize:
                resp += [locator]
            else:
                token_bytes_skipped += blocksize
            block_start += blocksize
        for f in self.files:
            if ((f[0] < range_start + range_size)
                and
                (f[0] + f[1] > range_start)
                and
                f[1] > 0):
                resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
        return resp
    def name(self):
        return self._stream_name
    def all_files(self):
        for f in self.files:
            pos, size, name = f
            yield StreamFileReader(self, pos, size, name)
    def nextdatablock(self):
        if self._current_datablock_index < 0:
            self._current_datablock_pos = 0
            self._current_datablock_index = 0
        else:
            self._current_datablock_pos += self.current_datablock_size()
            self._current_datablock_index += 1
        self._current_datablock_data = None
    def current_datablock_data(self):
        if self._current_datablock_data == None:
            self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
        return self._current_datablock_data
    def current_datablock_size(self):
        if self._current_datablock_index < 0:
            self.nextdatablock()
        sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
        if sizehint:
            return int(sizehint.group(0))
        return len(self.current_datablock_data())
585 """Set the position of the next read operation."""
587 def really_seek(self):
588 """Find and load the appropriate data block, so the byte at
591 if self._pos == self._current_datablock_pos:
593 if (self._current_datablock_pos != None and
594 self._pos >= self._current_datablock_pos and
595 self._pos <= self._current_datablock_pos + self.current_datablock_size()):
597 if self._pos < self._current_datablock_pos:
598 self._current_datablock_index = -1
600 while (self._pos > self._current_datablock_pos and
601 self._pos > self._current_datablock_pos + self.current_datablock_size()):
    def read(self, size):
        """Read no more than size bytes -- but at least one byte,
        unless _pos is already at the end of the stream.
        """
        if size == 0:
            return ''
        self.really_seek()
        while self._pos >= self._current_datablock_pos + self.current_datablock_size():
            self.nextdatablock()
            if self._current_datablock_index >= len(self.data_locators):
                return ''
        data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
        self._pos += len(data)
        return data

class CollectionReader:
    def __init__(self, manifest_locator_or_text):
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        self._streams = None
    def _populate(self):
        # Parse the manifest, fetching it from Keep first if necessary.
        if self._streams != None:
            return
        if not self._manifest_text:
            self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp += [StreamReader(s)]
        return resp
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f
    def manifest_text(self):
        self._populate()
        return self._manifest_text
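
# Example (sketch): iterate over all files in a collection, whether the
# argument is a manifest text or a locator to fetch from Keep:
#
#   cr = CollectionReader(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n")
#   for f in cr.all_files():
#       print f.stream_name(), f.name(), f.size()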

class CollectionWriter:
    KEEP_BLOCK_SIZE = 2**26
    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self.start_new_stream(stream_name)
        todo = []
        if max_manifest_depth == 0:
            dirents = util.listdir_recursive(path)
        else:
            dirents = sorted(os.listdir(path))
        for dirent in dirents:
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                todo += [[target,
                          os.path.join(stream_name, dirent),
                          max_manifest_depth-1]]
            else:
                self.start_new_file(dirent)
                with open(target, 'rb') as f:
                    while True:
                        buf = f.read(2**26)
                        if len(buf) == 0:
                            break
                        self.write(buf)
        self.finish_current_stream()
        map(lambda x: self.write_directory_tree(*x), todo)
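
    # Example (sketch): write a directory tree into Keep and collect the
    # resulting manifest locator:
    #
    #   cw = CollectionWriter()
    #   cw.write_directory_tree('/path/to/dir')
    #   locator = cw.finish()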

    def write(self, newdata):
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()
    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        newfilename = re.sub(r' ', '\\\\040', newfilename)
        if re.search(r'[ \t\n]', newfilename):
            raise AssertionError("Manifest filenames cannot contain whitespace")
        self._current_file_name = newfilename
    def current_file_name(self):
        return self._current_file_name
    def finish_current_file(self):
        if self._current_file_name == None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise Exception("Cannot finish an unnamed file (%d bytes at offset %d in '%s' stream)" %
                            (self._current_stream_length - self._current_file_pos,
                             self._current_file_pos,
                             self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length
    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
    def set_current_stream_name(self, newstreamname):
        if re.search(r'[ \t\n]', newstreamname):
            raise AssertionError("Manifest stream names cannot contain whitespace")
        self._current_stream_name = newstreamname
    def current_stream_name(self):
        return self._current_stream_name
    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name == None:
            raise Exception("Cannot finish an unnamed stream (%d bytes in %d files)" %
                            (self._current_stream_length,
                             len(self._current_stream_files)))
        else:
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
    def finish(self):
        return Keep.put(self.manifest_text())
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0]
            if len(stream[1]) == 0:
                manifest += " d41d8cd98f00b204e9800998ecf8427e+0"
            else:
                for locator in stream[1]:
                    manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
            manifest += "\n"
        return manifest

global_client_object = None

class Keep:
    @staticmethod
    def global_client_object():
        global global_client_object
        if global_client_object == None:
            global_client_object = KeepClient()
        return global_client_object

    @staticmethod
    def get(locator):
        return Keep.global_client_object().get(locator)

    @staticmethod
    def put(data):
        return Keep.global_client_object().put(data)

class KeepClient:
    def __init__(self):
        self.service_roots = None

    def shuffled_service_roots(self, hash):
        if self.service_roots == None:
            keep_disks = api().keep_disks().list().execute()['items']
            roots = (("http%s://%s:%d/" %
                      ('s' if f['service_ssl_flag'] else '',
                       f['service_host'],
                       f['service_port']))
                     for f in keep_disks)
            self.service_roots = sorted(set(roots))
            logging.debug(str(self.service_roots))

        # Build a deterministic probe order from the hash: repeatedly read
        # 8 hex digits of seed, pick that slot (mod the remaining pool size),
        # and remove it from the pool.
        seed = hash
        pool = self.service_roots[:]
        pseq = []
        while len(pool) > 0:
            if len(seed) < 8:
                if len(pseq) < len(hash) / 4: # first time around
                    seed = hash[-4:] + hash
                else:
                    seed += hash
            probe = int(seed[0:8], 16) % len(pool)
            pseq += [pool[probe]]
            pool = pool[:probe] + pool[probe+1:]
            seed = seed[8:]
        logging.debug(str(pseq))
        return pseq
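
    # Example (sketch; host names are hypothetical): the probe order is a pure
    # function of the block hash, so every client tries the same servers in
    # the same order for a given block:
    #
    #   kc = KeepClient()
    #   kc.shuffled_service_roots('acbd18db4cc2f85cedef654fccc4a4d8')
    #   # -> e.g. ['http://keep3:25107/', 'http://keep0:25107/', ...]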

    def get(self, locator):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return KeepClient.local_store_get(locator)
        expect_hash = re.sub(r'\+.*', '', locator)
        for service_root in self.shuffled_service_roots(expect_hash):
            h = httplib2.Http()
            url = service_root + expect_hash
            api_token = os.environ['ARVADOS_API_TOKEN']
            headers = {'Authorization': "OAuth2 %s" % api_token,
                       'Accept': 'application/octet-stream'}
            try:
                resp, content = h.request(url, 'GET', headers=headers)
                if re.match(r'^2\d\d$', resp['status']):
                    m = hashlib.new('md5')
                    m.update(content)
                    md5 = m.hexdigest()
                    if md5 == expect_hash:
                        return content
                    logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
            except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
                logging.info("Request fail: GET %s => %s: %s" %
                             (url, type(e), str(e)))
        raise Exception("Not found: %s" % expect_hash)

    def put(self, data, **kwargs):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return KeepClient.local_store_put(data)
        m = hashlib.new('md5')
        m.update(data)
        data_hash = m.hexdigest()
        have_copies = 0
        want_copies = kwargs.get('copies', 2)
        for service_root in self.shuffled_service_roots(data_hash):
            h = httplib2.Http()
            url = service_root + data_hash
            api_token = os.environ['ARVADOS_API_TOKEN']
            headers = {'Authorization': "OAuth2 %s" % api_token}
            try:
                resp, content = h.request(url, 'PUT',
                                          headers=headers,
                                          body=data)
                if (resp['status'] == '401' and
                    re.match(r'Timestamp verification failed', content)):
                    body = self.sign_for_old_server(data_hash, data)
                    h = httplib2.Http()
                    resp, content = h.request(url, 'PUT',
                                              headers=headers,
                                              body=body)
                if re.match(r'^2\d\d$', resp['status']):
                    have_copies += 1
                    if have_copies == want_copies:
                        return data_hash + '+' + str(len(data))
                else:
                    logging.warning("Request fail: PUT %s => %s %s" %
                                    (url, resp['status'], content))
            except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
                logging.warning("Request fail: PUT %s => %s: %s" %
                                (url, type(e), str(e)))
        raise Exception("Write fail for %s: wanted %d but wrote %d" %
                        (data_hash, want_copies, have_copies))

    def sign_for_old_server(self, data_hash, data):
        return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" %
                 (int(time.time()), data_hash)) + data)

    @staticmethod
    def local_store_put(data):
        m = hashlib.new('md5')
        m.update(data)
        md5 = m.hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
                  os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
        return locator
    @staticmethod
    def local_store_get(locator):
        r = re.search('^([0-9a-f]{32,})', locator)
        if r == None:
            raise Exception("Keep.get: invalid data locator '%s'" % locator)
        if r.group(0) == 'd41d8cd98f00b204e9800998ecf8427e':
            return ''
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
            return f.read()
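
# Example (sketch): setting KEEP_LOCAL_STORE routes Keep.put/Keep.get to a
# local directory, which is handy for tests:
#
#   os.environ['KEEP_LOCAL_STORE'] = '/tmp/keep'
#   loc = Keep.put("foo")            # 'acbd18db4cc2f85cedef654fccc4a4d8+3'
#   assert Keep.get(loc) == "foo"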