10383: Merge branch 'master' into 10383-arv-put-incremental-upload
author    Lucas Di Pentima <lucas@curoverse.com>
          Fri, 9 Dec 2016 22:30:02 +0000 (19:30 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
          Fri, 9 Dec 2016 22:30:02 +0000 (19:30 -0300)
sdk/python/arvados/arvfile.py
sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_put.py

index eadb3a9bd1638cb9b384c3592b4790fe1f97e3bd,517d617d8c4f8403953b5d0b105808e0bd18ac0d..4cc2591ebb25034d0145de40c11f6638e3973864
@@@ -516,7 -516,7 +516,7 @@@ class _BlockManager(object)
                      return
                  self._keep.get(b)
              except Exception:
-                 pass
+                 _logger.exception("Exception doing block prefetch")
  
      @synchronized
      def start_get_threads(self):
@@@ -759,14 -759,6 +759,14 @@@ class ArvadosFile(object)
      def writable(self):
          return self.parent.writable()
  
 +    @synchronized
 +    def permission_expired(self, as_of_dt=None):
 +        """Returns True if any of the segments' locators has expired"""
 +        for r in self._segments:
 +            if KeepLocator(r.locator).permission_expired(as_of_dt):
 +                return True
 +        return False
 +
      @synchronized
      def segments(self):
          return copy.copy(self._segments)
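
For context on the new ArvadosFile.permission_expired() helper above: it asks
KeepLocator.permission_expired() about every segment's locator, so a file counts
as expired as soon as any of its signed block tokens has lapsed. A minimal
sketch of how _check_file() in put.py (below) uses it, assuming a collection
rebuilt from a previously cached manifest; the manifest text and file name are
the empty-file placeholders that also appear in the tests:

    import arvados.collection

    # Manifest saved by a previous run (placeholder: an empty file named 'null').
    manifest_text = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n"
    coll = arvados.collection.Collection(manifest_text)
    f = coll.find('null')
    if f and f.permission_expired():
        # Signed tokens have lapsed; drop the file so it gets uploaded again.
        coll.remove('null')
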
index 88956cdce69696f91730fd0eb2adc5430c9b3a08,e3b41b26d370abbd37210ae477ea25002b66c781..88d5a79d48ff867573afec933fb5f1fb561ce318
@@@ -7,23 -7,23 +7,24 @@@ import argpars
  import arvados
  import arvados.collection
  import base64
 +import copy
  import datetime
  import errno
  import fcntl
  import hashlib
  import json
 +import logging
  import os
  import pwd
 -import time
 +import re
  import signal
  import socket
  import sys
  import tempfile
  import threading
 -import copy
 -import logging
 +import time
  from apiclient import errors as apiclient_errors
+ from arvados._version import __version__
  
  import arvados.commands._util as arv_cmd
  
@@@ -32,6 -32,9 +33,9 @@@ api_client = Non
  
  upload_opts = argparse.ArgumentParser(add_help=False)
  
+ upload_opts.add_argument('--version', action='version',
+                          version="%s %s" % (sys.argv[0], __version__),
+                          help='Print version and exit.')
  upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                           help="""
  Local file or directory. Default: read from standard input.
  _group = upload_opts.add_mutually_exclusive_group()
  
  _group.add_argument('--max-manifest-depth', type=int, metavar='N',
 -                    default=-1, help="""
 -Maximum depth of directory tree to represent in the manifest
 -structure. A directory structure deeper than this will be represented
 -as a single stream in the manifest. If N=0, the manifest will contain
 -a single stream. Default: -1 (unlimited), i.e., exactly one manifest
 -stream per filesystem directory that contains files.
 -""")
 +                    default=-1, help=argparse.SUPPRESS)
  
  _group.add_argument('--normalize', action='store_true',
                      help="""
@@@ -91,12 -100,6 +95,12 @@@ separated by commas, with a trailing ne
  manifest.
  """)
  
 +upload_opts.add_argument('--update-collection', type=str, default=None,
 +                         dest='update_collection', metavar="UUID", help="""
 +Update an existing collection identified by the given Arvados collection
 +UUID. All new local files will be uploaded.
 +""")
 +
  upload_opts.add_argument('--use-filename', type=str, default=None,
                           dest='filename', help="""
  Synonym for --filename.
@@@ -164,16 -167,6 +168,16 @@@ _group.add_argument('--no-resume', acti
  Do not continue interrupted uploads from cached state.
  """)
  
 +_group = run_opts.add_mutually_exclusive_group()
 +_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
 +                    help="""
 +Save upload state in a cache file for resuming (default).
 +""")
 +_group.add_argument('--no-cache', action='store_false', dest='use_cache',
 +                    help="""
 +Do not save upload state in a cache file for resuming.
 +""")
 +
  arg_parser = argparse.ArgumentParser(
      description='Copy data from the local filesystem to Keep.',
      parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@@ -198,32 -191,16 +202,32 @@@ def parse_arguments(arguments)
          and os.isatty(sys.stderr.fileno())):
          args.progress = True
  
 +    # Turn off --resume (default) if --no-cache is used.
 +    if not args.use_cache:
 +        args.resume = False
 +
      if args.paths == ['-']:
 +        if args.update_collection:
 +            arg_parser.error("""
 +    --update-collection cannot be used when reading from stdin.
 +    """)
          args.resume = False
 +        args.use_cache = False
          if not args.filename:
              args.filename = 'stdin'
  
      return args
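
A minimal sketch of how the new cache/resume flags combine once
parse_arguments() has run (the paths are placeholders; per the logic above,
--no-cache also turns off resuming, and reading from stdin disables both):

    import arvados.commands.put as arv_put

    args = arv_put.parse_arguments(['--no-cache', '/tmp/data'])
    print((args.use_cache, args.resume))                   # -> (False, False)

    args = arv_put.parse_arguments(['-'])                  # read from stdin
    print((args.use_cache, args.resume, args.filename))    # -> (False, False, 'stdin')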
  
 +
 +class CollectionUpdateError(Exception):
 +    pass
 +
 +
  class ResumeCacheConflict(Exception):
      pass
  
 +class ArvPutArgumentConflict(Exception):
 +    pass
  
  class ResumeCache(object):
      CACHE_DIR = '.cache/arvados/arv-put'
          realpaths = sorted(os.path.realpath(path) for path in args.paths)
          md5.update('\0'.join(realpaths))
          if any(os.path.isdir(path) for path in realpaths):
 -            md5.update(str(max(args.max_manifest_depth, -1)))
 +            md5.update("-1")
          elif args.filename:
              md5.update(args.filename)
          return os.path.join(
@@@ -314,14 -291,12 +318,14 @@@ class ArvPutUploadJob(object)
          'files' : {} # Previous run file list: {path : {size, mtime}}
      }
  
 -    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
 -                 name=None, owner_uuid=None, ensure_unique_name=False,
 -                 num_retries=None, replication_desired=None,
 -                 filename=None, update_time=1.0):
 +    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
 +                 bytes_expected=None, name=None, owner_uuid=None,
 +                 ensure_unique_name=False, num_retries=None, replication_desired=None,
 +                 filename=None, update_time=20.0, update_collection=None):
          self.paths = paths
          self.resume = resume
 +        self.use_cache = use_cache
 +        self.update = False
          self.reporter = reporter
          self.bytes_expected = bytes_expected
          self.bytes_written = 0
          self._state = None # Previous run state (file list & manifest)
          self._current_files = [] # Current run file list
          self._cache_file = None
 -        self._collection = None
          self._collection_lock = threading.Lock()
 +        self._remote_collection = None # Collection being updated (if requested)
 +        self._local_collection = None # Collection from previous run manifest
 +        self._file_paths = [] # Files to be updated in remote collection
          self._stop_checkpointer = threading.Event()
          self._checkpointer = threading.Thread(target=self._update_task)
          self._update_task_time = update_time  # How many seconds wait between update runs
 +        self._files_to_upload = []
          self.logger = logging.getLogger('arvados.arv_put')
 +
 +        if not self.use_cache and self.resume:
 +            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
 +
          # Load cached data if any and if needed
 -        self._setup_state()
 +        self._setup_state(update_collection)
  
 -    def start(self):
 +    def start(self, save_collection):
          """
          Start supporting thread & file uploading
          """
                  if path == '-':
                      self._write_stdin(self.filename or 'stdin')
                  elif os.path.isdir(path):
 -                    self._write_directory_tree(path)
 +                    # Use absolute paths in the cache index so the CWD doesn't interfere
 +                    # with the caching logic.
 +                    prefixdir = path = os.path.abspath(path)
 +                    if prefixdir != '/':
 +                        prefixdir += '/'
 +                    for root, dirs, files in os.walk(path):
 +                        # Make os.walk()'s directory traversal order deterministic
 +                        dirs.sort()
 +                        files.sort()
 +                        for f in files:
 +                            self._check_file(os.path.join(root, f),
 +                                             os.path.join(root[len(prefixdir):], f))
                  else:
 -                    self._write_file(path, self.filename or os.path.basename(path))
 +                    self._check_file(os.path.abspath(path),
 +                                     self.filename or os.path.basename(path))
 +            # Update bytes_written from current local collection and
 +            # report initial progress.
 +            self._update()
 +            # Actual file upload
 +            self._upload_files()
          finally:
              # Stop the thread before doing anything else
              self._stop_checkpointer.set()
              self._checkpointer.join()
 -            # Commit all & one last _update()
 -            self.manifest_text()
 -            self._update()
 -            if self.resume:
 +            # Commit all pending blocks & one last _update()
 +            self._local_collection.manifest_text()
 +            self._update(final=True)
 +            if self.use_cache:
                  self._cache_file.close()
 -                # Correct the final written bytes count
 -                self.bytes_written -= self.bytes_skipped
 +            if save_collection:
 +                self.save_collection()
  
      def save_collection(self):
 -        with self._collection_lock:
 -            self._my_collection().save_new(
 +        if self.update:
 +            # Check if files should be updated on the remote collection.
 +            for fp in self._file_paths:
 +                remote_file = self._remote_collection.find(fp)
 +                if not remote_file:
 +                    # File doesn't exist on the remote collection, copy it.
 +                    self._remote_collection.copy(fp, fp, self._local_collection)
 +                elif remote_file != self._local_collection.find(fp):
 +                    # A different file exists on the remote collection, overwrite it.
 +                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
 +                else:
 +                    # The file already exists on the remote collection, skip it.
 +                    pass
 +            self._remote_collection.save(num_retries=self.num_retries)
 +        else:
 +            self._local_collection.save_new(
                  name=self.name, owner_uuid=self.owner_uuid,
                  ensure_unique_name=self.ensure_unique_name,
                  num_retries=self.num_retries)
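
The update branch above merges each freshly uploaded file into the remote
collection with Collection.copy(), overwriting only when the remote copy
differs. A sketch of that pattern with two in-memory collections standing in
for the remote and local ones (file name and contents are placeholders, and a
configured ARVADOS_API_HOST/ARVADOS_API_TOKEN environment is assumed):

    import arvados.collection

    remote_coll = arvados.collection.Collection()
    local_coll = arvados.collection.Collection()
    out = local_coll.open('reads.fastq', 'w')
    out.write('ACGT\n')
    out.close()

    path = 'reads.fastq'
    remote_file = remote_coll.find(path)
    if not remote_file:
        # File doesn't exist on the remote collection, copy it.
        remote_coll.copy(path, path, local_coll)
    elif remote_file != local_coll.find(path):
        # A different file exists on the remote collection, overwrite it.
        remote_coll.copy(path, path, local_coll, overwrite=True)
    # The real code then persists the result with remote_coll.save(num_retries=...).
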
          while not self._stop_checkpointer.wait(self._update_task_time):
              self._update()
  
 -    def _update(self):
 +    def _update(self, final=False):
          """
          Update cached manifest text and report progress.
          """
          with self._collection_lock:
 -            self.bytes_written = self._collection_size(self._my_collection())
 -            # Update cache, if resume enabled
 -            if self.resume:
 +            self.bytes_written = self._collection_size(self._local_collection)
 +            if self.use_cache:
 +                # Update cache
                  with self._state_lock:
 -                    # Get the manifest text without comitting pending blocks
 -                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
 +                    if final:
 +                        self._state['manifest'] = self._local_collection.manifest_text()
 +                    else:
 +                        # Get the manifest text without committing pending blocks
 +                        self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
                  self._save_state()
          # Call the reporter, if any
          self.report_progress()
          if self.reporter is not None:
              self.reporter(self.bytes_written, self.bytes_expected)
  
 -    def _write_directory_tree(self, path, stream_name="."):
 -        # TODO: Check what happens when multiple directories are passed as
 -        # arguments.
 -        # If the code below is uncommented, integration test
 -        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
 -        # fails, I suppose it is because the manifest_uuid changes because
 -        # of the dir addition to stream_name.
 -
 -        # if stream_name == '.':
 -        #     stream_name = os.path.join('.', os.path.basename(path))
 -        for item in os.listdir(path):
 -            if os.path.isdir(os.path.join(path, item)):
 -                self._write_directory_tree(os.path.join(path, item),
 -                                os.path.join(stream_name, item))
 -            else:
 -                self._write_file(os.path.join(path, item),
 -                                os.path.join(stream_name, item))
 -
      def _write_stdin(self, filename):
 -        with self._collection_lock:
 -            output = self._my_collection().open(filename, 'w')
 +        output = self._local_collection.open(filename, 'w')
          self._write(sys.stdin, output)
          output.close()
  
 -    def _write_file(self, source, filename):
 +    def _check_file(self, source, filename):
 +        """Check if this file needs to be uploaded"""
          resume_offset = 0
 -        if self.resume:
 -            # Check if file was already uploaded (at least partially)
 -            with self._collection_lock:
 -                try:
 -                    file_in_collection = self._my_collection().find(filename)
 -                except IOError:
 -                    # Not found
 -                    file_in_collection = None
 +        should_upload = False
 +        new_file_in_cache = False
 +        # Record file path for updating the remote collection before exiting
 +        self._file_paths.append(filename)
 +
 +        with self._state_lock:
              # If no previous cached data on this file, store it for an eventual
              # repeated run.
              if source not in self._state['files']:
 -                with self._state_lock:
 -                    self._state['files'][source] = {
 -                        'mtime': os.path.getmtime(source),
 -                        'size' : os.path.getsize(source)
 -                    }
 -            with self._state_lock:
 -                cached_file_data = self._state['files'][source]
 -            # See if this file was already uploaded at least partially
 -            if file_in_collection:
 -                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
 -                    if cached_file_data['size'] == file_in_collection.size():
 -                        # File already there, skip it.
 -                        self.bytes_skipped += cached_file_data['size']
 -                        return
 -                    elif cached_file_data['size'] > file_in_collection.size():
 -                        # File partially uploaded, resume!
 -                        resume_offset = file_in_collection.size()
 -                    else:
 -                        # Inconsistent cache, re-upload the file
 -                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
 -                else:
 -                    # Local file differs from cached data, re-upload it
 -                    pass
 -        with open(source, 'r') as source_fd:
 -            if resume_offset > 0:
 -                # Start upload where we left off
 -                with self._collection_lock:
 -                    output = self._my_collection().open(filename, 'a')
 -                source_fd.seek(resume_offset)
 +                self._state['files'][source] = {
 +                    'mtime': os.path.getmtime(source),
 +                    'size' : os.path.getsize(source)
 +                }
 +                new_file_in_cache = True
 +            cached_file_data = self._state['files'][source]
 +
 +        # Check if file was already uploaded (at least partially)
 +        file_in_local_collection = self._local_collection.find(filename)
 +
 +        # If not resuming, upload the full file.
 +        if not self.resume:
 +            should_upload = True
 +        # New file detected since the last run, upload it.
 +        elif new_file_in_cache:
 +            should_upload = True
 +        # Local file didn't change since the last run.
 +        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
 +            if not file_in_local_collection:
 +                # File not uploaded yet, upload it completely
 +                should_upload = True
 +            elif file_in_local_collection.permission_expired():
 +                # Permission token expired, re-upload the file. This will change
 +                # once we have an API for refreshing tokens.
 +                should_upload = True
 +                self._local_collection.remove(filename)
 +            elif cached_file_data['size'] == file_in_local_collection.size():
 +                # File already there, skip it.
 +                self.bytes_skipped += cached_file_data['size']
 +            elif cached_file_data['size'] > file_in_local_collection.size():
 +                # File partially uploaded, resume!
 +                resume_offset = file_in_local_collection.size()
                  self.bytes_skipped += resume_offset
 +                should_upload = True
              else:
 -                # Start from scratch
 -                with self._collection_lock:
 -                    output = self._my_collection().open(filename, 'w')
 -            self._write(source_fd, output)
 -            output.close(flush=False)
 +                # Inconsistent cache, re-upload the file
 +                should_upload = True
 +                self._local_collection.remove(filename)
 +                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
 +        # Local file differs from cached data, re-upload it.
 +        else:
 +            if file_in_local_collection:
 +                self._local_collection.remove(filename)
 +            should_upload = True
 +
 +        if should_upload:
 +            self._files_to_upload.append((source, resume_offset, filename))
 +
 +    def _upload_files(self):
 +        for source, resume_offset, filename in self._files_to_upload:
 +            with open(source, 'r') as source_fd:
 +                with self._state_lock:
 +                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
 +                    self._state['files'][source]['size'] = os.path.getsize(source)
 +                if resume_offset > 0:
 +                    # Start upload where we left off
 +                    output = self._local_collection.open(filename, 'a')
 +                    source_fd.seek(resume_offset)
 +                else:
 +                    # Start from scratch
 +                    output = self._local_collection.open(filename, 'w')
 +                self._write(source_fd, output)
 +                output.close(flush=False)
  
      def _write(self, source_fd, output):
 -        first_read = True
          while True:
              data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
 -            # Allow an empty file to be written
 -            if not data and not first_read:
 +            if not data:
                  break
 -            if first_read:
 -                first_read = False
              output.write(data)
  
      def _my_collection(self):
 -        """
 -        Create a new collection if none cached. Load it from cache otherwise.
 -        """
 -        if self._collection is None:
 -            with self._state_lock:
 -                manifest = self._state['manifest']
 -            if self.resume and manifest is not None:
 -                # Create collection from saved state
 -                self._collection = arvados.collection.Collection(
 -                    manifest,
 -                    replication_desired=self.replication_desired)
 -            else:
 -                # Create new collection
 -                self._collection = arvados.collection.Collection(
 -                    replication_desired=self.replication_desired)
 -        return self._collection
 +        return self._remote_collection if self.update else self._local_collection
  
 -    def _setup_state(self):
 +    def _setup_state(self, update_collection):
          """
          Create a new cache file or load a previously existing one.
          """
 -        if self.resume:
 +        # Load an already existing collection for update
 +        if update_collection and re.match(arvados.util.collection_uuid_pattern,
 +                                          update_collection):
 +            try:
 +                self._remote_collection = arvados.collection.Collection(update_collection)
 +            except arvados.errors.ApiError as error:
 +                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
 +            else:
 +                self.update = True
 +        elif update_collection:
 +            # Collection locator provided, but unknown format
 +            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 +
 +        if self.use_cache:
 +            # Set up cache file name from input paths.
              md5 = hashlib.md5()
              md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
              realpaths = sorted(os.path.realpath(path) for path in self.paths)
              self._cache_filename = self._cache_file.name
              self._lock_file(self._cache_file)
              self._cache_file.seek(0)
 -            with self._state_lock:
 +
 +        with self._state_lock:
 +            if self.use_cache:
                  try:
                      self._state = json.load(self._cache_file)
                      if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                  except ValueError:
                      # Cache file empty, set up new cache
                      self._state = copy.deepcopy(self.EMPTY_STATE)
 -            # Load how many bytes were uploaded on previous run
 -            with self._collection_lock:
 -                self.bytes_written = self._collection_size(self._my_collection())
 -        # No resume required
 -        else:
 -            with self._state_lock:
 +            else:
 +                # No cache file, set empty state
                  self._state = copy.deepcopy(self.EMPTY_STATE)
 +            # Load the previous manifest so we can check if files were modified remotely.
 +            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
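
For reference, the resume cache that _setup_state() loads (and _save_state()
rewrites) is a small JSON document following EMPTY_STATE above: the manifest
from the previous run plus per-file size/mtime records. A sketch with
illustrative values only (the path, size and mtime are placeholders):

    import json

    state = {
        'manifest': ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n",
        'files': {
            '/data/run1/null': {'mtime': 1481322602.0, 'size': 0},
        },
    }
    print(json.dumps(state))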
  
      def _lock_file(self, fileobj):
          try:
          """
          try:
              with self._state_lock:
 -                state = self._state
 +                state = copy.deepcopy(self._state)
              new_cache_fd, new_cache_name = tempfile.mkstemp(
                  dir=os.path.dirname(self._cache_filename))
              self._lock_file(new_cache_fd)
              self._cache_file = new_cache
  
      def collection_name(self):
 -        with self._collection_lock:
 -            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 -        return name
 +        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
  
      def manifest_locator(self):
 -        with self._collection_lock:
 -            locator = self._my_collection().manifest_locator()
 -        return locator
 +        return self._my_collection().manifest_locator()
  
      def portable_data_hash(self):
 -        with self._collection_lock:
 -            datahash = self._my_collection().portable_data_hash()
 -        return datahash
 +        return self._my_collection().portable_data_hash()
  
      def manifest_text(self, stream_name=".", strip=False, normalize=False):
 -        with self._collection_lock:
 -            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
 -        return manifest
 +        return self._my_collection().manifest_text(stream_name, strip, normalize)
  
      def _datablocks_on_item(self, item):
          """
@@@ -770,11 -710,9 +774,11 @@@ def main(arguments=None, stdout=sys.std
          reporter = None
  
      bytes_expected = expected_bytes_for(args.paths)
 +
      try:
          writer = ArvPutUploadJob(paths = args.paths,
                                   resume = args.resume,
 +                                 use_cache = args.use_cache,
                                   filename = args.filename,
                                   reporter = reporter,
                                   bytes_expected = bytes_expected,
                                   replication_desired = args.replication,
                                   name = collection_name,
                                   owner_uuid = project_uuid,
 -                                 ensure_unique_name = True)
 +                                 ensure_unique_name = True,
 +                                 update_collection = args.update_collection)
      except ResumeCacheConflict:
          print >>stderr, "\n".join([
              "arv-put: Another process is already uploading this data.",
 -            "         Use --no-resume if this is really what you want."])
 +            "         Use --no-cache if this is really what you want."])
 +        sys.exit(1)
 +    except CollectionUpdateError as error:
 +        print >>stderr, "\n".join([
 +            "arv-put: %s" % str(error)])
          sys.exit(1)
  
      # Install our signal handler for each code in CAUGHT_SIGNALS, and save
      orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                              for sigcode in CAUGHT_SIGNALS}
  
 -    if args.resume and writer.bytes_written > 0:
 +    if not args.update_collection and args.resume and writer.bytes_written > 0:
          print >>stderr, "\n".join([
                  "arv-put: Resuming previous upload from last checkpoint.",
                  "         Use the --no-resume option to start over."])
  
      writer.report_progress()
      output = None
 -    writer.start()
 +    try:
 +        writer.start(save_collection=not(args.stream or args.raw))
 +    except arvados.errors.ApiError as error:
 +        print >>stderr, "\n".join([
 +            "arv-put: %s" % str(error)])
 +        sys.exit(1)
 +
      if args.progress:  # Print newline to split stderr from stdout for humans.
          print >>stderr
  
          output = ','.join(writer.data_locators())
      else:
          try:
 -            writer.save_collection()
 -            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
 +            if args.update_collection:
 +                print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
 +            else:
 +                print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
              if args.portable_data_hash:
                  output = writer.portable_data_hash()
              else:
          sys.exit(status)
  
      # Success!
 -    writer.destroy_cache()
      return output
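
Taken together, a minimal sketch of driving the new incremental-update path
programmatically, mirroring what main() does for --update-collection (the
directory and collection UUID are placeholders, and a configured
ARVADOS_API_HOST/ARVADOS_API_TOKEN environment is assumed):

    import arvados.commands.put as arv_put

    writer = arv_put.ArvPutUploadJob(['/data/run1'],
                                     resume=True,
                                     use_cache=True,
                                     replication_desired=2,
                                     update_collection='zzzzz-4zz18-aaaaaaaaaaaaaaa')
    writer.start(save_collection=True)   # upload new/changed files, then save
    print(writer.manifest_locator())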
  
  
index 0c1d3779fbaae8badd19809a275329e57fdc4a3c,f35e4c725c1ebfb74062b44b4c4c4d5477684f9a..bc933e27f8de55d7611865bf10d92993a54937f1
@@@ -2,6 -2,7 +2,7 @@@
  # -*- coding: utf-8 -*-
  
  import apiclient
+ import io
  import mock
  import os
  import pwd
@@@ -31,7 -32,9 +32,7 @@@ class ArvadosPutResumeCacheTest(Arvados
          [],
          ['/dev/null'],
          ['/dev/null', '--filename', 'empty'],
 -        ['/tmp'],
 -        ['/tmp', '--max-manifest-depth', '0'],
 -        ['/tmp', '--max-manifest-depth', '1']
 +        ['/tmp']
          ]
  
      def tearDown(self):
  
  class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
 +
      def setUp(self):
          super(ArvPutUploadJobTest, self).setUp()
          run_test_server.authorize_with('active')
  
      def test_writer_works_without_cache(self):
          cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
 -        cwriter.start()
 +        cwriter.start(save_collection=False)
          self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
  
      def test_writer_works_with_cache(self):
              f.write('foo')
              f.flush()
              cwriter = arv_put.ArvPutUploadJob([f.name])
 -            cwriter.start()
 -            self.assertEqual(3, cwriter.bytes_written)
 +            cwriter.start(save_collection=False)
 +            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
              # Don't destroy the cache, and start another upload
              cwriter_new = arv_put.ArvPutUploadJob([f.name])
 -            cwriter_new.start()
 +            cwriter_new.start(save_collection=False)
              cwriter_new.destroy_cache()
 -            self.assertEqual(0, cwriter_new.bytes_written)
 +            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
  
      def make_progress_tester(self):
          progression = []
                  progression, reporter = self.make_progress_tester()
                  cwriter = arv_put.ArvPutUploadJob([f.name],
                      reporter=reporter, bytes_expected=expect_count)
 -                cwriter.start()
 +                cwriter.start(save_collection=False)
                  cwriter.destroy_cache()
                  self.assertIn((3, expect_count), progression)
  
      def test_writer_upload_directory(self):
          cwriter = arv_put.ArvPutUploadJob([self.tempdir])
 -        cwriter.start()
 +        cwriter.start(save_collection=False)
          cwriter.destroy_cache()
          self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
  
              writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                               replication_desired=1)
              with self.assertRaises(SystemExit):
 -                writer.start()
 -                self.assertLess(writer.bytes_written,
 -                                os.path.getsize(self.large_file_name))
 +                writer.start(save_collection=False)
 +            # Confirm that the file was partially uploaded
 +            self.assertGreater(writer.bytes_written, 0)
 +            self.assertLess(writer.bytes_written,
 +                            os.path.getsize(self.large_file_name))
          # Retry the upload
          writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                            replication_desired=1)
 -        writer2.start()
 -        self.assertEqual(writer.bytes_written + writer2.bytes_written,
 +        writer2.start(save_collection=False)
 +        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
 +                         os.path.getsize(self.large_file_name))
 +        writer2.destroy_cache()
 +
 +    def test_no_resume_when_asked(self):
 +        def wrapped_write(*args, **kwargs):
 +            data = args[1]
 +            # Exit only on last block
 +            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
 +                raise SystemExit("Simulated error")
 +            return self.arvfile_write(*args, **kwargs)
 +
 +        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
 +                        autospec=True) as mocked_write:
 +            mocked_write.side_effect = wrapped_write
 +            writer = arv_put.ArvPutUploadJob([self.large_file_name],
 +                                             replication_desired=1)
 +            with self.assertRaises(SystemExit):
 +                writer.start(save_collection=False)
 +            # Confirm that the file was partially uploaded
 +            self.assertGreater(writer.bytes_written, 0)
 +            self.assertLess(writer.bytes_written,
 +                            os.path.getsize(self.large_file_name))
 +        # Retry the upload, this time without resume
 +        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
 +                                          replication_desired=1,
 +                                          resume=False)
 +        writer2.start(save_collection=False)
 +        self.assertEqual(writer2.bytes_skipped, 0)
 +        self.assertEqual(writer2.bytes_written,
 +                         os.path.getsize(self.large_file_name))
 +        writer2.destroy_cache()
 +
 +    def test_no_resume_when_no_cache(self):
 +        def wrapped_write(*args, **kwargs):
 +            data = args[1]
 +            # Exit only on last block
 +            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
 +                raise SystemExit("Simulated error")
 +            return self.arvfile_write(*args, **kwargs)
 +
 +        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
 +                        autospec=True) as mocked_write:
 +            mocked_write.side_effect = wrapped_write
 +            writer = arv_put.ArvPutUploadJob([self.large_file_name],
 +                                             replication_desired=1)
 +            with self.assertRaises(SystemExit):
 +                writer.start(save_collection=False)
 +            # Confirm that the file was partially uploaded
 +            self.assertGreater(writer.bytes_written, 0)
 +            self.assertLess(writer.bytes_written,
 +                            os.path.getsize(self.large_file_name))
 +        # Retry the upload, this time without cache usage
 +        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
 +                                          replication_desired=1,
 +                                          resume=False,
 +                                          use_cache=False)
 +        writer2.start(save_collection=False)
 +        self.assertEqual(writer2.bytes_skipped, 0)
 +        self.assertEqual(writer2.bytes_written,
                           os.path.getsize(self.large_file_name))
          writer2.destroy_cache()
  
@@@ -468,6 -409,15 +469,15 @@@ class ArvadosPutTest(run_test_server.Te
                  delattr(self, outbuf)
          super(ArvadosPutTest, self).tearDown()
  
+     def test_version_argument(self):
+         err = io.BytesIO()
+         out = io.BytesIO()
+         with tutil.redirected_streams(stdout=out, stderr=err):
+             with self.assertRaises(SystemExit):
+                 self.call_main_with_args(['--version'])
+         self.assertEqual(out.getvalue(), '')
+         self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
      def test_simple_file_put(self):
          self.call_main_on_test_file()
  
@@@ -684,21 -634,6 +694,21 @@@ class ArvPutIntegrationTest(run_test_se
          self.assertEqual(1, len(collection_list))
          return collection_list[0]
  
 +    def test_put_collection_with_later_update(self):
 +        tmpdir = self.make_tmpdir()
 +        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
 +            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
 +        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
 +        self.assertNotEqual(None, col['uuid'])
 +        # Add a new file to the directory
 +        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
 +            f.write('The quick brown fox jumped over the lazy dog')
 +        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
 +        self.assertEqual(col['uuid'], updated_col['uuid'])
 +        # Get the manifest and check that the new file is being included
 +        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
 +        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
 +
      def test_put_collection_with_high_redundancy(self):
          # Write empty data: we're not testing CollectionWriter, just
          # making sure collections.create tells the API server what our