9463: Unify replication_desired & write_copies parameters to only one, passing it...
author    Lucas Di Pentima <lucas@curoverse.com>
          Fri, 29 Jul 2016 13:08:36 +0000 (10:08 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
          Fri, 29 Jul 2016 13:08:36 +0000 (10:08 -0300)
sdk/python/arvados/commands/put.py

index f0767738728d4e6af1de0fdcb3ffd3ddaabdd005..b2c40f12f8c361ae988dde139c2b26b830adb372 100644
@@ -289,7 +289,7 @@ class ArvPutUploadJob(object):
 
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                  name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, write_copies=None, replication=None,
+                 num_retries=None, replication_desired=None,
                  filename=None, update_time=60.0):
         self.paths = paths
         self.resume = resume
@@ -301,8 +301,7 @@ class ArvPutUploadJob(object):
         self.owner_uuid = owner_uuid
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
-        self.write_copies = write_copies
-        self.replication = replication
+        self.replication_desired = replication_desired
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
@@ -345,17 +344,17 @@ class ArvPutUploadJob(object):
     def save_collection(self):
         with self._collection_lock:
             self._my_collection().save_new(
-                                name=self.name, owner_uuid=self.owner_uuid,
-                                ensure_unique_name=self.ensure_unique_name,
-                                num_retries=self.num_retries,
-                                replication_desired=self.replication)
+                name=self.name, owner_uuid=self.owner_uuid,
+                ensure_unique_name=self.ensure_unique_name,
+                num_retries=self.num_retries)
 
     def destroy_cache(self):
         if self.resume:
             try:
                 os.unlink(self._cache_filename)
             except OSError as error:
-                if error.errno != errno.ENOENT:  # That's what we wanted anyway.
+                # That's what we wanted anyway.
+                if error.errno != errno.ENOENT:
                     raise
             self._cache_file.close()
 
@@ -440,7 +439,8 @@ class ArvPutUploadJob(object):
                         'mtime': os.path.getmtime(source),
                         'size' : os.path.getsize(source)
                     }
-            cached_file_data = self._state['files'][source]
+            with self._state_lock:
+                cached_file_data = self._state['files'][source]
             # See if this file was already uploaded at least partially
             if file_in_collection:
                 if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
@@ -488,12 +488,12 @@ class ArvPutUploadJob(object):
             if self.resume and manifest is not None:
                 # Create collection from saved state
                 self._collection = arvados.collection.Collection(
-                                        manifest,
-                                        num_write_copies=self.write_copies)
+                    manifest,
+                    replication_desired=self.replication_desired)
             else:
                 # Create new collection
                 self._collection = arvados.collection.Collection(
-                                        num_write_copies=self.write_copies)
+                    replication_desired=self.replication_desired)
         return self._collection
 
     def _setup_state(self):
@@ -688,19 +688,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         print >>stderr, error
         sys.exit(1)
 
-    # write_copies diverges from args.replication here.
-    # args.replication is how many copies we will instruct Arvados to
-    # maintain (by passing it in collections().create()) after all
-    # data is written -- and if None was given, we'll use None there.
-    # Meanwhile, write_copies is how many copies of each data block we
-    # write to Keep, which has to be a number.
-    #
-    # If we simply changed args.replication from None to a default
-    # here, we'd end up erroneously passing the default replication
-    # level (instead of None) to collections().create().
-    write_copies = (args.replication or
-                    api_client._rootDesc.get('defaultCollectionReplication', 2))
-
     if args.progress:
         reporter = progress_writer(human_progress)
     elif args.batch_progress:
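
With a single replication_desired knob, the None-versus-default bookkeeping
deleted above moves into the SDK: args.replication is forwarded as-is, and a
concrete copy count only has to exist at write time. A minimal sketch of the
intended semantics; the defaultCollectionReplication fallback mirrors the
deleted lines, while treating None as "defer to the cluster default" is an
assumption about arvados.collection.Collection, not something this hunk shows:

    import arvados

    api_client = arvados.api('v1')

    for requested in (None, 3):
        # None means "let the cluster apply its configured default";
        # an integer pins the desired replication level explicitly.
        collection = arvados.collection.Collection(
            api_client=api_client,
            replication_desired=requested)
        # The fallback the deleted block used to compute by hand:
        effective = (requested or
                     api_client._rootDesc.get('defaultCollectionReplication', 2))
        print('requested %s, effective %s' % (requested, effective))
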
@@ -715,8 +702,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
-                                write_copies = write_copies,
-                                replication = args.replication,
+                                replication_desired = args.replication,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True)
@@ -776,9 +762,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     if status != 0:
         sys.exit(status)
-    else:
-        writer.destroy_cache()
 
+    # Success!
+    writer.destroy_cache()
     return output
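
Taken together, the patch leaves callers with one parameter to thread through
instead of two. A hedged sketch of driving the class after this change; the
paths and names are placeholders, and start() is assumed to be the entry point
that walks self.paths (it is not part of this diff):

    from arvados.commands.put import ArvPutUploadJob

    writer = ArvPutUploadJob(
        paths=['/tmp/example-data'],   # placeholder input
        resume=True,
        num_retries=3,
        replication_desired=None,      # None: use the server default
        name='example collection',
        ensure_unique_name=True)

    writer.start()            # assumed upload entry point
    writer.save_collection()  # no replication argument needed here anymore
    writer.destroy_cache()    # now reached on every successful run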