9701: Merge branch 'master' into 9701-collection-pack-small-files-alt
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index eb8199cad4254ca6c0d6b9c907b6d69a4c6c4e13..1a274104fc76c5ec6449dee5b8387fe3d478170b 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -21,6 +21,7 @@ import sys
 import tempfile
 import threading
 import copy
+import logging
 from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
@@ -288,7 +289,7 @@ class ArvPutUploadJob(object):
 
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                  name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, write_copies=None, replication=None,
+                 num_retries=None, replication_desired=None,
                  filename=None, update_time=60.0):
         self.paths = paths
         self.resume = resume
@@ -300,8 +301,7 @@ class ArvPutUploadJob(object):
         self.owner_uuid = owner_uuid
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
-        self.write_copies = write_copies
-        self.replication = replication
+        self.replication_desired = replication_desired
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
@@ -312,6 +312,7 @@ class ArvPutUploadJob(object):
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time  # How many seconds to wait between update runs
+        self.logger = logging.getLogger('arvados.arv_put')
         # Load cached data if any and if needed
         self._setup_state()
 
@@ -319,6 +320,7 @@ class ArvPutUploadJob(object):
         """
         Start supporting thread & file uploading
         """
+        self._checkpointer.daemon = True
         self._checkpointer.start()
         try:
             for path in self.paths:
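
Setting daemon = True here means a fatal error on the main thread can no longer leave arv-put hanging at interpreter exit waiting on the checkpointer; the normal path still stops it explicitly via _stop_checkpointer (next hunk). A minimal sketch of the same pattern, with illustrative names:

    import threading

    stop = threading.Event()

    def update_task(interval=60.0):
        # wait() doubles as the sleep and returns True once stop is set
        while not stop.wait(interval):
            pass  # periodic checkpoint work goes here

    checkpointer = threading.Thread(target=update_task)
    checkpointer.daemon = True  # don't keep the process alive on crash paths
    checkpointer.start()
    # ... do the work ...
    stop.set()                  # normal shutdown: signal, then join
    checkpointer.join()
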
@@ -333,27 +335,28 @@ class ArvPutUploadJob(object):
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
             self._checkpointer.join()
-        # Successful upload, one last _update()
-        self._update()
-        if self.resume:
-            self._cache_file.close()
-            # Correct the final written bytes count
-            self.bytes_written -= self.bytes_skipped
+            # Commit all pending blocks & do one last _update()
+            self.manifest_text()
+            self._update()
+            if self.resume:
+                self._cache_file.close()
+                # Correct the final written bytes count
+                self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
         with self._collection_lock:
             self._my_collection().save_new(
-                                name=self.name, owner_uuid=self.owner_uuid,
-                                ensure_unique_name=self.ensure_unique_name,
-                                num_retries=self.num_retries,
-                                replication_desired=self.replication)
+                name=self.name, owner_uuid=self.owner_uuid,
+                ensure_unique_name=self.ensure_unique_name,
+                num_retries=self.num_retries)
 
     def destroy_cache(self):
         if self.resume:
             try:
                 os.unlink(self._cache_filename)
             except OSError as error:
-                if error.errno != errno.ENOENT:  # That's what we wanted anyway.
+                # ENOENT means the cache file is already gone, which is what we wanted anyway.
+                if error.errno != errno.ENOENT:
                     raise
             self._cache_file.close()
 
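
The destroy_cache() change only reflows the comment, but the idiom it annotates, unlinking a file while treating "already gone" as success, reads cleanly as a hypothetical standalone helper:

    import errno
    import os

    def unlink_if_exists(path):
        try:
            os.unlink(path)
        except OSError as error:
            # ENOENT means the file was already gone, which is the goal;
            # any other failure (e.g. permissions) should still surface.
            if error.errno != errno.ENOENT:
                raise
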
@@ -386,7 +389,8 @@ class ArvPutUploadJob(object):
             # Update cache, if resume enabled
             if self.resume:
                 with self._state_lock:
-                    self._state['manifest'] = self._my_collection().manifest_text()
+                    # Get the manifest text without committing pending blocks
+                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
         if self.resume:
             self._save_state()
         # Call the reporter, if any
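
Passing only_committed=True keeps the cached manifest limited to blocks already flushed to Keep, so a resumed run never trusts data that was still buffered when the previous run died; plain manifest_text() would commit those pending blocks as a side effect. _get_manifest_text() is a private Collection method, used here just as the diff itself does:

    # Checkpoint sketch: record only durably stored blocks, so that
    # anything still in flight gets re-uploaded on resume.
    with self._state_lock:
        self._state['manifest'] = self._my_collection()._get_manifest_text(
            ".", strip=False, normalize=False, only_committed=True)
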
@@ -438,7 +442,8 @@ class ArvPutUploadJob(object):
                         'mtime': os.path.getmtime(source),
                         'size' : os.path.getsize(source)
                     }
-            cached_file_data = self._state['files'][source]
+            with self._state_lock:
+                cached_file_data = self._state['files'][source]
             # See if this file was already uploaded at least partially
             if file_in_collection:
                 if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
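
The new lock around the read matters because _state is shared with the checkpointer thread, which rewrites it on every periodic _update(); reads and writes alike must go through _state_lock. The pattern, reduced to an illustrative class:

    import threading

    class Uploader(object):
        def __init__(self):
            self._state_lock = threading.Lock()
            self._state = {'files': {}}

        def cached_entry(self, source):
            # Readers take the lock too: the checkpointer may be
            # replacing this entry while we look at it.
            with self._state_lock:
                return self._state['files'].get(source)
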
@@ -451,7 +456,7 @@ class ArvPutUploadJob(object):
                         resume_offset = file_in_collection.size()
                     else:
                         # Inconsistent cache, re-upload the file
-                        pass # TODO: log warning message
+                        self.logger.warning("Uploaded version of file '{}' is larger than the local version; re-uploading it from scratch.".format(source))
                 else:
                     # Local file differs from cached data, re-upload it
                     pass
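
This branch implements the resume decision: a partial upload is trusted only when the local file is unchanged (matching mtime and size) and Keep holds no more bytes than the local copy. Distilled into a hypothetical helper:

    def resume_offset(local_size, remote_size, cache_matches):
        if not cache_matches:
            return 0            # local file changed: re-upload from scratch
        if remote_size > local_size:
            return 0            # inconsistent cache: re-upload from scratch
        return remote_size      # skip the bytes already in Keep
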
@@ -486,12 +491,12 @@ class ArvPutUploadJob(object):
             if self.resume and manifest is not None:
                 # Create collection from saved state
                 self._collection = arvados.collection.Collection(
-                                        manifest,
-                                        num_write_copies=self.write_copies)
+                    manifest,
+                    replication_desired=self.replication_desired)
             else:
                 # Create new collection
                 self._collection = arvados.collection.Collection(
-                                        num_write_copies=self.write_copies)
+                    replication_desired=self.replication_desired)
         return self._collection
 
     def _setup_state(self):
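
With this change the desired replication level travels with the Collection object from construction onward, which is why save_collection() above could drop its replication_desired argument. Illustrative usage, assuming the constructor keyword works as the diff shows:

    import arvados.collection

    # None means "use the cluster default"; an integer is passed through
    # to the API server when the collection is saved.
    collection = arvados.collection.Collection(replication_desired=2)
    # ... write files ...
    collection.save_new(name='example', ensure_unique_name=True)
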
@@ -551,6 +556,7 @@ class ArvPutUploadJob(object):
             os.fsync(new_cache)
             os.rename(new_cache_name, self._cache_filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
+            self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
                 os.unlink(new_cache_name)
             except NameError:  # mkstemp failed.
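
The error now being logged comes out of the write-temp/fsync/rename sequence that _save_state() uses so a reader never observes a half-written cache file. A condensed sketch of that pattern (simplified; the real method also serializes the state and can raise ResumeCacheConflict):

    import os
    import tempfile

    def save_atomically(path, data):
        fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(path))
        try:
            os.write(fd, data)
            os.fsync(fd)               # data reaches disk before the rename
            os.rename(tmp_name, path)  # atomic replace on POSIX
        except (IOError, OSError):
            os.unlink(tmp_name)        # don't leave temp files behind
            raise
        finally:
            os.close(fd)
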
@@ -685,19 +691,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         print >>stderr, error
         sys.exit(1)
 
-    # write_copies diverges from args.replication here.
-    # args.replication is how many copies we will instruct Arvados to
-    # maintain (by passing it in collections().create()) after all
-    # data is written -- and if None was given, we'll use None there.
-    # Meanwhile, write_copies is how many copies of each data block we
-    # write to Keep, which has to be a number.
-    #
-    # If we simply changed args.replication from None to a default
-    # here, we'd end up erroneously passing the default replication
-    # level (instead of None) to collections().create().
-    write_copies = (args.replication or
-                    api_client._rootDesc.get('defaultCollectionReplication', 2))
-
     if args.progress:
         reporter = progress_writer(human_progress)
     elif args.batch_progress:
@@ -712,8 +705,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
-                                write_copies = write_copies,
-                                replication = args.replication,
+                                replication_desired = args.replication,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True)
@@ -773,9 +765,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     if status != 0:
         sys.exit(status)
-    else:
-        writer.destroy_cache()
 
+    # Success!
+    writer.destroy_cache()
     return output