10383: Unused exception removed.
[arvados.git] / sdk / python / arvados / commands / put.py
index 34cef6725500e6cb5cd12ec317aa1be8eeb4bd80..b88fdc69c113ceb0e0fde13613f6a5c1b2400f53 100644 (file)
@@ -7,21 +7,22 @@ import argparse
 import arvados
 import arvados.collection
 import base64
+import copy
 import datetime
 import errno
 import fcntl
 import hashlib
 import json
+import logging
 import os
 import pwd
-import time
+import re
 import signal
 import socket
 import sys
 import tempfile
 import threading
-import copy
-import logging
+import time
 from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
@@ -96,6 +97,12 @@ separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
+upload_opts.add_argument('--update-collection', type=str, default=None,
+                         dest='update_collection', metavar="UUID", help="""
+Update an existing collection identified by the given Arvados collection
+UUID. All new local files will be uploaded.
+""")
+
 upload_opts.add_argument('--use-filename', type=str, default=None,
                          dest='filename', help="""
 Synonym for --filename.
@@ -188,12 +195,21 @@ def parse_arguments(arguments):
         args.progress = True
 
     if args.paths == ['-']:
+        if args.update_collection:
+            arg_parser.error("""
+    --update-collection cannot be used when reading from stdin.
+    """)
         args.resume = False
         if not args.filename:
             args.filename = 'stdin'
 
     return args
 
+
+class CollectionUpdateError(Exception):
+    pass
+
+
 class ResumeCacheConflict(Exception):
     pass
 
@@ -290,9 +306,10 @@ class ArvPutUploadJob(object):
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                  name=None, owner_uuid=None, ensure_unique_name=False,
                  num_retries=None, replication_desired=None,
-                 filename=None, update_time=1.0):
+                 filename=None, update_time=1.0, update_collection=None):
         self.paths = paths
         self.resume = resume
+        self.update = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
@@ -307,16 +324,19 @@ class ArvPutUploadJob(object):
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
         self._cache_file = None
-        self._collection = None
         self._collection_lock = threading.Lock()
+        self._remote_collection = None # Collection being updated (if asked)
+        self._local_collection = None # Collection from previous run manifest
+        self._file_paths = [] # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time  # How many seconds wait between update runs
         self.logger = logging.getLogger('arvados.arv_put')
+
         # Load cached data if any and if needed
-        self._setup_state()
+        self._setup_state(update_collection)
 
-    def start(self):
+    def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
@@ -328,7 +348,17 @@ class ArvPutUploadJob(object):
                 if path == '-':
                     self._write_stdin(self.filename or 'stdin')
                 elif os.path.isdir(path):
-                    self._write_directory_tree(path)
+                    if path == '.' or path == './' or os.path.dirname(path) == '':
+                        dirname = ''
+                    else:
+                        dirname = os.path.dirname(path) + '/'
+                    for root, dirs, files in os.walk(path):
+                        # Make os.walk()'s dir traversing order deterministic
+                        dirs.sort()
+                        files.sort()
+                        for f in files:
+                            self._write_file(os.path.join(root, f),
+                                             os.path.join(root[len(dirname):], f))
                 else:
                     self._write_file(path, self.filename or os.path.basename(path))
         finally:
@@ -337,15 +367,30 @@ class ArvPutUploadJob(object):
             self._checkpointer.join()
             # Commit all & one last _update()
             self.manifest_text()
+            if save_collection:
+                self.save_collection()
             self._update()
-            if self.resume:
-                self._cache_file.close()
-                # Correct the final written bytes count
-                self.bytes_written -= self.bytes_skipped
+            self._cache_file.close()
+            # Correct the final written bytes count
+            self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
-        with self._collection_lock:
-            self._my_collection().save_new(
+        if self.update:
+            # Check if files should be updated on the remote collection.
+            for fp in self._file_paths:
+                remote_file = self._remote_collection.find(fp)
+                if not remote_file:
+                    # File doesn't exist on remote collection, copy it.
+                    self._remote_collection.copy(fp, fp, self._local_collection)
+                elif remote_file != self._local_collection.find(fp):
+                    # A different file exists on remote collection, overwrite it.
+                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
+                else:
+                    # The file already exists on remote collection, skip it.
+                    pass
+            self._remote_collection.save(num_retries=self.num_retries)
+        else:
+            self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
                 ensure_unique_name=self.ensure_unique_name,
                 num_retries=self.num_retries)
@@ -385,13 +430,12 @@ class ArvPutUploadJob(object):
         Update cached manifest text and report progress.
         """
         with self._collection_lock:
-            self.bytes_written = self._collection_size(self._my_collection())
+            self.bytes_written = self._collection_size(self._local_collection)
             # Update cache, if resume enabled
-            if self.resume:
-                with self._state_lock:
-                    # Get the manifest text without comitting pending blocks
-                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
-                self._save_state()
+            with self._state_lock:
+                # Get the manifest text without committing pending blocks
+                self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+            self._save_state()
         # Call the reporter, if any
         self.report_progress()
 
@@ -399,79 +443,74 @@ class ArvPutUploadJob(object):
         if self.reporter is not None:
             self.reporter(self.bytes_written, self.bytes_expected)
 
-    def _write_directory_tree(self, path, stream_name="."):
-        # TODO: Check what happens when multiple directories are passed as
-        # arguments.
-        # If the code below is uncommented, integration test
-        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
-        # fails, I suppose it is because the manifest_uuid changes because
-        # of the dir addition to stream_name.
-
-        # if stream_name == '.':
-        #     stream_name = os.path.join('.', os.path.basename(path))
-        for item in os.listdir(path):
-            if os.path.isdir(os.path.join(path, item)):
-                self._write_directory_tree(os.path.join(path, item),
-                                os.path.join(stream_name, item))
-            else:
-                self._write_file(os.path.join(path, item),
-                                os.path.join(stream_name, item))
-
     def _write_stdin(self, filename):
-        with self._collection_lock:
-            output = self._my_collection().open(filename, 'w')
+        output = self._local_collection.open(filename, 'w')
         self._write(sys.stdin, output)
         output.close()
 
     def _write_file(self, source, filename):
         resume_offset = 0
-        if self.resume:
-            # Check if file was already uploaded (at least partially)
-            with self._collection_lock:
-                try:
-                    file_in_collection = self._my_collection().find(filename)
-                except IOError:
-                    # Not found
-                    file_in_collection = None
+        should_upload = False
+        new_file_in_cache = False
+
+        # Record file path for updating the remote collection before exiting
+        self._file_paths.append(filename)
+
+        with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
             # repeated run.
             if source not in self._state['files']:
+                self._state['files'][source] = {
+                    'mtime': os.path.getmtime(source),
+                    'size' : os.path.getsize(source)
+                }
+                new_file_in_cache = True
+            cached_file_data = self._state['files'][source]
+
+        # Check if file was already uploaded (at least partially)
+        file_in_local_collection = self._local_collection.find(filename)
+
+        # If not resuming, upload the full file.
+        if not self.resume:
+            should_upload = True
+        # New file detected from last run, upload it.
+        elif new_file_in_cache:
+            should_upload = True
+        # Local file didn't change from last run.
+        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+            if not file_in_local_collection:
+                # File not uploaded yet, upload it completely
+                should_upload = True
+            elif cached_file_data['size'] == file_in_local_collection.size():
+                # File already there, skip it.
+                self.bytes_skipped += cached_file_data['size']
+            elif cached_file_data['size'] > file_in_local_collection.size():
+                # File partially uploaded, resume!
+                resume_offset = file_in_local_collection.size()
+                should_upload = True
+            else:
+                # Inconsistent cache, re-upload the file
+                should_upload = True
+                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+        # Local file differs from cached data, re-upload it.
+        else:
+            should_upload = True
+
+        if should_upload:
+            with open(source, 'r') as source_fd:
                 with self._state_lock:
-                    self._state['files'][source] = {
-                        'mtime': os.path.getmtime(source),
-                        'size' : os.path.getsize(source)
-                    }
-            with self._state_lock:
-                cached_file_data = self._state['files'][source]
-            # See if this file was already uploaded at least partially
-            if file_in_collection:
-                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
-                    if cached_file_data['size'] == file_in_collection.size():
-                        # File already there, skip it.
-                        self.bytes_skipped += cached_file_data['size']
-                        return
-                    elif cached_file_data['size'] > file_in_collection.size():
-                        # File partially uploaded, resume!
-                        resume_offset = file_in_collection.size()
-                    else:
-                        # Inconsistent cache, re-upload the file
-                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
+                    self._state['files'][source]['size'] = os.path.getsize(source)
+                if resume_offset > 0:
+                    # Start upload where we left off
+                    output = self._local_collection.open(filename, 'a')
+                    source_fd.seek(resume_offset)
+                    self.bytes_skipped += resume_offset
                 else:
-                    # Local file differs from cached data, re-upload it
-                    pass
-        with open(source, 'r') as source_fd:
-            if resume_offset > 0:
-                # Start upload where we left off
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'a')
-                source_fd.seek(resume_offset)
-                self.bytes_skipped += resume_offset
-            else:
-                # Start from scratch
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'w')
-            self._write(source_fd, output)
-            output.close(flush=False)
+                    # Start from scratch
+                    output = self._local_collection.open(filename, 'w')
+                self._write(source_fd, output)
+                output.close(flush=False)
 
     def _write(self, source_fd, output):
         first_read = True
@@ -485,58 +524,55 @@ class ArvPutUploadJob(object):
             output.write(data)
 
     def _my_collection(self):
-        """
-        Create a new collection if none cached. Load it from cache otherwise.
-        """
-        if self._collection is None:
-            with self._state_lock:
-                manifest = self._state['manifest']
-            if self.resume and manifest is not None:
-                # Create collection from saved state
-                self._collection = arvados.collection.Collection(
-                    manifest,
-                    replication_desired=self.replication_desired)
-            else:
-                # Create new collection
-                self._collection = arvados.collection.Collection(
-                    replication_desired=self.replication_desired)
-        return self._collection
+        return self._remote_collection if self.update else self._local_collection
 
-    def _setup_state(self):
+    def _setup_state(self, update_collection):
         """
         Create a new cache file or load a previously existing one.
         """
-        if self.resume:
-            md5 = hashlib.md5()
-            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
-            realpaths = sorted(os.path.realpath(path) for path in self.paths)
-            md5.update('\0'.join(realpaths))
-            if self.filename:
-                md5.update(self.filename)
-            cache_filename = md5.hexdigest()
-            self._cache_file = open(os.path.join(
-                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-                cache_filename), 'a+')
-            self._cache_filename = self._cache_file.name
-            self._lock_file(self._cache_file)
-            self._cache_file.seek(0)
-            with self._state_lock:
-                try:
-                    self._state = json.load(self._cache_file)
-                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
-                        # Cache at least partially incomplete, set up new cache
-                        self._state = copy.deepcopy(self.EMPTY_STATE)
-                except ValueError:
-                    # Cache file empty, set up new cache
+        # Load an already existing collection for update
+        if update_collection and re.match(arvados.util.collection_uuid_pattern,
+                                          update_collection):
+            try:
+                self._remote_collection = arvados.collection.Collection(update_collection)
+            except arvados.errors.ApiError as error:
+                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
+            else:
+                self.update = True
+        elif update_collection:
+            # Collection locator provided, but unknown format
+            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+
+        # Set up cache file name from input paths.
+        md5 = hashlib.md5()
+        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+        realpaths = sorted(os.path.realpath(path) for path in self.paths)
+        md5.update('\0'.join(realpaths))
+        if self.filename:
+            md5.update(self.filename)
+        cache_filename = md5.hexdigest()
+        self._cache_file = open(os.path.join(
+            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+            cache_filename), 'a+')
+        self._cache_filename = self._cache_file.name
+        self._lock_file(self._cache_file)
+        self._cache_file.seek(0)
+        with self._state_lock:
+            try:
+                self._state = json.load(self._cache_file)
+                if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                    # Cache at least partially incomplete, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
-            # Load how many bytes were uploaded on previous run
-            with self._collection_lock:
-                self.bytes_written = self._collection_size(self._my_collection())
-        # No resume required
-        else:
-            with self._state_lock:
+            except ValueError:
+                # Cache file empty, set up new cache
                 self._state = copy.deepcopy(self.EMPTY_STATE)
 
+            # Load the previous manifest so we can check if files were modified remotely.
+            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
+        # Load how many bytes were uploaded on previous run
+        with self._collection_lock:
+            self.bytes_written = self._collection_size(self._local_collection)
+
     def _lock_file(self, fileobj):
         try:
             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
@@ -569,24 +605,16 @@ class ArvPutUploadJob(object):
             self._cache_file = new_cache
 
     def collection_name(self):
-        with self._collection_lock:
-            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
-        return name
+        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
     def manifest_locator(self):
-        with self._collection_lock:
-            locator = self._my_collection().manifest_locator()
-        return locator
+        return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
-        with self._collection_lock:
-            datahash = self._my_collection().portable_data_hash()
-        return datahash
+        return self._my_collection().portable_data_hash()
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
-        with self._collection_lock:
-            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
-        return manifest
+        return self._my_collection().manifest_text(stream_name, strip, normalize)
 
     def _datablocks_on_item(self, item):
         """
@@ -706,6 +734,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         reporter = None
 
     bytes_expected = expected_bytes_for(args.paths)
+
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
@@ -716,26 +745,37 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  replication_desired = args.replication,
                                  name = collection_name,
                                  owner_uuid = project_uuid,
-                                 ensure_unique_name = True)
+                                 ensure_unique_name = True,
+                                 update_collection = args.update_collection)
     except ResumeCacheConflict:
         print >>stderr, "\n".join([
             "arv-put: Another process is already uploading this data.",
             "         Use --no-resume if this is really what you want."])
         sys.exit(1)
+    except CollectionUpdateError as error:
+        print >>stderr, "\n".join([
+            "arv-put: %s" % str(error)])
+        sys.exit(1)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if args.resume and writer.bytes_written > 0:
+    if not args.update_collection and args.resume and writer.bytes_written > 0:
         print >>stderr, "\n".join([
                 "arv-put: Resuming previous upload from last checkpoint.",
                 "         Use the --no-resume option to start over."])
 
     writer.report_progress()
     output = None
-    writer.start()
+    try:
+        writer.start(save_collection=not(args.stream or args.raw))
+    except arvados.errors.ApiError as error:
+        print >>stderr, "\n".join([
+            "arv-put: %s" % str(error)])
+        sys.exit(1)
+
     if args.progress:  # Print newline to split stderr from stdout for humans.
         print >>stderr
 
@@ -748,8 +788,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         output = ','.join(writer.data_locators())
     else:
         try:
-            writer.save_collection()
-            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+            if args.update_collection:
+                print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
+            else:
+                print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
@@ -775,7 +817,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         sys.exit(status)
 
     # Success!
-    writer.destroy_cache()
     return output