2752: Don't duplicate arv-put work after resume.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index a0dec2b05acd7aed316e3a6c1b9fe93a74f32ae8..44f911e60b5ed354124b4f532feba157267859f7 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -5,6 +5,7 @@
 
 import argparse
 import arvados
+import base64
 import errno
 import fcntl
 import hashlib
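
The new `base64` import supports checkpointing: the writer's pending `_data_buffer` holds raw bytes, which cannot go straight into the JSON cache file, so `checkpoint_state` base64-encodes the buffer on save and `from_cache` decodes it on load (both appear later in this patch). A minimal sketch of that round trip, using the same Python 2 `base64` calls:

    import base64
    import json

    buffered = '\x00raw chunk\xff'      # pending bytes from the data buffer
    saved = json.dumps({'_data_buffer': base64.encodestring(buffered)})
    restored = base64.decodestring(json.loads(saved)['_data_buffer'])
    assert restored == buffered
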
@@ -107,6 +108,16 @@ def parse_arguments(arguments):
     total data size).
     """)
 
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument('--resume', action='store_true', default=True,
+                       help="""
+    Continue interrupted uploads from cached state (default).
+    """)
+    group.add_argument('--no-resume', action='store_false', dest='resume',
+                       help="""
+    Do not continue interrupted uploads from cached state.
+    """)
+
     args = parser.parse_args(arguments)
 
     if len(args.paths) == 0:
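
The new flag pair is a stock argparse idiom: `--resume` and `--no-resume` share one mutually exclusive group and one `resume` destination, so resuming defaults to on and `--no-resume` flips the same attribute off. A standalone sketch of the pattern:

    import argparse

    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--resume', action='store_true', default=True)
    group.add_argument('--no-resume', action='store_false', dest='resume')

    assert parser.parse_args([]).resume is True
    assert parser.parse_args(['--no-resume']).resume is False

The group also makes passing both flags an immediate usage error instead of a silent last-one-wins.
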
@@ -138,12 +149,18 @@ class ResumeCacheConflict(Exception):
 class ResumeCache(object):
     CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
 
-    def __init__(self, file_spec):
+    @classmethod
+    def setup_user_cache(cls):
         try:
-            self.cache_file = open(file_spec, 'a+')
-        except TypeError:
-            file_spec = self.make_path(file_spec)
-            self.cache_file = open(file_spec, 'a+')
+            os.makedirs(cls.CACHE_DIR)
+        except OSError as error:
+            if error.errno != errno.EEXIST:
+                raise
+        else:
+            os.chmod(cls.CACHE_DIR, 0o700)
+
+    def __init__(self, file_spec):
+        self.cache_file = open(file_spec, 'a+')
         self._lock_file(self.cache_file)
         self.filename = self.cache_file.name
 
@@ -197,65 +214,134 @@ class ResumeCache(object):
                 raise
         self.close()
 
+    def restart(self):
+        self.destroy()
+        self.__init__(self.filename)
 
-class CollectionWriterWithProgress(arvados.CollectionWriter):
-    def flush_data(self, *args, **kwargs):
-        if not getattr(self, 'display_type', None):
-            return
-        if not hasattr(self, 'bytes_flushed'):
-            self.bytes_flushed = 0
-        self.bytes_flushed += self._data_buffer_len
-        super(CollectionWriterWithProgress, self).flush_data(*args, **kwargs)
-        self.bytes_flushed -= self._data_buffer_len
-        if self.display_type == 'machine':
-            sys.stderr.write('%s %d: %d written %d total\n' %
-                             (sys.argv[0],
-                              os.getpid(),
-                              self.bytes_flushed,
-                              getattr(self, 'bytes_expected', -1)))
-        elif getattr(self, 'bytes_expected', 0) > 0:
-            pct = 100.0 * self.bytes_flushed / self.bytes_expected
-            sys.stderr.write('\r%dM / %dM %.1f%% ' %
-                             (self.bytes_flushed >> 20,
-                              self.bytes_expected >> 20, pct))
-        else:
-            sys.stderr.write('\r%d ' % self.bytes_flushed)
 
-    def manifest_text(self, *args, **kwargs):
-        manifest_text = (super(CollectionWriterWithProgress, self)
-                         .manifest_text(*args, **kwargs))
-        if getattr(self, 'display_type', None):
-            if self.display_type == 'human':
-                sys.stderr.write('\n')
-            self.display_type = None
-        return manifest_text
+class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
+    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
+                   ['bytes_written', '_seen_inputs'])
 
+    def __init__(self, cache=None, reporter=None, bytes_expected=None):
+        self.bytes_written = 0
+        self._seen_inputs = []
+        self.cache = cache
+        if reporter is None:
+            self.report_progress = lambda bytes_w, bytes_e: None
+        else:
+            self.report_progress = reporter
+        self.bytes_expected = bytes_expected
+        super(ArvPutCollectionWriter, self).__init__()
 
-def main(arguments=None):
-    args = parse_arguments(arguments)
+    @classmethod
+    def from_cache(cls, cache, reporter=None, bytes_expected=None):
+        try:
+            state = cache.load()
+            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
+            writer = cls.from_state(state, cache, reporter, bytes_expected)
+        except (TypeError, ValueError,
+                arvados.errors.StaleWriterStateError) as error:
+            return cls(cache, reporter, bytes_expected)
+        else:
+            return writer
 
-    if args.progress:
-        writer = CollectionWriterWithProgress()
-        writer.display_type = 'human'
-    elif args.batch_progress:
-        writer = CollectionWriterWithProgress()
-        writer.display_type = 'machine'
-    else:
-        writer = arvados.CollectionWriter()
+    def preresume_hook(self):
+        print >>sys.stderr, "arv-put: Resuming previous upload.  Bypass with the --no-resume option."
+        self.report_progress(self.bytes_written, self.bytes_expected)
 
+    def checkpoint_state(self):
+        if self.cache is None:
+            return
+        state = self.dump_state()
+        # Transform attributes for serialization.
+        for attr, value in state.items():
+            if attr == '_data_buffer':
+                state[attr] = base64.encodestring(''.join(value))
+            elif hasattr(value, 'popleft'):
+                state[attr] = list(value)
+        self.cache.save(state)
+
+    def flush_data(self):
+        bytes_buffered = self._data_buffer_len
+        super(ArvPutCollectionWriter, self).flush_data()
+        self.bytes_written += (bytes_buffered - self._data_buffer_len)
+        self.report_progress(self.bytes_written, self.bytes_expected)
+
+    def _record_new_input(self, input_type, source_name, dest_name):
+        # The key needs to be a list because that's what we'll get back
+        # from JSON deserialization.
+        key = [input_type, source_name, dest_name]
+        if key in self._seen_inputs:
+            return False
+        self._seen_inputs.append(key)
+        return True
+
+    def write_file(self, source, filename=None):
+        if self._record_new_input('file', source, filename):
+            super(ArvPutCollectionWriter, self).write_file(source, filename)
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        if self._record_new_input('directory', path, stream_name):
+            super(ArvPutCollectionWriter, self).write_directory_tree(
+                path, stream_name, max_manifest_depth)
+
+
+def expected_bytes_for(pathlist):
     # Walk the given directory trees and stat files, adding up file sizes,
     # so we can display progress as percent
-    writer.bytes_expected = 0
-    for path in args.paths:
+    bytesum = 0
+    for path in pathlist:
         if os.path.isdir(path):
             for filename in arvados.util.listdir_recursive(path):
-                writer.bytes_expected += os.path.getsize(
-                    os.path.join(path, filename))
+                bytesum += os.path.getsize(os.path.join(path, filename))
         elif not os.path.isfile(path):
-            del writer.bytes_expected
-            break
+            return None
         else:
-            writer.bytes_expected += os.path.getsize(path)
+            bytesum += os.path.getsize(path)
+    return bytesum
+
+_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
+                                                            os.getpid())
+def machine_progress(bytes_written, bytes_expected):
+    return _machine_format.format(
+        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
+
+def human_progress(bytes_written, bytes_expected):
+    if bytes_expected:
+        return "\r{}M / {}M {:.1%} ".format(
+            bytes_written >> 20, bytes_expected >> 20,
+            float(bytes_written) / bytes_expected)
+    else:
+        return "\r{} ".format(bytes_written)
+
+def progress_writer(progress_func, outfile=sys.stderr):
+    def write_progress(bytes_written, bytes_expected):
+        outfile.write(progress_func(bytes_written, bytes_expected))
+    return write_progress
+
+def main(arguments=None):
+    ResumeCache.setup_user_cache()
+    args = parse_arguments(arguments)
+
+    if args.progress:
+        reporter = progress_writer(human_progress)
+    elif args.batch_progress:
+        reporter = progress_writer(machine_progress)
+    else:
+        reporter = None
+
+    try:
+        resume_cache = ResumeCache(ResumeCache.make_path(args))
+        if not args.resume:
+            resume_cache.restart()
+    except ResumeCacheConflict:
+        print "arv-put: Another process is already uploading this data."
+        sys.exit(1)
+
+    writer = ArvPutCollectionWriter.from_cache(
+        resume_cache, reporter, expected_bytes_for(args.paths))
 
     # Copy file data to Keep.
     for path in args.paths:
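
`_record_new_input` is what delivers on the commit title: every (type, source, destination) triple is appended to `_seen_inputs`, which is listed in `STATE_PROPS` and therefore survives checkpoint and reload, so a resumed run skips files and directory trees it already wrote. The in-code comment about lists is load-bearing, because JSON has no tuple type and a tuple key would never equal its deserialized self. A small demonstration of the pitfall (the path is just an example):

    import json

    key = ('file', '/tmp/example.txt', 'example.txt')
    restored = json.loads(json.dumps([key]))  # tuples come back as lists
    print key in restored                     # False: a tuple never matches
    print list(key) in restored               # True: so store lists from the start
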
@@ -282,6 +368,7 @@ def main(arguments=None):
 
         # Print the locator (uuid) of the new collection.
         print writer.finish()
+    resume_cache.destroy()
 
 if __name__ == '__main__':
     main()
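
End to end, the cache lifecycle is now: open (and lock) the cache file, wipe it up front when `--no-resume` is given, rebuild the writer from cached state when it loads cleanly, checkpoint serialized state through `checkpoint_state` while the upload runs, and destroy the cache only after `finish()` succeeds, so an interruption at any earlier point leaves resumable state on disk. A condensed sketch of that flow, following the names in this patch with error handling and output options trimmed:

    import os

    cache = ResumeCache(ResumeCache.make_path(args))   # may raise ResumeCacheConflict
    if not args.resume:
        cache.restart()                                # discard previous state
    writer = ArvPutCollectionWriter.from_cache(cache)  # resume when possible
    for path in args.paths:
        if os.path.isdir(path):
            writer.write_directory_tree(path)
        else:
            writer.write_file(path)
    print writer.finish()                              # upload complete...
    cache.destroy()                                    # ...so the checkpoint can go
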