10383: Several fixes/enhancements to arv-put:
author Lucas Di Pentima <lucas@curoverse.com>
Mon, 21 Nov 2016 22:23:20 +0000 (19:23 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Mon, 21 Nov 2016 22:23:20 +0000 (19:23 -0300)
* Added --no-cache to avoid creating/loading a cache file.
* Removed unnecessary code when writing empty ArvadosFiles.
* Use absolute paths in the cache index so that it is not relative to the CWD and can be used from anywhere on the system.
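
A minimal sketch of how the new option surfaces through the SDK entry point, assuming a configured Arvados client environment; parameter names are taken from the __init__ signature in the diff below:

    from arvados.commands.put import ArvPutUploadJob

    # use_cache=False skips creating/loading the resume cache entirely;
    # resume must then be False too, or __init__ raises ArgumentError.
    writer = ArvPutUploadJob(paths=['/tmp/data'],
                             resume=False,
                             use_cache=False)
    writer.run()  # assumption: default arguments save the collection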

sdk/python/arvados/commands/put.py

index b88fdc69c113ceb0e0fde13613f6a5c1b2400f53..c7de888bca88b03f6eeb0e8c504c94343a520069 100644
@@ -170,6 +170,16 @@ _group.add_argument('--no-resume', action='store_false', dest='resume',
 Do not continue interrupted uploads from cached state.
 """)
 
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
+                    help="""
+Save upload state in a cache file for resuming (default).
+""")
+_group.add_argument('--no-cache', action='store_false', dest='use_cache',
+                    help="""
+Do not save upload state in a cache file for resuming.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@ -194,12 +204,17 @@ def parse_arguments(arguments):
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
+    # Turn off --resume (default) if --no-cache is used.
+    if not args.use_cache:
+        args.resume = False
+
     if args.paths == ['-']:
         if args.update_collection:
             arg_parser.error("""
     --update-collection cannot be used when reading from stdin.
     """)
         args.resume = False
+        args.use_cache = False
         if not args.filename:
             args.filename = 'stdin'
 
@@ -303,12 +318,13 @@ class ArvPutUploadJob(object):
         'files' : {} # Previous run file list: {path : {size, mtime}}
     }
 
-    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
-                 name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, replication_desired=None,
+    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
+                 bytes_expected=None, name=None, owner_uuid=None,
+                 ensure_unique_name=False, num_retries=None, replication_desired=None,
                  filename=None, update_time=1.0, update_collection=None):
         self.paths = paths
         self.resume = resume
+        self.use_cache = use_cache
         self.update = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
@@ -333,6 +349,9 @@ class ArvPutUploadJob(object):
         self._update_task_time = update_time  # How many seconds to wait between update runs
         self.logger = logging.getLogger('arvados.arv_put')
 
+        if not self.use_cache and self.resume:
+            raise ArgumentError('resume cannot be True when use_cache is False')
+
         # Load cached data if any and if needed
         self._setup_state(update_collection)
 
@@ -348,29 +367,32 @@ class ArvPutUploadJob(object):
                 if path == '-':
                     self._write_stdin(self.filename or 'stdin')
                 elif os.path.isdir(path):
-                    if path == '.' or path == './' or os.path.dirname(path) == '':
-                        dirname = ''
-                    else:
-                        dirname = os.path.dirname(path) + '/'
+                    # Use absolute paths in the cache index so the CWD doesn't
+                    # interfere with the caching logic.
+                    prefixdir = path = os.path.abspath(path)
+                    if prefixdir != '/':
+                        prefixdir += '/'
                     for root, dirs, files in os.walk(path):
                         # Make os.walk()'s dir traversing order deterministic
                         dirs.sort()
                         files.sort()
                         for f in files:
                             self._write_file(os.path.join(root, f),
-                                             os.path.join(root[len(dirname):], f))
+                                             os.path.join(root[len(prefixdir):], f))
                 else:
-                    self._write_file(path, self.filename or os.path.basename(path))
+                    self._write_file(os.path.abspath(path),
+                                     self.filename or os.path.basename(path))
         finally:
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
             self._checkpointer.join()
-            # Commit all & one last _update()
-            self.manifest_text()
+            # Commit all pending blocks & one last _update()
+            self._local_collection.manifest_text()
+            self._update(final=True)
+            if self.use_cache:
+                self._cache_file.close()
             if save_collection:
                 self.save_collection()
-            self._update()
-            self._cache_file.close()
             # Correct the final written bytes count
             self.bytes_written -= self.bytes_skipped
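
The prefix stripping above is what makes the cache index CWD-independent: file names are stored relative to the absolute top-level path. The same computation in isolation (directory names are hypothetical):

    import os

    path = os.path.abspath('./data')           # e.g. '/home/user/data'
    prefixdir = path if path == '/' else path + '/'
    root = os.path.join(path, 'sub')           # a directory os.walk() would yield
    # Stored in the collection as 'sub/file.txt', regardless of where
    # arv-put was started from.
    print(os.path.join(root[len(prefixdir):], 'file.txt'))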
 
@@ -425,17 +447,21 @@ class ArvPutUploadJob(object):
         while not self._stop_checkpointer.wait(self._update_task_time):
             self._update()
 
-    def _update(self):
+    def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
         with self._collection_lock:
             self.bytes_written = self._collection_size(self._local_collection)
-            # Update cache, if resume enabled
-            with self._state_lock:
-                # Get the manifest text without comitting pending blocks
-                self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
-            self._save_state()
+            if self.use_cache:
+                # Update cache
+                with self._state_lock:
+                    if final:
+                        self._state['manifest'] = self._local_collection.manifest_text()
+                    else:
+                        # Get the manifest text without committing pending blocks
+                        self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+                self._save_state()
         # Call the reporter, if any
         self.report_progress()
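
The periodic _update() calls come from the checkpointer thread that run() stops through an event; only committed blocks are safe to reference mid-upload (pending ones have no permanent locator yet), hence only_committed=True until the final pass. The stoppable-loop pattern in isolation:

    import threading, time

    stop = threading.Event()

    def checkpointer(interval=1.0):
        # Event.wait() returns False on timeout and True once set,
        # so the loop exits promptly when stop is requested.
        while not stop.wait(interval):
            print('checkpoint at', time.time())

    t = threading.Thread(target=checkpointer)
    t.start()
    time.sleep(3.5)
    stop.set()
    t.join()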
 
@@ -513,14 +539,10 @@ class ArvPutUploadJob(object):
                 output.close(flush=False)
 
     def _write(self, source_fd, output):
-        first_read = True
         while True:
             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-            # Allow an empty file to be written
-            if not data and not first_read:
+            if not data:
                 break
-            if first_read:
-                first_read = False
             output.write(data)
 
     def _my_collection(self):
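
With empty files now created at open() time in the collection, the copy loop reduces to the canonical read-until-EOF pattern; a self-contained equivalent using in-memory streams (the block size stands in for arvados.config.KEEP_BLOCK_SIZE):

    import io

    BLOCK_SIZE = 64 * 1024 * 1024

    def copy_stream(source_fd, output):
        while True:
            data = source_fd.read(BLOCK_SIZE)
            if not data:  # EOF; an empty source writes nothing but still exists
                break
            output.write(data)

    src, dst = io.BytesIO(b'hello'), io.BytesIO()
    copy_stream(src, dst)
    assert dst.getvalue() == b'hello'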
@@ -543,30 +565,35 @@ class ArvPutUploadJob(object):
             # Collection locator provided, but unknown format
             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 
-        # Set up cache file name from input paths.
-        md5 = hashlib.md5()
-        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
-        realpaths = sorted(os.path.realpath(path) for path in self.paths)
-        md5.update('\0'.join(realpaths))
-        if self.filename:
-            md5.update(self.filename)
-        cache_filename = md5.hexdigest()
-        self._cache_file = open(os.path.join(
-            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-            cache_filename), 'a+')
-        self._cache_filename = self._cache_file.name
-        self._lock_file(self._cache_file)
-        self._cache_file.seek(0)
+        if self.use_cache:
+            # Set up cache file name from input paths.
+            md5 = hashlib.md5()
+            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+            realpaths = sorted(os.path.realpath(path) for path in self.paths)
+            md5.update('\0'.join(realpaths))
+            if self.filename:
+                md5.update(self.filename)
+            cache_filename = md5.hexdigest()
+            self._cache_file = open(os.path.join(
+                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+                cache_filename), 'a+')
+            self._cache_filename = self._cache_file.name
+            self._lock_file(self._cache_file)
+            self._cache_file.seek(0)
+
         with self._state_lock:
-            try:
-                self._state = json.load(self._cache_file)
-                if not set(['manifest', 'files']).issubset(set(self._state.keys())):
-                    # Cache at least partially incomplete, set up new cache
+            if self.use_cache:
+                try:
+                    self._state = json.load(self._cache_file)
+                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                        # Cache at least partially incomplete, set up new cache
+                        self._state = copy.deepcopy(self.EMPTY_STATE)
+                except ValueError:
+                    # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
-            except ValueError:
-                # Cache file empty, set up new cache
+            else:
+                # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
-
             # Load the previous manifest so we can check if files were modified remotely.
             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
         # Load how many bytes were uploaded on previous run
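
The cache file name is deterministic: an MD5 over the API host plus the sorted real paths (and the optional --filename), so repeating the same invocation finds the same cache entry. Recomputed standalone, with explicit byte encoding for Python 3 (the code above is Python 2):

    import hashlib
    import os

    def cache_filename(paths, api_host='!nohost', filename=None):
        md5 = hashlib.md5()
        md5.update(api_host.encode())
        realpaths = sorted(os.path.realpath(p) for p in paths)
        md5.update('\0'.join(realpaths).encode())
        if filename:
            md5.update(filename.encode())
        return md5.hexdigest()

    print(cache_filename(['./data'], 'zzzzz.arvadosapi.com'))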
@@ -585,7 +612,7 @@ class ArvPutUploadJob(object):
         """
         try:
             with self._state_lock:
-                state = self._state
+                state = copy.deepcopy(self._state)
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
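
The switch to copy.deepcopy matters because serialization happens outside the brief lock window while the checkpointer thread keeps mutating _state; the save itself is the usual write-to-temp-then-rename idiom. A reduced sketch of both ideas (the file locking done by the real _save_state is omitted):

    import copy, json, os, tempfile, threading

    state_lock = threading.Lock()
    state = {'manifest': '', 'files': {}}

    def save_state(cache_filename):
        with state_lock:
            snapshot = copy.deepcopy(state)  # later mutations can't corrupt the dump
        fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(cache_filename) or '.')
        with os.fdopen(fd, 'w') as f:
            json.dump(snapshot, f)
        os.rename(tmp_name, cache_filename)  # atomic replacement on POSIX

    save_state('cache.json')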
@@ -738,6 +765,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
+                                 use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
                                  bytes_expected = bytes_expected,
@@ -750,7 +778,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     except ResumeCacheConflict:
         print >>stderr, "\n".join([
             "arv-put: Another process is already uploading this data.",
-            "         Use --no-resume if this is really what you want."])
+            "         Use --no-cache if this is really what you want."])
         sys.exit(1)
     except CollectionUpdateError as error:
         print >>stderr, "\n".join([
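
The updated hint points at --no-cache because the conflict comes from the cache file's exclusive lock, not from resuming as such. A hedged sketch of the underlying idiom (the exact _lock_file body is not part of this diff):

    import fcntl

    f = open('/tmp/arv-put-cache', 'a+')
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError:
        # Another arv-put holds the lock on the same cache entry;
        # --no-cache bypasses the cache file and therefore the lock.
        print('cache is locked by another process')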