2752: Improve arv-put initialization from cache.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 439504786ec033de6a3b07a32ea82206e7e0237a..39263698b30a136522219d09a6e7eedd5e28e41e 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -108,6 +108,16 @@ def parse_arguments(arguments):
     total data size).
     """)
 
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument('--resume', action='store_true', default=True,
+                       help="""
+    Continue interrupted uploads from cached state (default).
+    """)
+    group.add_argument('--no-resume', action='store_false', dest='resume',
+                       help="""
+    Do not continue interrupted uploads from cached state.
+    """)
+
     args = parser.parse_args(arguments)
 
     if len(args.paths) == 0:
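
The new options use argparse's paired-flag idiom: both switches write to the same dest, so args.resume is always a plain boolean, and the mutually exclusive group rejects a command line that passes both. A standalone sketch of the pattern (not part of the patch):

    import argparse

    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--resume', action='store_true', default=True)
    group.add_argument('--no-resume', action='store_false', dest='resume')

    print(parser.parse_args([]).resume)               # True (the default)
    print(parser.parse_args(['--no-resume']).resume)  # False
    # parser.parse_args(['--resume', '--no-resume']) exits with an error
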
@@ -139,6 +149,16 @@ class ResumeCacheConflict(Exception):
 class ResumeCache(object):
     CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
 
+    @classmethod
+    def setup_user_cache(cls):
+        try:
+            os.makedirs(cls.CACHE_DIR)
+        except OSError as error:
+            if error.errno != errno.EEXIST:
+                raise
+        else:
+            os.chmod(cls.CACHE_DIR, 0o700)
+
     def __init__(self, file_spec):
         try:
             self.cache_file = open(file_spec, 'a+')
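
setup_user_cache is the standard create-if-missing idiom: EEXIST is the expected "already there" case, any other OSError (EACCES, say) propagates, and the chmod sits in the else branch so it only runs when this call actually created the directory; a pre-existing cache directory keeps whatever permissions the user gave it. A self-contained check of that behavior against a throwaway path:

    import errno
    import os
    import stat
    import tempfile

    def ensure_private_dir(path):
        try:
            os.makedirs(path)
        except OSError as error:
            if error.errno != errno.EEXIST:  # only "already exists" is benign
                raise
        else:
            os.chmod(path, 0o700)            # tighten perms only on creation

    cache_dir = os.path.join(tempfile.mkdtemp(), 'arv-put')
    ensure_private_dir(cache_dir)  # creates it with mode 0700
    ensure_private_dir(cache_dir)  # hits EEXIST, leaves the mode alone
    print(oct(stat.S_IMODE(os.stat(cache_dir).st_mode)))
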
@@ -204,27 +224,26 @@ class ResumeCache(object):
 
 
 class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
-    def __init__(self, cache=None, reporter=None, bytes_expected=None):
-        self.__init_locals__(cache, reporter, bytes_expected)
-        super(ArvPutCollectionWriter, self).__init__()
+    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
+                   ['bytes_written'])
 
-    def __init_locals__(self, cache, reporter, bytes_expected):
+    def __init__(self, cache=None, reporter=None, bytes_expected=None):
+        self.bytes_written = 0
         self.cache = cache
         self.report_func = reporter
-        self.bytes_written = 0
         self.bytes_expected = bytes_expected
+        super(ArvPutCollectionWriter, self).__init__()
 
     @classmethod
     def from_cache(cls, cache, reporter=None, bytes_expected=None):
         try:
             state = cache.load()
             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state)
+            writer = cls.from_state(state, cache, reporter, bytes_expected)
         except (TypeError, ValueError,
                 arvados.errors.StaleWriterStateError) as error:
             return cls(cache, reporter, bytes_expected)
         else:
-            writer.__init_locals__(cache, reporter, bytes_expected)
             return writer
 
     def checkpoint_state(self):
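
The reordering in __init__ matters: the subclass sets its own attributes before invoking the base constructor (which may checkpoint), and from_state now receives the constructor arguments up front instead of having __init_locals__ patch them in afterwards, so attributes restored from the cache are never clobbered. Adding bytes_written to STATE_PROPS is what lets the running byte count survive a resume. A hedged sketch of the overall pattern, with hypothetical names (the real base class is arvados.ResumableCollectionWriter):

    class ResumableWriter(object):
        STATE_PROPS = ['pos']

        def __init__(self):
            self.pos = 0

        @classmethod
        def from_state(cls, state, *init_args):
            writer = cls(*init_args)  # defaults and collaborators first...
            for attr in cls.STATE_PROPS:
                setattr(writer, attr, state[attr])  # ...then restore checkpoints
            return writer

        def dump_state(self):
            return dict((attr, getattr(self, attr)) for attr in self.STATE_PROPS)

    class CountingWriter(ResumableWriter):
        STATE_PROPS = ResumableWriter.STATE_PROPS + ['bytes_written']

        def __init__(self, reporter=None):
            self.bytes_written = 0    # set before the base constructor runs
            self.reporter = reporter  # runtime-only, deliberately not checkpointed
            super(CountingWriter, self).__init__()

    old = CountingWriter()
    old.bytes_written = 42
    new = CountingWriter.from_state(old.dump_state(), None)
    print(new.bytes_written)  # 42 -- the count survived the "resume"
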
@@ -281,6 +300,7 @@ def progress_writer(progress_func, outfile=sys.stderr):
     return write_progress
 
 def main(arguments=None):
+    ResumeCache.setup_user_cache()
     args = parse_arguments(arguments)
 
     if args.progress:
@@ -290,8 +310,16 @@ def main(arguments=None):
     else:
         reporter = None
 
-    writer = ArvPutCollectionWriter(
-        reporter=reporter, bytes_expected=expected_bytes_for(args.paths))
+    try:
+        resume_cache = ResumeCache(args)
+        if not args.resume:
+            resume_cache.restart()
+    except ResumeCacheConflict:
+        print >>sys.stderr, "arv-put: Another process is already uploading this data."
+        sys.exit(1)
+
+    writer = ArvPutCollectionWriter.from_cache(
+        resume_cache, reporter, expected_bytes_for(args.paths))
 
     # Copy file data to Keep.
     for path in args.paths:
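
Nothing in this hunk shows what raises ResumeCacheConflict; a plausible minimal sketch, assuming ResumeCache takes a POSIX advisory lock on the cache file when it opens it. The lock, not the file contents, is what detects a second arv-put on the same inputs, which is why --no-resume still constructs the cache and merely calls restart() to clear it:

    import fcntl

    def lock_file(fileobj):
        # Non-blocking exclusive lock: if another process already holds
        # it, flock raises, and we surface that as a cache conflict.
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))
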
@@ -318,6 +346,7 @@ def main(arguments=None):
 
         # Print the locator (uuid) of the new collection.
         print writer.finish()
+    resume_cache.destroy()
 
 if __name__ == '__main__':
     main()