10383: Merge branch 'master' into 10383-arv-put-incremental-upload
author Lucas Di Pentima <lucas@curoverse.com>
Fri, 9 Dec 2016 22:30:02 +0000 (19:30 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Fri, 9 Dec 2016 22:30:02 +0000 (19:30 -0300)
sdk/cli/test/test_arv-keep-put.rb
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_put.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_collections.py

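For orientation, the changes below add a --no-cache option and an --update-collection option to arv-put. A rough usage sketch (the collection UUID is a placeholder, and calling main() with an argument list mirrors how the Python tests further down drive it):

    import arvados.commands.put as arv_put

    # Upload a directory without keeping a resume-cache file on disk.
    arv_put.main(['--no-cache', '--no-progress', '/tmp/foo'])

    # Later, upload only the new local files into an existing collection
    # (UUID below is a hypothetical placeholder).
    arv_put.main(['--update-collection', 'zzzzz-4zz18-xxxxxxxxxxxxxxx',
                  '--no-progress', '/tmp/foo'])
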
index fefbc2729875e70cb890f69d56fe1d7f1c614b8d..e6ead25b807e70eb03d293bc9d3a92aad2a78c7b 100644 (file)
@@ -40,7 +40,7 @@ class TestArvKeepPut < Minitest::Test
 
   def test_raw_file
     out, err = capture_subprocess_io do
-      assert arv_put('--raw', './tmp/foo')
+      assert arv_put('--no-cache', '--raw', './tmp/foo')
     end
     $stderr.write err
     assert_match '', err
@@ -87,7 +87,7 @@ class TestArvKeepPut < Minitest::Test
 
   def test_as_stream
     out, err = capture_subprocess_io do
-      assert arv_put('--as-stream', './tmp/foo')
+      assert arv_put('--no-cache', '--as-stream', './tmp/foo')
     end
     $stderr.write err
     assert_match '', err
@@ -96,7 +96,7 @@ class TestArvKeepPut < Minitest::Test
 
   def test_progress
     out, err = capture_subprocess_io do
-      assert arv_put('--manifest', '--progress', './tmp/foo')
+      assert arv_put('--no-cache', '--manifest', '--progress', './tmp/foo')
     end
     assert_match /%/, err
     assert match_collection_uuid(out)
@@ -104,7 +104,7 @@ class TestArvKeepPut < Minitest::Test
 
   def test_batch_progress
     out, err = capture_subprocess_io do
-      assert arv_put('--manifest', '--batch-progress', './tmp/foo')
+      assert arv_put('--no-cache', '--manifest', '--batch-progress', './tmp/foo')
     end
     assert_match /: 0 written 3 total/, err
     assert_match /: 3 written 3 total/, err
index 517d617d8c4f8403953b5d0b105808e0bd18ac0d..4cc2591ebb25034d0145de40c11f6638e3973864 100644 (file)
@@ -759,6 +759,14 @@ class ArvadosFile(object):
     def writable(self):
         return self.parent.writable()
 
+    @synchronized
+    def permission_expired(self, as_of_dt=None):
+        """Returns True if any of the segment's locators is expired"""
+        for r in self._segments:
+            if KeepLocator(r.locator).permission_expired(as_of_dt):
+                return True
+        return False
+
     @synchronized
     def segments(self):
         return copy.copy(self._segments)
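The new ArvadosFile.permission_expired() checks the signed-locator expiry of each segment. A hedged sketch of querying it through a collection, mirroring the unit test added further down (the manifest text and its +A... signature hint are illustrative):

    import arvados.collection

    manifest = (". 781e5e245d69b566979b86e28d23f2c7+10"
                "+A715fd31f8111894f717eb1003c1b0216799dd9ec@585e04b9 0:10:count.txt\n")
    with arvados.collection.Collection(manifest) as c:
        f = c.find('count.txt')
        # True when the locator's @<hex timestamp> permission hint is in the past;
        # arv-put uses this to decide to re-upload a file instead of resuming it.
        print(f.permission_expired())
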
index 27aad033ae55523de4fd14ae7bf9cf8a7d2b654f..812438e2ccf493507d06ca9468e3c9418f9e0e69 100644 (file)
@@ -565,16 +565,23 @@ class RichCollectionBase(CollectionBase):
     def find(self, path):
         """Recursively search the specified file path.
 
-        May return either a Collection or ArvadosFile.  Return None if not
+        May return either a Collection or ArvadosFile. Return None if not
         found.
+        If the path is invalid (e.g. it starts with '/'), an IOError exception
+        will be raised.
 
         """
         if not path:
             raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
+        if pathcomponents[0] == '':
+            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
         item = self._items.get(pathcomponents[0])
-        if len(pathcomponents) == 1:
+        if item is None:
+            return None
+        elif len(pathcomponents) == 1:
             return item
         else:
             if isinstance(item, RichCollectionBase):
@@ -829,7 +836,7 @@ class RichCollectionBase(CollectionBase):
         if target_dir is None:
             raise IOError(errno.ENOENT, "Target directory not found", target_name)
 
-        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
             target_dir = target_dir[target_name]
             target_name = sourcecomponents[-1]
 
index e3b41b26d370abbd37210ae477ea25002b66c781..88d5a79d48ff867573afec933fb5f1fb561ce318 100644 (file)
@@ -7,21 +7,22 @@ import argparse
 import arvados
 import arvados.collection
 import base64
+import copy
 import datetime
 import errno
 import fcntl
 import hashlib
 import json
+import logging
 import os
 import pwd
-import time
+import re
 import signal
 import socket
 import sys
 import tempfile
 import threading
-import copy
-import logging
+import time
 from apiclient import errors as apiclient_errors
 from arvados._version import __version__
 
@@ -43,13 +44,7 @@ Local file or directory. Default: read from standard input.
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
-                    default=-1, help="""
-Maximum depth of directory tree to represent in the manifest
-structure. A directory structure deeper than this will be represented
-as a single stream in the manifest. If N=0, the manifest will contain
-a single stream. Default: -1 (unlimited), i.e., exactly one manifest
-stream per filesystem directory that contains files.
-""")
+                    default=-1, help=argparse.SUPPRESS)
 
 _group.add_argument('--normalize', action='store_true',
                     help="""
@@ -100,6 +95,12 @@ separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
+upload_opts.add_argument('--update-collection', type=str, default=None,
+                         dest='update_collection', metavar="UUID", help="""
+Update an existing collection identified by the given Arvados collection
+UUID. All new local files will be uploaded.
+""")
+
 upload_opts.add_argument('--use-filename', type=str, default=None,
                          dest='filename', help="""
 Synonym for --filename.
@@ -167,6 +168,16 @@ _group.add_argument('--no-resume', action='store_false', dest='resume',
 Do not continue interrupted uploads from cached state.
 """)
 
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
+                    help="""
+Save upload state in a cache file for resuming (default).
+""")
+_group.add_argument('--no-cache', action='store_false', dest='use_cache',
+                    help="""
+Do not save upload state in a cache file for resuming.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@ -191,16 +202,32 @@ def parse_arguments(arguments):
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
+    # Turn off --resume (default) if --no-cache is used.
+    if not args.use_cache:
+        args.resume = False
+
     if args.paths == ['-']:
+        if args.update_collection:
+            arg_parser.error("""
+    --update-collection cannot be used when reading from stdin.
+    """)
         args.resume = False
+        args.use_cache = False
         if not args.filename:
             args.filename = 'stdin'
 
     return args
 
+
+class CollectionUpdateError(Exception):
+    pass
+
+
 class ResumeCacheConflict(Exception):
     pass
 
+class ArvPutArgumentConflict(Exception):
+    pass
 
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
@@ -217,7 +244,7 @@ class ResumeCache(object):
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
         md5.update('\0'.join(realpaths))
         if any(os.path.isdir(path) for path in realpaths):
-            md5.update(str(max(args.max_manifest_depth, -1)))
+            md5.update("-1")
         elif args.filename:
             md5.update(args.filename)
         return os.path.join(
@@ -291,12 +318,14 @@ class ArvPutUploadJob(object):
         'files' : {} # Previous run file list: {path : {size, mtime}}
     }
 
-    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
-                 name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, replication_desired=None,
-                 filename=None, update_time=1.0):
+    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
+                 bytes_expected=None, name=None, owner_uuid=None,
+                 ensure_unique_name=False, num_retries=None, replication_desired=None,
+                 filename=None, update_time=20.0, update_collection=None):
         self.paths = paths
         self.resume = resume
+        self.use_cache = use_cache
+        self.update = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
@@ -311,16 +340,23 @@ class ArvPutUploadJob(object):
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
         self._cache_file = None
-        self._collection = None
         self._collection_lock = threading.Lock()
+        self._remote_collection = None # Collection being updated (if asked)
+        self._local_collection = None # Collection from previous run manifest
+        self._file_paths = [] # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time  # How many seconds wait between update runs
+        self._files_to_upload = []
         self.logger = logging.getLogger('arvados.arv_put')
+
+        if not self.use_cache and self.resume:
+            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
+
         # Load cached data if any and if needed
-        self._setup_state()
+        self._setup_state(update_collection)
 
-    def start(self):
+    def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
@@ -332,24 +368,55 @@ class ArvPutUploadJob(object):
                 if path == '-':
                     self._write_stdin(self.filename or 'stdin')
                 elif os.path.isdir(path):
-                    self._write_directory_tree(path)
+                    # Use absolute paths on cache index so CWD doesn't interfere
+                    # with the caching logic.
+                    prefixdir = path = os.path.abspath(path)
+                    if prefixdir != '/':
+                        prefixdir += '/'
+                    for root, dirs, files in os.walk(path):
+                        # Make os.walk()'s directory traversal order deterministic
+                        dirs.sort()
+                        files.sort()
+                        for f in files:
+                            self._check_file(os.path.join(root, f),
+                                             os.path.join(root[len(prefixdir):], f))
                 else:
-                    self._write_file(path, self.filename or os.path.basename(path))
+                    self._check_file(os.path.abspath(path),
+                                     self.filename or os.path.basename(path))
+            # Update bytes_written from current local collection and
+            # report initial progress.
+            self._update()
+            # Actual file upload
+            self._upload_files()
         finally:
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
             self._checkpointer.join()
-            # Commit all & one last _update()
-            self.manifest_text()
-            self._update()
-            if self.resume:
+            # Commit all pending blocks & one last _update()
+            self._local_collection.manifest_text()
+            self._update(final=True)
+            if self.use_cache:
                 self._cache_file.close()
-                # Correct the final written bytes count
-                self.bytes_written -= self.bytes_skipped
+            if save_collection:
+                self.save_collection()
 
     def save_collection(self):
-        with self._collection_lock:
-            self._my_collection().save_new(
+        if self.update:
+            # Check if files should be updated on the remote collection.
+            for fp in self._file_paths:
+                remote_file = self._remote_collection.find(fp)
+                if not remote_file:
+                    # File doesn't exist on the remote collection, copy it.
+                    self._remote_collection.copy(fp, fp, self._local_collection)
+                elif remote_file != self._local_collection.find(fp):
+                    # A different file exists on the remote collection, overwrite it.
+                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
+                else:
+                    # The file already exists on the remote collection, skip it.
+                    pass
+            self._remote_collection.save(num_retries=self.num_retries)
+        else:
+            self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
                 ensure_unique_name=self.ensure_unique_name,
                 num_retries=self.num_retries)
@@ -384,17 +451,20 @@ class ArvPutUploadJob(object):
         while not self._stop_checkpointer.wait(self._update_task_time):
             self._update()
 
-    def _update(self):
+    def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
         with self._collection_lock:
-            self.bytes_written = self._collection_size(self._my_collection())
-            # Update cache, if resume enabled
-            if self.resume:
+            self.bytes_written = self._collection_size(self._local_collection)
+            if self.use_cache:
+                # Update cache
                 with self._state_lock:
-                    # Get the manifest text without comitting pending blocks
-                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+                    if final:
+                        self._state['manifest'] = self._local_collection.manifest_text()
+                    else:
+                        # Get the manifest text without committing pending blocks
+                        self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
                 self._save_state()
         # Call the reporter, if any
         self.report_progress()
@@ -403,114 +473,116 @@ class ArvPutUploadJob(object):
         if self.reporter is not None:
             self.reporter(self.bytes_written, self.bytes_expected)
 
-    def _write_directory_tree(self, path, stream_name="."):
-        # TODO: Check what happens when multiple directories are passed as
-        # arguments.
-        # If the code below is uncommented, integration test
-        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
-        # fails, I suppose it is because the manifest_uuid changes because
-        # of the dir addition to stream_name.
-
-        # if stream_name == '.':
-        #     stream_name = os.path.join('.', os.path.basename(path))
-        for item in os.listdir(path):
-            if os.path.isdir(os.path.join(path, item)):
-                self._write_directory_tree(os.path.join(path, item),
-                                os.path.join(stream_name, item))
-            else:
-                self._write_file(os.path.join(path, item),
-                                os.path.join(stream_name, item))
-
     def _write_stdin(self, filename):
-        with self._collection_lock:
-            output = self._my_collection().open(filename, 'w')
+        output = self._local_collection.open(filename, 'w')
         self._write(sys.stdin, output)
         output.close()
 
-    def _write_file(self, source, filename):
+    def _check_file(self, source, filename):
+        """Check if this file needs to be uploaded"""
         resume_offset = 0
-        if self.resume:
-            # Check if file was already uploaded (at least partially)
-            with self._collection_lock:
-                try:
-                    file_in_collection = self._my_collection().find(filename)
-                except IOError:
-                    # Not found
-                    file_in_collection = None
+        should_upload = False
+        new_file_in_cache = False
+        # Record file path for updating the remote collection before exiting
+        self._file_paths.append(filename)
+
+        with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
             # repeated run.
             if source not in self._state['files']:
-                with self._state_lock:
-                    self._state['files'][source] = {
-                        'mtime': os.path.getmtime(source),
-                        'size' : os.path.getsize(source)
-                    }
-            with self._state_lock:
-                cached_file_data = self._state['files'][source]
-            # See if this file was already uploaded at least partially
-            if file_in_collection:
-                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
-                    if cached_file_data['size'] == file_in_collection.size():
-                        # File already there, skip it.
-                        self.bytes_skipped += cached_file_data['size']
-                        return
-                    elif cached_file_data['size'] > file_in_collection.size():
-                        # File partially uploaded, resume!
-                        resume_offset = file_in_collection.size()
-                    else:
-                        # Inconsistent cache, re-upload the file
-                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
-                else:
-                    # Local file differs from cached data, re-upload it
-                    pass
-        with open(source, 'r') as source_fd:
-            if resume_offset > 0:
-                # Start upload where we left off
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'a')
-                source_fd.seek(resume_offset)
+                self._state['files'][source] = {
+                    'mtime': os.path.getmtime(source),
+                    'size' : os.path.getsize(source)
+                }
+                new_file_in_cache = True
+            cached_file_data = self._state['files'][source]
+
+        # Check if file was already uploaded (at least partially)
+        file_in_local_collection = self._local_collection.find(filename)
+
+        # If not resuming, upload the full file.
+        if not self.resume:
+            should_upload = True
+        # File wasn't in the previous run's cache, upload it.
+        elif new_file_in_cache:
+            should_upload = True
+        # Local file didn't change from last run.
+        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+            if not file_in_local_collection:
+                # File not uploaded yet, upload it completely
+                should_upload = True
+            elif file_in_local_collection.permission_expired():
+                # Permission token expired, re-upload file. This will change whenever
+                # we have an API for refreshing tokens.
+                should_upload = True
+                self._local_collection.remove(filename)
+            elif cached_file_data['size'] == file_in_local_collection.size():
+                # File already there, skip it.
+                self.bytes_skipped += cached_file_data['size']
+            elif cached_file_data['size'] > file_in_local_collection.size():
+                # File partially uploaded, resume!
+                resume_offset = file_in_local_collection.size()
                 self.bytes_skipped += resume_offset
+                should_upload = True
             else:
-                # Start from scratch
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'w')
-            self._write(source_fd, output)
-            output.close(flush=False)
+                # Inconsistent cache, re-upload the file
+                should_upload = True
+                self._local_collection.remove(filename)
+                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+        # Local file differs from cached data, re-upload it.
+        else:
+            if file_in_local_collection:
+                self._local_collection.remove(filename)
+            should_upload = True
+
+        if should_upload:
+            self._files_to_upload.append((source, resume_offset, filename))
+
+    def _upload_files(self):
+        for source, resume_offset, filename in self._files_to_upload:
+            with open(source, 'r') as source_fd:
+                with self._state_lock:
+                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
+                    self._state['files'][source]['size'] = os.path.getsize(source)
+                if resume_offset > 0:
+                    # Start upload where we left off
+                    output = self._local_collection.open(filename, 'a')
+                    source_fd.seek(resume_offset)
+                else:
+                    # Start from scratch
+                    output = self._local_collection.open(filename, 'w')
+                self._write(source_fd, output)
+                output.close(flush=False)
 
     def _write(self, source_fd, output):
-        first_read = True
         while True:
             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-            # Allow an empty file to be written
-            if not data and not first_read:
+            if not data:
                 break
-            if first_read:
-                first_read = False
             output.write(data)
 
     def _my_collection(self):
-        """
-        Create a new collection if none cached. Load it from cache otherwise.
-        """
-        if self._collection is None:
-            with self._state_lock:
-                manifest = self._state['manifest']
-            if self.resume and manifest is not None:
-                # Create collection from saved state
-                self._collection = arvados.collection.Collection(
-                    manifest,
-                    replication_desired=self.replication_desired)
-            else:
-                # Create new collection
-                self._collection = arvados.collection.Collection(
-                    replication_desired=self.replication_desired)
-        return self._collection
+        return self._remote_collection if self.update else self._local_collection
 
-    def _setup_state(self):
+    def _setup_state(self, update_collection):
         """
         Create a new cache file or load a previously existing one.
         """
-        if self.resume:
+        # Load an already existing collection for update
+        if update_collection and re.match(arvados.util.collection_uuid_pattern,
+                                          update_collection):
+            try:
+                self._remote_collection = arvados.collection.Collection(update_collection)
+            except arvados.errors.ApiError as error:
+                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
+            else:
+                self.update = True
+        elif update_collection:
+            # Collection locator provided, but unknown format
+            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+
+        if self.use_cache:
+            # Set up cache file name from input paths.
             md5 = hashlib.md5()
             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
             realpaths = sorted(os.path.realpath(path) for path in self.paths)
@@ -524,7 +596,9 @@ class ArvPutUploadJob(object):
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
             self._cache_file.seek(0)
-            with self._state_lock:
+
+        with self._state_lock:
+            if self.use_cache:
                 try:
                     self._state = json.load(self._cache_file)
                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
@@ -533,13 +607,11 @@ class ArvPutUploadJob(object):
                 except ValueError:
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
-            # Load how many bytes were uploaded on previous run
-            with self._collection_lock:
-                self.bytes_written = self._collection_size(self._my_collection())
-        # No resume required
-        else:
-            with self._state_lock:
+            else:
+                # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
+            # Load the previous manifest so we can check if files were modified remotely.
+            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
 
     def _lock_file(self, fileobj):
         try:
@@ -553,7 +625,7 @@ class ArvPutUploadJob(object):
         """
         try:
             with self._state_lock:
-                state = self._state
+                state = copy.deepcopy(self._state)
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
@@ -573,24 +645,16 @@ class ArvPutUploadJob(object):
             self._cache_file = new_cache
 
     def collection_name(self):
-        with self._collection_lock:
-            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
-        return name
+        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
     def manifest_locator(self):
-        with self._collection_lock:
-            locator = self._my_collection().manifest_locator()
-        return locator
+        return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
-        with self._collection_lock:
-            datahash = self._my_collection().portable_data_hash()
-        return datahash
+        return self._my_collection().portable_data_hash()
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
-        with self._collection_lock:
-            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
-        return manifest
+        return self._my_collection().manifest_text(stream_name, strip, normalize)
 
     def _datablocks_on_item(self, item):
         """
@@ -710,9 +774,11 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         reporter = None
 
     bytes_expected = expected_bytes_for(args.paths)
+
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
+                                 use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
                                  bytes_expected = bytes_expected,
@@ -720,11 +786,16 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  replication_desired = args.replication,
                                  name = collection_name,
                                  owner_uuid = project_uuid,
-                                 ensure_unique_name = True)
+                                 ensure_unique_name = True,
+                                 update_collection = args.update_collection)
     except ResumeCacheConflict:
         print >>stderr, "\n".join([
             "arv-put: Another process is already uploading this data.",
-            "         Use --no-resume if this is really what you want."])
+            "         Use --no-cache if this is really what you want."])
+        sys.exit(1)
+    except CollectionUpdateError as error:
+        print >>stderr, "\n".join([
+            "arv-put: %s" % str(error)])
         sys.exit(1)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
@@ -732,14 +803,20 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if args.resume and writer.bytes_written > 0:
+    if not args.update_collection and args.resume and writer.bytes_written > 0:
         print >>stderr, "\n".join([
                 "arv-put: Resuming previous upload from last checkpoint.",
                 "         Use the --no-resume option to start over."])
 
     writer.report_progress()
     output = None
-    writer.start()
+    try:
+        writer.start(save_collection=not(args.stream or args.raw))
+    except arvados.errors.ApiError as error:
+        print >>stderr, "\n".join([
+            "arv-put: %s" % str(error)])
+        sys.exit(1)
+
     if args.progress:  # Print newline to split stderr from stdout for humans.
         print >>stderr
 
@@ -752,8 +829,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         output = ','.join(writer.data_locators())
     else:
         try:
-            writer.save_collection()
-            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+            if args.update_collection:
+                print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
+            else:
+                print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
@@ -779,7 +858,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         sys.exit(status)
 
     # Success!
-    writer.destroy_cache()
     return output
 
 
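A hedged sketch of driving the reworked ArvPutUploadJob directly, as the updated tests below do; the path and the choice not to save the collection are illustrative, and resume must be False whenever use_cache is False (otherwise ArvPutArgumentConflict is raised):

    from arvados.commands.put import ArvPutUploadJob

    # One-shot upload with no resume-cache file on disk.
    writer = ArvPutUploadJob(['/tmp/foo'], resume=False, use_cache=False)
    writer.start(save_collection=False)
    print(writer.manifest_text())
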
index f35e4c725c1ebfb74062b44b4c4c4d5477684f9a..bc933e27f8de55d7611865bf10d92993a54937f1 100644 (file)
@@ -32,9 +32,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
         [],
         ['/dev/null'],
         ['/dev/null', '--filename', 'empty'],
-        ['/tmp'],
-        ['/tmp', '--max-manifest-depth', '0'],
-        ['/tmp', '--max-manifest-depth', '1']
+        ['/tmp']
         ]
 
     def tearDown(self):
@@ -241,6 +239,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
 
 class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                           ArvadosBaseTestCase):
+
     def setUp(self):
         super(ArvPutUploadJobTest, self).setUp()
         run_test_server.authorize_with('active')
@@ -271,7 +270,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
 
     def test_writer_works_without_cache(self):
         cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
-        cwriter.start()
+        cwriter.start(save_collection=False)
         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
     def test_writer_works_with_cache(self):
@@ -279,13 +278,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             f.write('foo')
             f.flush()
             cwriter = arv_put.ArvPutUploadJob([f.name])
-            cwriter.start()
-            self.assertEqual(3, cwriter.bytes_written)
+            cwriter.start(save_collection=False)
+            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
             # Don't destroy the cache, and start another upload
             cwriter_new = arv_put.ArvPutUploadJob([f.name])
-            cwriter_new.start()
+            cwriter_new.start(save_collection=False)
             cwriter_new.destroy_cache()
-            self.assertEqual(0, cwriter_new.bytes_written)
+            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
 
     def make_progress_tester(self):
         progression = []
@@ -301,13 +300,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                 progression, reporter = self.make_progress_tester()
                 cwriter = arv_put.ArvPutUploadJob([f.name],
                     reporter=reporter, bytes_expected=expect_count)
-                cwriter.start()
+                cwriter.start(save_collection=False)
                 cwriter.destroy_cache()
                 self.assertIn((3, expect_count), progression)
 
     def test_writer_upload_directory(self):
         cwriter = arv_put.ArvPutUploadJob([self.tempdir])
-        cwriter.start()
+        cwriter.start(save_collection=False)
         cwriter.destroy_cache()
         self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
 
@@ -325,14 +324,75 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1)
             with self.assertRaises(SystemExit):
-                writer.start()
-                self.assertLess(writer.bytes_written,
-                                os.path.getsize(self.large_file_name))
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
         # Retry the upload
         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                           replication_desired=1)
-        writer2.start()
-        self.assertEqual(writer.bytes_written + writer2.bytes_written,
+        writer2.start(save_collection=False)
+        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+
+    def test_no_resume_when_asked(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without resume
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+
+    def test_no_resume_when_no_cache(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without cache usage
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False,
+                                          use_cache=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
                          os.path.getsize(self.large_file_name))
         writer2.destroy_cache()
 
@@ -634,6 +694,21 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         self.assertEqual(1, len(collection_list))
         return collection_list[0]
 
+    def test_put_collection_with_later_update(self):
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        # Add a new file to the directory
+        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
+            f.write('The quick brown fox jumped over the lazy dog')
+        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
+        self.assertEqual(col['uuid'], updated_col['uuid'])
+        # Get the manifest and check that the new file is being included
+        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
+        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
+
     def test_put_collection_with_high_redundancy(self):
         # Write empty data: we're not testing CollectionWriter, just
         # making sure collections.create tells the API server what our
index 6b3562602aa69601021b04d93a116c04972abab5..8f02d517fc54ff531755dbacdefdb48378ab13ea 100644 (file)
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 
 import bz2
+import datetime
 import gzip
 import io
 import mock
@@ -570,6 +571,26 @@ class ArvadosFileReadlinesTestCase(ArvadosFileReadTestCase):
     def read_for_test(self, reader, byte_count, **kwargs):
         return ''.join(reader.readlines(**kwargs))
 
+
+class ArvadosFileTestCase(unittest.TestCase):
+    def datetime_to_hex(self, dt):
+        return hex(int(time.mktime(dt.timetuple())))[2:]
+
+    def test_permission_expired(self):
+        base_manifest = ". 781e5e245d69b566979b86e28d23f2c7+10+A715fd31f8111894f717eb1003c1b0216799dd9ec@{} 0:10:count.txt\n"
+        now = datetime.datetime.now()
+        a_week_ago = now - datetime.timedelta(days=7)
+        a_month_ago = now - datetime.timedelta(days=30)
+        a_week_from_now = now + datetime.timedelta(days=7)
+        with Collection(base_manifest.format(self.datetime_to_hex(a_week_from_now))) as c:
+            self.assertFalse(c.find('count.txt').permission_expired())
+        with Collection(base_manifest.format(self.datetime_to_hex(a_week_ago))) as c:
+            f = c.find('count.txt')
+            self.assertTrue(f.permission_expired())
+            self.assertTrue(f.permission_expired(a_week_from_now))
+            self.assertFalse(f.permission_expired(a_month_ago))
+
+
 class BlockManagerTest(unittest.TestCase):
     def test_bufferblock_append(self):
         keep = ArvadosFileWriterTestCase.MockKeep({})
index fc30a242eba1bfc665a05747de66f999869ef8a4..0e3d5e13f135c84f2fde2f741bd554b0ccdf3a85 100644 (file)
@@ -861,6 +861,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
             c.find("/.")
         with self.assertRaises(arvados.errors.ArgumentError):
             c.find("")
+        self.assertIs(c.find("./nonexistant.txt"), None)
+        self.assertIs(c.find("./nonexistantsubdir/nonexistant.txt"), None)
 
     def test_remove_in_subdir(self):
         c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
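The find() semantics exercised by the test above: a missing path component now makes find() return None, while a path that starts with '/' raises IOError. A small illustrative sketch:

    import arvados.collection

    c = arvados.collection.Collection(
        ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n")
    assert c.find("count1.txt") is not None      # existing file -> ArvadosFile
    assert c.find("./missing.txt") is None       # not found -> None
    try:
        c.find("/count1.txt")                    # invalid path -> IOError
    except IOError:
        pass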