# arvados.git: sdk/python/arvados.py
import gflags
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl

from apiclient import errors
from apiclient.discovery import build

class CredentialsFromEnv:
    # http_request is declared as a staticmethod but still takes a
    # "self" argument: authorize() binds it to an httplib2.Http object
    # with types.MethodType, so inside this function "self" is the
    # Http instance, not a CredentialsFromEnv.
    @staticmethod
    def http_request(self, uri, **kwargs):
        from httplib import BadStatusLine
        if 'headers' not in kwargs:
            kwargs['headers'] = {}
        kwargs['headers']['Authorization'] = 'OAuth2 %s' % os.environ['ARVADOS_API_TOKEN']
        try:
            return self.orig_http_request(uri, **kwargs)
        except BadStatusLine:
            # This is how httplib tells us that it tried to reuse an
            # existing connection but it was already closed by the
            # server. In that case, yes, we would like to retry.
            # Unfortunately, we are not absolutely certain that the
            # previous call did not succeed, so this is slightly
            # risky.
            return self.orig_http_request(uri, **kwargs)
    def authorize(self, http):
        http.orig_http_request = http.request
        http.request = types.MethodType(self.http_request, http)
        return http

url = ('https://%s/discovery/v1/apis/'
       '{api}/{apiVersion}/rest' % os.environ['ARVADOS_API_HOST'])
credentials = CredentialsFromEnv()
http = httplib2.Http()
http = credentials.authorize(http)
http.disable_ssl_certificate_validation = True
service = build("arvados", "v1", http=http, discoveryServiceUrl=url)

def task_set_output(self, s):
    service.job_tasks().update(uuid=self['uuid'],
                               job_task=json.dumps({
                                   'output': s,
                                   'success': True,
                                   'progress': 1.0
                               })).execute()

_current_task = None
def current_task():
    global _current_task
    if _current_task:
        return _current_task
    t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
    t = UserDict.UserDict(t)
    t.set_output = types.MethodType(task_set_output, t)
    t.tmpdir = os.environ['TASK_WORK']
    _current_task = t
    return t

_current_job = None
def current_job():
    global _current_job
    if _current_job:
        return _current_job
    t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
    t = UserDict.UserDict(t)
    t.tmpdir = os.environ['JOB_WORK']
    _current_job = t
    return t

def api():
    return service

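# Example (a sketch; assumes the crunch dispatcher has already set
# JOB_UUID, TASK_UUID, JOB_WORK and TASK_WORK in the environment):
#
#   import arvados
#   this_job = arvados.current_job()
#   this_task = arvados.current_task()
#   input_locator = this_job['script_parameters']['input']
#   this_task.set_output(output_locator)  # output_locator: hypothetical name
#                                         # for a collection locator; records
#                                         # output and marks the task done
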
class JobTask:
    def __init__(self, parameters=None, resource_limits=None):
        # Default to fresh dicts rather than sharing one mutable
        # default dict across all instances.
        parameters = parameters if parameters is not None else {}
        resource_limits = resource_limits if resource_limits is not None else {}
        print "init jobtask %s %s" % (parameters, resource_limits)

class job_setup:
    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True):
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            for f in s.all_files():
                task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task_uuid': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input
                    }
                }
                service.job_tasks().create(job_task=json.dumps(new_task_attrs)).execute()
        if and_end_task:
            service.job_tasks().update(uuid=current_task()['uuid'],
                                       job_task=json.dumps({'success': True})
                                       ).execute()
            sys.exit(0)

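# Example (a sketch of the intended calling pattern): a job script
# that wants one task per input file can start with
#
#   import arvados
#   arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
#   my_input = arvados.current_task()['parameters']['input']
#
# The sequence-0 task queues one sequence-1 task per input file and
# exits; each sequence-1 task then finds a single-file manifest in its
# 'input' parameter.
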
class util:
    @staticmethod
    def run_command(execargs, **kwargs):
        if 'stdin' not in kwargs:
            kwargs['stdin'] = subprocess.PIPE
        if 'stdout' not in kwargs:
            kwargs['stdout'] = subprocess.PIPE
        if 'stderr' not in kwargs:
            kwargs['stderr'] = subprocess.PIPE
        p = subprocess.Popen(execargs, close_fds=True, shell=False,
                             **kwargs)
        stdoutdata, stderrdata = p.communicate(None)
        if p.returncode != 0:
            raise Exception("run_command %s exit %d:\n%s" %
                            (execargs, p.returncode, stderrdata))
        return stdoutdata, stderrdata

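    # Example (a sketch):
    #
    #   out, err = util.run_command(["git", "--version"])
    #
    # returns the command's stdout and stderr; a nonzero exit status
    # raises an Exception that includes the stderr text.
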
    @staticmethod
    def git_checkout(url, version, path):
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        if not os.path.exists(path):
            util.run_command(["git", "clone", url, path],
                             cwd=os.path.dirname(path))
        util.run_command(["git", "checkout", version],
                         cwd=path)
        return path

    @staticmethod
    def tar_extractor(path, decompress_flag):
        return subprocess.Popen(["tar",
                                 "-C", path,
                                 ("-x%sf" % decompress_flag),
                                 "-"],
                                stdout=None,
                                stdin=subprocess.PIPE, stderr=sys.stderr,
                                shell=False, close_fds=True)

    @staticmethod
    def tarball_extract(tarball, path):
        """Retrieve a tarball from Keep and extract it to a local
        directory.  Return the absolute path where the tarball was
        extracted. If the top level of the tarball contained just one
        file or directory, return the absolute path of that single
        item.

        tarball -- collection locator
        path -- where to extract the tarball: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == tarball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(tarball).all_files():
                if re.search(r'\.(tbz|tar\.bz2)$', f.name()):
                    p = util.tar_extractor(path, 'j')
                elif re.search(r'\.(tgz|tar\.gz)$', f.name()):
                    p = util.tar_extractor(path, 'z')
                elif re.search(r'\.tar$', f.name()):
                    p = util.tar_extractor(path, '')
                else:
                    raise Exception("tarball_extract cannot handle filename %s"
                                    % f.name())
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    p.stdin.write(buf)
                p.stdin.close()
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise Exception("tar exited %d" % p.returncode)
            os.symlink(tarball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path

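    # Example (a sketch; TARBALL_LOCATOR stands for a real collection
    # locator):
    #
    #   src_dir = util.tarball_extract(TARBALL_LOCATOR, "src")
    #
    # extracts into $JOB_WORK/src and returns the absolute path of the
    # single top-level item if there is exactly one, else the
    # extraction directory itself.
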
    @staticmethod
    def zipball_extract(zipball, path):
        """Retrieve a zip archive from Keep and extract it to a local
        directory.  Return the absolute path where the archive was
        extracted. If the top level of the archive contained just one
        file or directory, return the absolute path of that single
        item.

        zipball -- collection locator
        path -- where to extract the archive: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == zipball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(zipball).all_files():
                if not re.search(r'\.zip$', f.name()):
                    raise Exception("zipball_extract cannot handle filename %s"
                                    % f.name())
                zip_filename = os.path.join(path, os.path.basename(f.name()))
                zip_file = open(zip_filename, 'wb')
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    zip_file.write(buf)
                zip_file.close()

                p = subprocess.Popen(["unzip",
                                      "-q", "-o",
                                      "-d", path,
                                      zip_filename],
                                     stdout=None,
                                     stdin=None, stderr=sys.stderr,
                                     shell=False, close_fds=True)
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise Exception("unzip exited %d" % p.returncode)
                os.unlink(zip_filename)
            os.symlink(zipball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path

    @staticmethod
    def collection_extract(collection, path, files=[], decompress=True):
        """Retrieve a collection from Keep and extract it to a local
        directory.  Return the absolute path where the collection was
        extracted.

        collection -- collection locator
        path -- where to extract: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == collection:
                already_have_it = True
        except OSError:
            pass

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        files_got = []
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(collection).all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, outname)):
                    continue
                outfile = open(os.path.join(path, outname), 'w')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
        if len(files_got) < len(files):
            raise Exception("Wanted files %s but only got %s from %s" %
                            (files, files_got,
                             map(lambda z: z.name(),
                                 list(CollectionReader(collection).all_files()))))
        os.symlink(collection, os.path.join(path, '.locator'))

        lockfile.close()
        return path

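# Example (a sketch; COLLECTION_LOCATOR and "genome.fa" are
# illustrative): fetch one named file from a collection, decompressing
# it on the way:
#
#   ref_dir = util.collection_extract(COLLECTION_LOCATOR, "ref",
#                                     files=["genome.fa"],
#                                     decompress=True)
#
# With decompress=True a stored "genome.fa.gz" satisfies the request
# for "genome.fa" and is written out decompressed.
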
class DataReader:
    def __init__(self, data_locator):
        self.data_locator = data_locator
        self.p = subprocess.Popen(["whget", "-r", self.data_locator, "-"],
                                  stdout=subprocess.PIPE,
                                  stdin=None, stderr=subprocess.PIPE,
                                  shell=False, close_fds=True)
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
    def read(self, size, **kwargs):
        return self.p.stdout.read(size, **kwargs)
    def close(self):
        self.p.stdout.close()
        if not self.p.stderr.closed:
            for err in self.p.stderr:
                print >> sys.stderr, err
            self.p.stderr.close()
        self.p.wait()
        if self.p.returncode != 0:
            raise Exception("whget subprocess exited %d" % self.p.returncode)

class StreamFileReader:
    def __init__(self, stream, pos, size, name):
        self._stream = stream
        self._pos = pos
        self._size = size
        self._name = name
        self._filepos = 0
    def name(self):
        return self._name
    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self._name)
    def size(self):
        return self._size
    def stream_name(self):
        return self._stream.name()
    def read(self, size, **kwargs):
        self._stream.seek(self._pos + self._filepos)
        data = self._stream.read(min(size, self._size - self._filepos))
        self._filepos += len(data)
        return data
    def readall(self, size=2**20, **kwargs):
        while True:
            data = self.read(size, **kwargs)
            if data == '':
                break
            yield data
    def bunzip2(self, size):
        decompressor = bz2.BZ2Decompressor()
        for chunk in self.readall(size):
            data = decompressor.decompress(chunk)
            if data and data != '':
                yield data
    def gunzip(self, size):
        decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
        for chunk in self.readall(size):
            data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
            if data and data != '':
                yield data
    def readall_decompressed(self, size=2**20):
        self._stream.seek(self._pos + self._filepos)
        if re.search(r'\.bz2$', self._name):
            return self.bunzip2(size)
        elif re.search(r'\.gz$', self._name):
            return self.gunzip(size)
        else:
            return self.readall(size)
    def readlines(self, decompress=True):
        if decompress:
            datasource = self.readall_decompressed()
        else:
            self._stream.seek(self._pos + self._filepos)
            datasource = self.readall()
        data = ''
        for newdata in datasource:
            data += newdata
            sol = 0
            while True:
                eol = data.find("\n", sol)
                if eol < 0:
                    break
                yield data[sol:eol+1]
                sol = eol+1
            data = data[sol:]
        if data != '':
            yield data
    def as_manifest(self):
        if self.size() == 0:
            return ("%s d41d8cd98f00b204e9800998ecf8427e+0 0:0:%s\n"
                    % (self._stream.name(), self.name()))
        return " ".join(self._stream.tokens_for_range(self._pos, self._size)) + "\n"

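# A manifest stream line (an illustrative example, using the empty-file
# md5 that as_manifest() above emits):
#
#   . d41d8cd98f00b204e9800998ecf8427e+0 0:0:emptyfile.txt
#
# i.e. a stream name, one or more data block locators (an md5 digest,
# optionally followed by "+hints" such as the block size), then one or
# more "position:size:filename" tokens. StreamReader below parses
# exactly these three token kinds.
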
class StreamReader:
    def __init__(self, tokens):
        self._tokens = tokens
        self._current_datablock_data = None
        self._current_datablock_pos = 0
        self._current_datablock_index = -1
        self._pos = 0

        self._stream_name = None
        self.data_locators = []
        self.files = []

        for tok in self._tokens:
            if self._stream_name is None:
                self._stream_name = tok
            elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
                self.data_locators += [tok]
            elif re.search(r'^\d+:\d+:\S+', tok):
                pos, size, name = tok.split(':', 2)
                self.files += [[int(pos), int(size), name]]
            else:
                raise Exception("Invalid manifest format")
    def tokens_for_range(self, range_start, range_size):
        resp = [self._stream_name]
        return_all_tokens = False
        block_start = 0
        token_bytes_skipped = 0
        for locator in self.data_locators:
            sizehint = re.search(r'\+(\d+)', locator)
            if not sizehint:
                # Without a size hint we cannot tell which blocks
                # overlap the range, so fall back to returning every
                # remaining locator.
                return_all_tokens = True
            if return_all_tokens:
                resp += [locator]
                continue
            blocksize = int(sizehint.group(1))
            if range_start + range_size <= block_start:
                break
            if range_start < block_start + blocksize:
                resp += [locator]
            else:
                token_bytes_skipped += blocksize
            block_start += blocksize
        for f in self.files:
            if ((f[0] < range_start + range_size)
                and
                (f[0] + f[1] > range_start)
                and
                f[1] > 0):
                resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
        return resp
    def name(self):
        return self._stream_name
    def all_files(self):
        for f in self.files:
            pos, size, name = f
            yield StreamFileReader(self, pos, size, name)
    def nextdatablock(self):
        if self._current_datablock_index < 0:
            self._current_datablock_pos = 0
            self._current_datablock_index = 0
        else:
            self._current_datablock_pos += self.current_datablock_size()
            self._current_datablock_index += 1
        self._current_datablock_data = None
    def current_datablock_data(self):
        if self._current_datablock_data is None:
            self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
        return self._current_datablock_data
    def current_datablock_size(self):
        if self._current_datablock_index < 0:
            self.nextdatablock()
        sizehint = re.search(r'\+(\d+)', self.data_locators[self._current_datablock_index])
        if sizehint:
            return int(sizehint.group(1))
        return len(self.current_datablock_data())
    def seek(self, pos):
        """Set the position of the next read operation."""
        self._pos = pos
    def really_seek(self):
        """Find and load the appropriate data block, so the byte at
        _pos is in memory.
        """
        if self._pos == self._current_datablock_pos:
            return True
        if (self._current_datablock_pos is not None and
            self._pos >= self._current_datablock_pos and
            self._pos <= self._current_datablock_pos + self.current_datablock_size()):
            return True
        if self._pos < self._current_datablock_pos:
            self._current_datablock_index = -1
            self.nextdatablock()
        while (self._pos > self._current_datablock_pos and
               self._pos > self._current_datablock_pos + self.current_datablock_size()):
            self.nextdatablock()
    def read(self, size):
        """Read no more than size bytes -- but at least one byte,
        unless _pos is already at the end of the stream.
        """
        if size == 0:
            return ''
        self.really_seek()
        while self._pos >= self._current_datablock_pos + self.current_datablock_size():
            self.nextdatablock()
            if self._current_datablock_index >= len(self.data_locators):
                return None
        data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
        self._pos += len(data)
        return data

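# Reading sketch (illustrative): given a StreamReader s,
#
#   s.seek(0)
#   buf = s.read(2**20)
#
# read() never spans a data block boundary -- it returns at most the
# remainder of the current block -- so callers such as
# StreamFileReader.readall() keep calling until an empty read signals
# end of file.
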
class CollectionReader:
    def __init__(self, manifest_locator_or_text):
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        self._streams = None
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        pass
    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line == '':
                # skip the empty string after the trailing newline
                continue
            stream_tokens = stream_line.split()
            self._streams += [stream_tokens]
    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp += [StreamReader(s)]
        return resp
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f
    def manifest_text(self):
        self._populate()
        return self._manifest_text

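# Example (a sketch; COLLECTION_LOCATOR is illustrative): iterate over
# every file in a collection and stream its contents:
#
#   cr = CollectionReader(COLLECTION_LOCATOR)  # or a literal manifest text
#   for f in cr.all_files():
#       for buf in f.readall():
#           sys.stdout.write(buf)
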
class CollectionWriter:
    KEEP_BLOCK_SIZE = 2**26
    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()
    def write(self, newdata):
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()
    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            # keep the buffer length in sync so write()'s flush loop
            # terminates
            self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        if re.search(r'[ \t\n]', newfilename):
            raise AssertionError("Manifest filenames cannot contain whitespace")
        self._current_file_name = newfilename
    def current_file_name(self):
        return self._current_file_name
    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise Exception("Cannot finish an unnamed file (%d bytes at offset %d in '%s' stream)" %
                            (self._current_stream_length - self._current_file_pos,
                             self._current_file_pos,
                             self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length
    def start_new_stream(self, newstreamname=None):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
    def set_current_stream_name(self, newstreamname):
        if re.search(r'[ \t\n]', newstreamname):
            raise AssertionError("Manifest stream names cannot contain whitespace")
        self._current_stream_name = newstreamname
    def current_stream_name(self):
        return self._current_stream_name
    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise Exception("Cannot finish an unnamed stream (%d bytes in %d files)" %
                            (self._current_stream_length,
                             len(self._current_stream_files)))
        else:
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
    def finish(self):
        return Keep.put(self.manifest_text())
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            manifest += stream[0]
            if len(stream[1]) == 0:
                manifest += " d41d8cd98f00b204e9800998ecf8427e+0"
            else:
                for locator in stream[1]:
                    manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2])
            manifest += "\n"
        return manifest

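# Example (a sketch): build a one-file collection and store its
# manifest in Keep:
#
#   cw = CollectionWriter()
#   cw.start_new_file('hello.txt')
#   cw.write('hello world\n')
#   manifest_locator = cw.finish()
#
# finish() flushes the buffered data, emits a manifest line like
# ". <md5>+12 0:12:hello.txt", and returns the locator of the stored
# manifest.
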
class Keep:
    @staticmethod
    def put(data):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return Keep.local_store_put(data)
        p = subprocess.Popen(["whput", "-"],
                             stdout=subprocess.PIPE,
                             stdin=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             shell=False, close_fds=True)
        stdoutdata, stderrdata = p.communicate(data)
        if p.returncode != 0:
            raise Exception("whput subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
        return stdoutdata.rstrip()
    @staticmethod
    def get(locator):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return Keep.local_store_get(locator)
        p = subprocess.Popen(["whget", locator, "-"],
                             stdout=subprocess.PIPE,
                             stdin=None,
                             stderr=subprocess.PIPE,
                             shell=False, close_fds=True)
        stdoutdata, stderrdata = p.communicate(None)
        if p.returncode != 0:
            raise Exception("whget subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
        m = hashlib.new('md5')
        m.update(stdoutdata)
        try:
            if locator.index(m.hexdigest()) == 0:
                return stdoutdata
        except ValueError:
            pass
        raise Exception("md5 checksum mismatch: md5(get(%s)) == %s" % (locator, m.hexdigest()))
    @staticmethod
    def local_store_put(data):
        m = hashlib.new('md5')
        m.update(data)
        md5 = m.hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
                  os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
        return locator
    @staticmethod
    def local_store_get(locator):
        r = re.search('^([0-9a-f]{32,})', locator)
        if not r:
            raise Exception("Keep.get: invalid data locator '%s'" % locator)
        if r.group(0) == 'd41d8cd98f00b204e9800998ecf8427e':
            return ''
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
            return f.read()
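
# Example (a sketch; /tmp/keep is an illustrative path): with
# KEEP_LOCAL_STORE pointing at an existing writable directory, put()
# and get() round-trip without a Keep server, which is convenient for
# testing:
#
#   os.environ['KEEP_LOCAL_STORE'] = '/tmp/keep'
#   locator = Keep.put('foo')
#   assert Keep.get(locator) == 'foo'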