10383: Merge branch 'master' into 10383-arv-put-incremental-upload
author Lucas Di Pentima <lucas@curoverse.com>
Mon, 21 Nov 2016 22:28:14 +0000 (19:28 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Mon, 21 Nov 2016 22:28:14 +0000 (19:28 -0300)
sdk/cli/test/test_arv-keep-put.rb
sdk/python/arvados/collection.py
sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_put.py
sdk/python/tests/test_collections.py

index fefbc2729875e70cb890f69d56fe1d7f1c614b8d..8769cde618acd1664ddb7a42a55d8aa4606f2874 100644 (file)
@@ -40,7 +40,7 @@ class TestArvKeepPut < Minitest::Test
 
   def test_raw_file
     out, err = capture_subprocess_io do
-      assert arv_put('--raw', './tmp/foo')
+      assert arv_put('--no-resume', '--raw', './tmp/foo')
     end
     $stderr.write err
     assert_match '', err
@@ -87,7 +87,7 @@ class TestArvKeepPut < Minitest::Test
 
   def test_as_stream
     out, err = capture_subprocess_io do
-      assert arv_put('--as-stream', './tmp/foo')
+      assert arv_put('--no-resume', '--as-stream', './tmp/foo')
     end
     $stderr.write err
     assert_match '', err
@@ -104,7 +104,7 @@ class TestArvKeepPut < Minitest::Test
 
   def test_batch_progress
     out, err = capture_subprocess_io do
-      assert arv_put('--manifest', '--batch-progress', './tmp/foo')
+      assert arv_put('--no-resume', '--manifest', '--batch-progress', './tmp/foo')
     end
     assert_match /: 0 written 3 total/, err
     assert_match /: 3 written 3 total/, err
index 27aad033ae55523de4fd14ae7bf9cf8a7d2b654f..812438e2ccf493507d06ca9468e3c9418f9e0e69 100644 (file)
@@ -565,16 +565,23 @@ class RichCollectionBase(CollectionBase):
     def find(self, path):
         """Recursively search the specified file path.
 
-        May return either a Collection or ArvadosFile.  Return None if not
+        May return either a Collection or ArvadosFile. Return None if not
         found.
+        If the path is invalid (e.g. it starts with '/'), an IOError is
+        raised.
 
         """
         if not path:
             raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
+        if pathcomponents[0] == '':
+            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
         item = self._items.get(pathcomponents[0])
-        if len(pathcomponents) == 1:
+        if item is None:
+            return None
+        elif len(pathcomponents) == 1:
             return item
         else:
             if isinstance(item, RichCollectionBase):
@@ -829,7 +836,7 @@ class RichCollectionBase(CollectionBase):
         if target_dir is None:
             raise IOError(errno.ENOENT, "Target directory not found", target_name)
 
-        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
             target_dir = target_dir[target_name]
             target_name = sourcecomponents[-1]
 
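
The find() changes above make lookups of paths that simply do not exist return
None, while malformed paths (an empty string, or one starting with '/') still
raise. Below is a minimal sketch of the resulting behavior, using an in-memory
Collection built from a literal manifest; the block locator and file names are
illustrative only.

    import arvados.collection

    c = arvados.collection.Collection(
        '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n'
        './foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')

    assert c.find('count1.txt') is not None        # ArvadosFile
    assert c.find('foo') is not None               # Subcollection
    assert c.find('foo/missing.txt') is None       # missing file -> None
    assert c.find('missing/count2.txt') is None    # missing subdirectory -> None

    try:
        c.find('/count1.txt')                      # leading '/' raises IOError (ENOTDIR)
    except IOError as error:
        print('invalid path: %s' % error)
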
index 34cef6725500e6cb5cd12ec317aa1be8eeb4bd80..c7de888bca88b03f6eeb0e8c504c94343a520069 100644 (file)
@@ -7,21 +7,22 @@ import argparse
 import arvados
 import arvados.collection
 import base64
+import copy
 import datetime
 import errno
 import fcntl
 import hashlib
 import json
+import logging
 import os
 import pwd
-import time
+import re
 import signal
 import socket
 import sys
 import tempfile
 import threading
-import copy
-import logging
+import time
 from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
@@ -96,6 +97,12 @@ separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
+upload_opts.add_argument('--update-collection', type=str, default=None,
+                         dest='update_collection', metavar="UUID", help="""
+Update an existing collection identified by the given Arvados collection
+UUID. All new and changed local files will be uploaded.
+""")
+
 upload_opts.add_argument('--use-filename', type=str, default=None,
                          dest='filename', help="""
 Synonym for --filename.
@@ -163,6 +170,16 @@ _group.add_argument('--no-resume', action='store_false', dest='resume',
 Do not continue interrupted uploads from cached state.
 """)
 
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
+                    help="""
+Save upload state in a cache file for resuming (default).
+""")
+_group.add_argument('--no-cache', action='store_false', dest='use_cache',
+                    help="""
+Do not save upload state in a cache file for resuming.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@ -187,13 +204,27 @@ def parse_arguments(arguments):
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
+    # Turn off --resume (default) if --no-cache is used.
+    if not args.use_cache:
+        args.resume = False
+
     if args.paths == ['-']:
+        if args.update_collection:
+            arg_parser.error("""
+    --update-collection cannot be used when reading from stdin.
+    """)
         args.resume = False
+        args.use_cache = False
         if not args.filename:
             args.filename = 'stdin'
 
     return args
 
+
+class CollectionUpdateError(Exception):
+    pass
+
+
 class ResumeCacheConflict(Exception):
     pass
 
@@ -287,12 +318,14 @@ class ArvPutUploadJob(object):
         'files' : {} # Previous run file list: {path : {size, mtime}}
     }
 
-    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
-                 name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, replication_desired=None,
-                 filename=None, update_time=1.0):
+    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
+                 bytes_expected=None, name=None, owner_uuid=None,
+                 ensure_unique_name=False, num_retries=None, replication_desired=None,
+                 filename=None, update_time=1.0, update_collection=None):
         self.paths = paths
         self.resume = resume
+        self.use_cache = use_cache
+        self.update = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
@@ -307,16 +340,22 @@ class ArvPutUploadJob(object):
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
         self._cache_file = None
-        self._collection = None
         self._collection_lock = threading.Lock()
+        self._remote_collection = None # Collection being updated (if asked)
+        self._local_collection = None # Collection from previous run manifest
+        self._file_paths = [] # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time  # How many seconds wait between update runs
         self.logger = logging.getLogger('arvados.arv_put')
+
+        if not self.use_cache and self.resume:
+            raise arvados.errors.ArgumentError('resume cannot be True when use_cache is False')
+
         # Load cached data if any and if needed
-        self._setup_state()
+        self._setup_state(update_collection)
 
-    def start(self):
+    def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
@@ -328,24 +367,52 @@ class ArvPutUploadJob(object):
                 if path == '-':
                     self._write_stdin(self.filename or 'stdin')
                 elif os.path.isdir(path):
-                    self._write_directory_tree(path)
+                    # Use absolute paths in the cache index so the CWD doesn't
+                    # interfere with the caching logic.
+                    prefixdir = path = os.path.abspath(path)
+                    if prefixdir != '/':
+                        prefixdir += '/'
+                    for root, dirs, files in os.walk(path):
+                        # Make os.walk()'s dir traversing order deterministic
+                        dirs.sort()
+                        files.sort()
+                        for f in files:
+                            self._write_file(os.path.join(root, f),
+                                             os.path.join(root[len(prefixdir):], f))
                 else:
-                    self._write_file(path, self.filename or os.path.basename(path))
+                    self._write_file(os.path.abspath(path),
+                                     self.filename or os.path.basename(path))
         finally:
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
             self._checkpointer.join()
-            # Commit all & one last _update()
-            self.manifest_text()
-            self._update()
-            if self.resume:
+            # Commit all pending blocks & one last _update()
+            self._local_collection.manifest_text()
+            self._update(final=True)
+            if self.use_cache:
                 self._cache_file.close()
-                # Correct the final written bytes count
-                self.bytes_written -= self.bytes_skipped
+            if save_collection:
+                self.save_collection()
+            # Correct the final written bytes count
+            self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
-        with self._collection_lock:
-            self._my_collection().save_new(
+        if self.update:
+            # Check if files should be updated on the remote collection.
+            for fp in self._file_paths:
+                remote_file = self._remote_collection.find(fp)
+                if not remote_file:
+                    # File doesn't exist in the remote collection, copy it.
+                    self._remote_collection.copy(fp, fp, self._local_collection)
+                elif remote_file != self._local_collection.find(fp):
+                    # A different file exists in the remote collection, overwrite it.
+                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
+                else:
+                    # The file already exists in the remote collection, skip it.
+                    pass
+            self._remote_collection.save(num_retries=self.num_retries)
+        else:
+            self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
                 ensure_unique_name=self.ensure_unique_name,
                 num_retries=self.num_retries)
@@ -380,17 +447,20 @@ class ArvPutUploadJob(object):
         while not self._stop_checkpointer.wait(self._update_task_time):
             self._update()
 
-    def _update(self):
+    def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
         with self._collection_lock:
-            self.bytes_written = self._collection_size(self._my_collection())
-            # Update cache, if resume enabled
-            if self.resume:
+            self.bytes_written = self._collection_size(self._local_collection)
+            if self.use_cache:
+                # Update cache
                 with self._state_lock:
-                    # Get the manifest text without comitting pending blocks
-                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+                    if final:
+                        self._state['manifest'] = self._local_collection.manifest_text()
+                    else:
+                        # Get the manifest text without committing pending blocks
+                        self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
                 self._save_state()
         # Call the reporter, if any
         self.report_progress()
@@ -399,114 +469,104 @@ class ArvPutUploadJob(object):
         if self.reporter is not None:
             self.reporter(self.bytes_written, self.bytes_expected)
 
-    def _write_directory_tree(self, path, stream_name="."):
-        # TODO: Check what happens when multiple directories are passed as
-        # arguments.
-        # If the code below is uncommented, integration test
-        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
-        # fails, I suppose it is because the manifest_uuid changes because
-        # of the dir addition to stream_name.
-
-        # if stream_name == '.':
-        #     stream_name = os.path.join('.', os.path.basename(path))
-        for item in os.listdir(path):
-            if os.path.isdir(os.path.join(path, item)):
-                self._write_directory_tree(os.path.join(path, item),
-                                os.path.join(stream_name, item))
-            else:
-                self._write_file(os.path.join(path, item),
-                                os.path.join(stream_name, item))
-
     def _write_stdin(self, filename):
-        with self._collection_lock:
-            output = self._my_collection().open(filename, 'w')
+        output = self._local_collection.open(filename, 'w')
         self._write(sys.stdin, output)
         output.close()
 
     def _write_file(self, source, filename):
         resume_offset = 0
-        if self.resume:
-            # Check if file was already uploaded (at least partially)
-            with self._collection_lock:
-                try:
-                    file_in_collection = self._my_collection().find(filename)
-                except IOError:
-                    # Not found
-                    file_in_collection = None
+        should_upload = False
+        new_file_in_cache = False
+
+        # Record file path for updating the remote collection before exiting
+        self._file_paths.append(filename)
+
+        with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
             # repeated run.
             if source not in self._state['files']:
+                self._state['files'][source] = {
+                    'mtime': os.path.getmtime(source),
+                    'size' : os.path.getsize(source)
+                }
+                new_file_in_cache = True
+            cached_file_data = self._state['files'][source]
+
+        # Check if file was already uploaded (at least partially)
+        file_in_local_collection = self._local_collection.find(filename)
+
+        # If not resuming, upload the full file.
+        if not self.resume:
+            should_upload = True
+        # New file detected since the last run, upload it.
+        elif new_file_in_cache:
+            should_upload = True
+        # Local file didn't change from last run.
+        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+            if not file_in_local_collection:
+                # File not uploaded yet, upload it completely
+                should_upload = True
+            elif cached_file_data['size'] == file_in_local_collection.size():
+                # File already there, skip it.
+                self.bytes_skipped += cached_file_data['size']
+            elif cached_file_data['size'] > file_in_local_collection.size():
+                # File partially uploaded, resume!
+                resume_offset = file_in_local_collection.size()
+                should_upload = True
+            else:
+                # Inconsistent cache, re-upload the file
+                should_upload = True
+                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+        # Local file differs from cached data, re-upload it.
+        else:
+            should_upload = True
+
+        if should_upload:
+            with open(source, 'r') as source_fd:
                 with self._state_lock:
-                    self._state['files'][source] = {
-                        'mtime': os.path.getmtime(source),
-                        'size' : os.path.getsize(source)
-                    }
-            with self._state_lock:
-                cached_file_data = self._state['files'][source]
-            # See if this file was already uploaded at least partially
-            if file_in_collection:
-                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
-                    if cached_file_data['size'] == file_in_collection.size():
-                        # File already there, skip it.
-                        self.bytes_skipped += cached_file_data['size']
-                        return
-                    elif cached_file_data['size'] > file_in_collection.size():
-                        # File partially uploaded, resume!
-                        resume_offset = file_in_collection.size()
-                    else:
-                        # Inconsistent cache, re-upload the file
-                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
+                    self._state['files'][source]['size'] = os.path.getsize(source)
+                if resume_offset > 0:
+                    # Start upload where we left off
+                    output = self._local_collection.open(filename, 'a')
+                    source_fd.seek(resume_offset)
+                    self.bytes_skipped += resume_offset
                 else:
-                    # Local file differs from cached data, re-upload it
-                    pass
-        with open(source, 'r') as source_fd:
-            if resume_offset > 0:
-                # Start upload where we left off
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'a')
-                source_fd.seek(resume_offset)
-                self.bytes_skipped += resume_offset
-            else:
-                # Start from scratch
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'w')
-            self._write(source_fd, output)
-            output.close(flush=False)
+                    # Start from scratch
+                    output = self._local_collection.open(filename, 'w')
+                self._write(source_fd, output)
+                output.close(flush=False)
 
     def _write(self, source_fd, output):
-        first_read = True
         while True:
             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-            # Allow an empty file to be written
-            if not data and not first_read:
+            if not data:
                 break
-            if first_read:
-                first_read = False
             output.write(data)
 
     def _my_collection(self):
-        """
-        Create a new collection if none cached. Load it from cache otherwise.
-        """
-        if self._collection is None:
-            with self._state_lock:
-                manifest = self._state['manifest']
-            if self.resume and manifest is not None:
-                # Create collection from saved state
-                self._collection = arvados.collection.Collection(
-                    manifest,
-                    replication_desired=self.replication_desired)
-            else:
-                # Create new collection
-                self._collection = arvados.collection.Collection(
-                    replication_desired=self.replication_desired)
-        return self._collection
+        return self._remote_collection if self.update else self._local_collection
 
-    def _setup_state(self):
+    def _setup_state(self, update_collection):
         """
         Create a new cache file or load a previously existing one.
         """
-        if self.resume:
+        # Load an already existing collection for update
+        if update_collection and re.match(arvados.util.collection_uuid_pattern,
+                                          update_collection):
+            try:
+                self._remote_collection = arvados.collection.Collection(update_collection)
+            except arvados.errors.ApiError as error:
+                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
+            else:
+                self.update = True
+        elif update_collection:
+            # Collection locator provided, but unknown format
+            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+
+        if self.use_cache:
+            # Set up cache file name from input paths.
             md5 = hashlib.md5()
             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
             realpaths = sorted(os.path.realpath(path) for path in self.paths)
@@ -520,7 +580,9 @@ class ArvPutUploadJob(object):
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
             self._cache_file.seek(0)
-            with self._state_lock:
+
+        with self._state_lock:
+            if self.use_cache:
                 try:
                     self._state = json.load(self._cache_file)
                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
@@ -529,13 +591,14 @@ class ArvPutUploadJob(object):
                 except ValueError:
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
-            # Load how many bytes were uploaded on previous run
-            with self._collection_lock:
-                self.bytes_written = self._collection_size(self._my_collection())
-        # No resume required
-        else:
-            with self._state_lock:
+            else:
+                # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
+            # Load the previous manifest so we can check if files were modified remotely.
+            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
+        # Load how many bytes were uploaded on previous run
+        with self._collection_lock:
+            self.bytes_written = self._collection_size(self._local_collection)
 
     def _lock_file(self, fileobj):
         try:
@@ -549,7 +612,7 @@ class ArvPutUploadJob(object):
         """
         try:
             with self._state_lock:
-                state = self._state
+                state = copy.deepcopy(self._state)
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
@@ -569,24 +632,16 @@ class ArvPutUploadJob(object):
             self._cache_file = new_cache
 
     def collection_name(self):
-        with self._collection_lock:
-            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
-        return name
+        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
     def manifest_locator(self):
-        with self._collection_lock:
-            locator = self._my_collection().manifest_locator()
-        return locator
+        return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
-        with self._collection_lock:
-            datahash = self._my_collection().portable_data_hash()
-        return datahash
+        return self._my_collection().portable_data_hash()
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
-        with self._collection_lock:
-            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
-        return manifest
+        return self._my_collection().manifest_text(stream_name, strip, normalize)
 
     def _datablocks_on_item(self, item):
         """
@@ -706,9 +761,11 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         reporter = None
 
     bytes_expected = expected_bytes_for(args.paths)
+
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
+                                 use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
                                  bytes_expected = bytes_expected,
@@ -716,11 +773,16 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  replication_desired = args.replication,
                                  name = collection_name,
                                  owner_uuid = project_uuid,
-                                 ensure_unique_name = True)
+                                 ensure_unique_name = True,
+                                 update_collection = args.update_collection)
     except ResumeCacheConflict:
         print >>stderr, "\n".join([
             "arv-put: Another process is already uploading this data.",
-            "         Use --no-resume if this is really what you want."])
+            "         Use --no-cache if this is really what you want."])
+        sys.exit(1)
+    except CollectionUpdateError as error:
+        print >>stderr, "\n".join([
+            "arv-put: %s" % str(error)])
         sys.exit(1)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
@@ -728,14 +790,20 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if args.resume and writer.bytes_written > 0:
+    if not args.update_collection and args.resume and writer.bytes_written > 0:
         print >>stderr, "\n".join([
                 "arv-put: Resuming previous upload from last checkpoint.",
                 "         Use the --no-resume option to start over."])
 
     writer.report_progress()
     output = None
-    writer.start()
+    try:
+        writer.start(save_collection=not(args.stream or args.raw))
+    except arvados.errors.ApiError as error:
+        print >>stderr, "\n".join([
+            "arv-put: %s" % str(error)])
+        sys.exit(1)
+
     if args.progress:  # Print newline to split stderr from stdout for humans.
         print >>stderr
 
@@ -748,8 +816,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         output = ','.join(writer.data_locators())
     else:
         try:
-            writer.save_collection()
-            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+            if args.update_collection:
+                print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
+            else:
+                print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
@@ -775,7 +845,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         sys.exit(status)
 
     # Success!
-    writer.destroy_cache()
     return output
 
 
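
Taken together, the put.py changes above replace the recursive
_write_directory_tree() with a sorted os.walk(), keep the locally cached
collection (_local_collection) separate from the remote collection being
updated (_remote_collection), and move the save step into start(). Below is a
minimal sketch of driving ArvPutUploadJob directly, mirroring how main() wires
it up; the paths, reporter, and collection name are illustrative, and a
configured Arvados API host and token are assumed.

    import arvados.commands.put as arv_put

    def reporter(bytes_written, bytes_expected):
        # bytes_expected may be None when it cannot be computed up front.
        print('uploaded %d of %s bytes' % (bytes_written, bytes_expected))

    writer = arv_put.ArvPutUploadJob(
        paths=['/tmp/data'],            # directory (or files) to upload
        resume=True,                    # continue from the cache file, if any
        use_cache=True,                 # keep a cache file for later resuming
        update_collection=None,         # or an existing collection UUID to update
        reporter=reporter,
        replication_desired=2,
        name='incremental upload example',
        ensure_unique_name=True)

    writer.start(save_collection=True)  # uploads, then save_new() or save()
    print('saved %s (%s)' % (writer.manifest_locator(),
                             writer.portable_data_hash()))
    writer.destroy_cache()
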
index 7a0120c02814d00b27e81dd41fbb50e51ef2855c..e4b2724987841da7351fed0f8ea2827005e04cd2 100644 (file)
@@ -240,6 +240,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
 
 class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                           ArvadosBaseTestCase):
+
     def setUp(self):
         super(ArvPutUploadJobTest, self).setUp()
         run_test_server.authorize_with('active')
@@ -270,7 +271,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
 
     def test_writer_works_without_cache(self):
         cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
-        cwriter.start()
+        cwriter.start(save_collection=False)
         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
     def test_writer_works_with_cache(self):
@@ -278,11 +279,11 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             f.write('foo')
             f.flush()
             cwriter = arv_put.ArvPutUploadJob([f.name])
-            cwriter.start()
+            cwriter.start(save_collection=False)
             self.assertEqual(3, cwriter.bytes_written)
             # Don't destroy the cache, and start another upload
             cwriter_new = arv_put.ArvPutUploadJob([f.name])
-            cwriter_new.start()
+            cwriter_new.start(save_collection=False)
             cwriter_new.destroy_cache()
             self.assertEqual(0, cwriter_new.bytes_written)
 
@@ -300,13 +301,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                 progression, reporter = self.make_progress_tester()
                 cwriter = arv_put.ArvPutUploadJob([f.name],
                     reporter=reporter, bytes_expected=expect_count)
-                cwriter.start()
+                cwriter.start(save_collection=False)
                 cwriter.destroy_cache()
                 self.assertIn((3, expect_count), progression)
 
     def test_writer_upload_directory(self):
         cwriter = arv_put.ArvPutUploadJob([self.tempdir])
-        cwriter.start()
+        cwriter.start(save_collection=False)
         cwriter.destroy_cache()
         self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
 
@@ -324,13 +325,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1)
             with self.assertRaises(SystemExit):
-                writer.start()
+                writer.start(save_collection=False)
                 self.assertLess(writer.bytes_written,
                                 os.path.getsize(self.large_file_name))
         # Retry the upload
         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                           replication_desired=1)
-        writer2.start()
+        writer2.start(save_collection=False)
         self.assertEqual(writer.bytes_written + writer2.bytes_written,
                          os.path.getsize(self.large_file_name))
         writer2.destroy_cache()
@@ -624,6 +625,21 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         self.assertEqual(1, len(collection_list))
         return collection_list[0]
 
+    def test_put_collection_with_later_update(self):
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        # Add a new file to the directory
+        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
+            f.write('The quick brown fox jumped over the lazy dog')
+        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
+        self.assertEqual(col['uuid'], updated_col['uuid'])
+        # Get the manifest and check that the new file is being included
+        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
+        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
+
     def test_put_collection_with_high_redundancy(self):
         # Write empty data: we're not testing CollectionWriter, just
         # making sure collections.create tells the API server what our
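
The integration test above uploads a directory, adds a 44-byte file2 locally,
and re-runs arv-put with --update-collection so only the new file is sent.
Below is a minimal sketch of verifying such an update from the Python SDK; it
assumes ARVADOS_API_HOST and ARVADOS_API_TOKEN are configured, and the UUID is
a placeholder.

    import arvados

    api = arvados.api('v1')
    col = api.collections().get(uuid='zzzzz-4zz18-xxxxxxxxxxxxxxx').execute()
    # The newly added 44-byte file2 should now appear in the manifest.
    assert ':44:file2' in col['manifest_text']
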
index fc30a242eba1bfc665a05747de66f999869ef8a4..0e3d5e13f135c84f2fde2f741bd554b0ccdf3a85 100644 (file)
@@ -861,6 +861,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
             c.find("/.")
         with self.assertRaises(arvados.errors.ArgumentError):
             c.find("")
+        self.assertIs(c.find("./nonexistent.txt"), None)
+        self.assertIs(c.find("./nonexistentsubdir/nonexistent.txt"), None)
 
     def test_remove_in_subdir(self):
         c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')