2752: Add ResumeCache to arv-put.
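
ResumeCache keeps arv-put state as a JSON file under ~/.cache/arvados/arv-put,
named after an MD5 of the command-line arguments so a rerun of the same command
finds its earlier state, and held under a non-blocking flock() so two arv-put
processes cannot share one cache.

Rough usage sketch (illustrative only; nothing in this commit calls ResumeCache
yet, and the saved-state dict shown is hypothetical):

    cache = ResumeCache(args)        # parsed arguments, or an explicit path
    try:
        state = cache.load()         # ValueError if the file is empty/corrupt
    except ValueError:
        state = None                 # nothing to resume; start fresh
    # ... upload, periodically calling cache.save({'bytes_written': n}) ...
    cache.destroy()                  # remove the cache once the upload succeeds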
[arvados.git] / sdk / python / arvados / commands / put.py
index 932483557621b3b76ee8b27547c86dce4516c269..a0dec2b05acd7aed316e3a6c1b9fe93a74f32ae8 100644 (file)
@@ -5,9 +5,13 @@
 
 import argparse
 import arvados
+import errno
+import fcntl
 import hashlib
+import json
 import os
 import sys
+import tempfile
 
 def parse_arguments(arguments):
     parser = argparse.ArgumentParser(
@@ -127,9 +131,22 @@ def parse_arguments(arguments):
 
     return args
 
+class ResumeCacheConflict(Exception):
+    pass
+
+
 class ResumeCache(object):
     CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
 
+    def __init__(self, file_spec):
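+        # file_spec may be a path string or a parsed-arguments namespace;
+        # open() raises TypeError for the latter, and the cache path is
+        # then derived from the arguments via make_path().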
+        try:
+            self.cache_file = open(file_spec, 'a+')
+        except TypeError:
+            file_spec = self.make_path(file_spec)
+            self.cache_file = open(file_spec, 'a+')
+        self._lock_file(self.cache_file)
+        self.filename = self.cache_file.name
+
     @classmethod
     def make_path(cls, args):
         md5 = hashlib.md5()
@@ -142,6 +159,44 @@ class ResumeCache(object):
             md5.update(args.filename)
         return os.path.join(cls.CACHE_DIR, md5.hexdigest())
 
+    def _lock_file(self, fileobj):
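+        # Take a non-blocking exclusive lock so only one arv-put process
+        # can use this cache file at a time.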
+        try:
+            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        except IOError:
+            raise ResumeCacheConflict("{} locked".format(fileobj.name))
+
+    def load(self):
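+        # Raises ValueError if the cache file is empty or holds invalid JSON.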
+        self.cache_file.seek(0)
+        return json.load(self.cache_file)
+
+    def save(self, data):
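+        # Write the new state to a locked temporary file in the same
+        # directory, then rename it over the old cache so a reader never
+        # sees a partially written file.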
+        try:
+            new_cache_fd, new_cache_name = tempfile.mkstemp(
+                dir=os.path.dirname(self.filename))
+            self._lock_file(new_cache_fd)
+            new_cache = os.fdopen(new_cache_fd, 'r+')
+            json.dump(data, new_cache)
+            new_cache.flush()  # get the data on disk before the rename
+            os.rename(new_cache_name, self.filename)
+        except (IOError, OSError, ResumeCacheConflict) as error:
+            try:
+                os.unlink(new_cache_name)
+            except NameError:  # mkstemp failed.
+                pass
+        else:
+            self.cache_file.close()
+            self.cache_file = new_cache
+
+    def close(self):
+        self.cache_file.close()
+
+    def destroy(self):
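+        # Remove the cache file; closing it afterwards releases the lock.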
+        try:
+            os.unlink(self.filename)
+        except OSError as error:
+            if error.errno != errno.ENOENT:  # ENOENT just means it was already gone.
+                raise
+        self.close()
+
 
 class CollectionWriterWithProgress(arvados.CollectionWriter):
     def flush_data(self, *args, **kwargs):