10315: Bring back arv-put command from 9701's branch.
[arvados.git] / sdk / python / arvados / commands / put.py
index f0767738728d4e6af1de0fdcb3ffd3ddaabdd005..89753a22863808c3fb89686cbf9948dae8760171 100644 (file)
@@ -289,7 +289,7 @@ class ArvPutUploadJob(object):
 
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                  name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, write_copies=None, replication=None,
+                 num_retries=None, replication_desired=None,
                  filename=None, update_time=60.0):
         self.paths = paths
         self.resume = resume
@@ -301,8 +301,7 @@ class ArvPutUploadJob(object):
         self.owner_uuid = owner_uuid
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
-        self.write_copies = write_copies
-        self.replication = replication
+        self.replication_desired = replication_desired
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
@@ -321,6 +320,7 @@ class ArvPutUploadJob(object):
         """
         Start supporting thread & file uploading
         """
+        self._checkpointer.daemon = True
         self._checkpointer.start()
         try:
             for path in self.paths:
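
The new `daemon = True` line keeps the checkpointer thread from blocking interpreter shutdown if the upload is interrupted before the explicit stop/join below. A minimal, self-contained sketch of that pattern (illustrative names, not the exact ArvPutUploadJob internals):

import threading

class PeriodicCheckpointer(object):
    """Run save_fn every `interval` seconds until stop() is called."""
    def __init__(self, save_fn, interval=60.0):
        self._save_fn = save_fn
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True  # never keeps the process alive on exit

    def _run(self):
        # Event.wait() returns True as soon as stop() sets the event,
        # so shutdown is prompt instead of waiting a full interval.
        while not self._stop.wait(self._interval):
            self._save_fn()

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()
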
@@ -335,27 +335,28 @@ class ArvPutUploadJob(object):
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
             self._checkpointer.join()
-        # Successful upload, one last _update()
-        self._update()
-        if self.resume:
-            self._cache_file.close()
-            # Correct the final written bytes count
-            self.bytes_written -= self.bytes_skipped
+            # Commit all pending blocks & do one last _update()
+            self.manifest_text()
+            self._update()
+            if self.resume:
+                self._cache_file.close()
+                # Correct the final written bytes count
+                self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
         with self._collection_lock:
             self._my_collection().save_new(
-                                name=self.name, owner_uuid=self.owner_uuid,
-                                ensure_unique_name=self.ensure_unique_name,
-                                num_retries=self.num_retries,
-                                replication_desired=self.replication)
+                name=self.name, owner_uuid=self.owner_uuid,
+                ensure_unique_name=self.ensure_unique_name,
+                num_retries=self.num_retries)
 
     def destroy_cache(self):
         if self.resume:
             try:
                 os.unlink(self._cache_filename)
             except OSError as error:
-                if error.errno != errno.ENOENT:  # That's what we wanted anyway.
+                # ENOENT means the cache file was already gone; that's what we wanted anyway.
+                if error.errno != errno.ENOENT:
                     raise
             self._cache_file.close()
 
@@ -388,7 +389,8 @@ class ArvPutUploadJob(object):
             # Update cache, if resume enabled
             if self.resume:
                 with self._state_lock:
-                    self._state['manifest'] = self._my_collection().manifest_text()
+                    # Get the manifest text without committing pending blocks
+                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
         if self.resume:
             self._save_state()
         # Call the reporter, if any
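
The periodic checkpoint now records only blocks that are already in Keep, so _update() never forces buffered data out early; the final manifest_text() call in the earlier hunk is what commits the rest. A sketch contrasting the two calls (_get_manifest_text() is an internal Collection method, so treat its exact signature as carried over from this diff rather than a public API):

import arvados.collection

coll = arvados.collection.Collection()
# Periodic checkpoint: serialize only what is already committed to Keep.
checkpoint_manifest = coll._get_manifest_text(
    ".", strip=False, normalize=False, only_committed=True)
# Final pass: manifest_text() flushes any still-pending blocks first.
final_manifest = coll.manifest_text()
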
@@ -440,7 +442,8 @@ class ArvPutUploadJob(object):
                         'mtime': os.path.getmtime(source),
                         'size' : os.path.getsize(source)
                     }
-            cached_file_data = self._state['files'][source]
+            with self._state_lock:
+                cached_file_data = self._state['files'][source]
             # See if this file was already uploaded at least partially
             if file_in_collection:
                 if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
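
Reads of self._state now take the same lock as the checkpointer thread's writes. The rule in isolation, as a self-contained sketch (names are illustrative, not the _check_file() internals):

import threading

state_lock = threading.Lock()
state = {'files': {}}

def record(path, mtime, size):
    # Writer side (checkpointer): update shared state under the lock.
    with state_lock:
        state['files'][path] = {'mtime': mtime, 'size': size}

def lookup(path):
    # Reader side (uploader): same lock, and copy before releasing it.
    with state_lock:
        return dict(state['files'].get(path, {}))
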
@@ -472,10 +475,14 @@ class ArvPutUploadJob(object):
             output.close()
 
     def _write(self, source_fd, output):
+        first_read = True
         while True:
             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-            if not data:
+            # Allow an empty file to be written
+            if not data and not first_read:
                 break
+            if first_read:
+                first_read = False
             output.write(data)
 
     def _my_collection(self):
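
The first_read flag ensures a zero-length source still produces exactly one write() call, so empty files make it into the collection. The fixed loop as a standalone helper (buffer size is illustrative; put.py uses arvados.config.KEEP_BLOCK_SIZE):

def copy_stream(source_fd, output, bufsize=64 * 1024):
    # Copy source_fd into output, emitting at least one write()
    # even when the source is empty.
    first_read = True
    while True:
        data = source_fd.read(bufsize)
        if not data and not first_read:
            break
        first_read = False
        output.write(data)
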
@@ -488,12 +495,12 @@ class ArvPutUploadJob(object):
             if self.resume and manifest is not None:
                 # Create collection from saved state
                 self._collection = arvados.collection.Collection(
-                                        manifest,
-                                        num_write_copies=self.write_copies)
+                    manifest,
+                    replication_desired=self.replication_desired)
             else:
                 # Create new collection
                 self._collection = arvados.collection.Collection(
-                                        num_write_copies=self.write_copies)
+                    replication_desired=self.replication_desired)
         return self._collection
 
     def _setup_state(self):
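
Replication is now a property of the Collection itself, set via replication_desired at construction time, which is why save_new() in the earlier hunk no longer takes a separate replication argument. A minimal sketch with placeholder content and name:

import arvados.collection

coll = arvados.collection.Collection(replication_desired=2)
with coll.open('hello.txt', 'w') as out:
    out.write('hello\n')
coll.save_new(name='replication example', ensure_unique_name=True)
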
@@ -588,11 +595,15 @@ class ArvPutUploadJob(object):
         through subcollections
         """
         if isinstance(item, arvados.arvfile.ArvadosFile):
-            locators = []
-            for segment in item.segments():
-                loc = segment.locator
-                locators.append(loc)
-            return locators
+            if item.size() == 0:
+                # Empty file locator
+                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
+            else:
+                locators = []
+                for segment in item.segments():
+                    loc = segment.locator
+                    locators.append(loc)
+                return locators
         elif isinstance(item, arvados.collection.Collection):
             l = [self._datablocks_on_item(x) for x in item.values()]
             # Fast list flattener method taken from:
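
The hard-coded locator returned for empty files above is simply the MD5 of zero bytes plus a "+0" size hint, as a quick check shows:

import hashlib

assert hashlib.md5(b'').hexdigest() == 'd41d8cd98f00b204e9800998ecf8427e'
empty_locator = hashlib.md5(b'').hexdigest() + '+0'  # matches the literal above
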
@@ -688,19 +699,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         print >>stderr, error
         sys.exit(1)
 
-    # write_copies diverges from args.replication here.
-    # args.replication is how many copies we will instruct Arvados to
-    # maintain (by passing it in collections().create()) after all
-    # data is written -- and if None was given, we'll use None there.
-    # Meanwhile, write_copies is how many copies of each data block we
-    # write to Keep, which has to be a number.
-    #
-    # If we simply changed args.replication from None to a default
-    # here, we'd end up erroneously passing the default replication
-    # level (instead of None) to collections().create().
-    write_copies = (args.replication or
-                    api_client._rootDesc.get('defaultCollectionReplication', 2))
-
     if args.progress:
         reporter = progress_writer(human_progress)
     elif args.batch_progress:
@@ -711,15 +709,15 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     bytes_expected = expected_bytes_for(args.paths)
     try:
         writer = ArvPutUploadJob(paths = args.paths,
-                                resume = args.resume,
-                                reporter = reporter,
-                                bytes_expected = bytes_expected,
-                                num_retries = args.retries,
-                                write_copies = write_copies,
-                                replication = args.replication,
-                                name = collection_name,
-                                owner_uuid = project_uuid,
-                                ensure_unique_name = True)
+                                 resume = args.resume,
+                                 filename = args.filename,
+                                 reporter = reporter,
+                                 bytes_expected = bytes_expected,
+                                 num_retries = args.retries,
+                                 replication_desired = args.replication,
+                                 name = collection_name,
+                                 owner_uuid = project_uuid,
+                                 ensure_unique_name = True)
     except ResumeCacheConflict:
         print >>stderr, "\n".join([
             "arv-put: Another process is already uploading this data.",
@@ -776,9 +774,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     if status != 0:
         sys.exit(status)
-    else:
-        writer.destroy_cache()
 
+    # Success!
+    writer.destroy_cache()
     return output