10968: Added a notification when uploading at least one directory, just to let the user know...
[arvados.git] / sdk / python / arvados / commands / put.py
index 714281cc95b0475831f1761470c9cf1b5e91cce5..9590f9d995808f89f0ed8df11e8e64891f4a466b 100644 (file)
@@ -133,6 +133,15 @@ physical storage devices (e.g., disks) should have a copy of each data
 block. Default is to use the server-provided default (if any) or 2.
 """)
 
 block. Default is to use the server-provided default (if any) or 2.
 """)
 
+upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
+                         help="""
+Set the number of upload threads to be used. Take into account that
+using lots of threads will increase the RAM requirements. Default is
+to use 2 threads.
+On high latency installations, using a greater number will improve
+overall throughput.
+""")
+
 run_opts = argparse.ArgumentParser(add_help=False)
 
 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
 run_opts = argparse.ArgumentParser(add_help=False)
 
 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
@@ -347,8 +356,9 @@ class ArvPutUploadJob(object):
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                  bytes_expected=None, name=None, owner_uuid=None,
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                  bytes_expected=None, name=None, owner_uuid=None,
-                 ensure_unique_name=False, num_retries=None, replication_desired=None,
-                 filename=None, update_time=20.0, update_collection=None,
+                 ensure_unique_name=False, num_retries=None,
+                 put_threads=None, replication_desired=None,
+                 filename=None, update_time=60.0, update_collection=None,
                  logger=logging.getLogger('arvados.arv_put'), dry_run=False):
         self.paths = paths
         self.resume = resume
                  logger=logging.getLogger('arvados.arv_put'), dry_run=False):
         self.paths = paths
         self.resume = resume
@@ -363,6 +373,7 @@ class ArvPutUploadJob(object):
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
         self.replication_desired = replication_desired
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
         self.replication_desired = replication_desired
+        self.put_threads = put_threads
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
@@ -371,12 +382,13 @@ class ArvPutUploadJob(object):
         self._collection_lock = threading.Lock()
         self._remote_collection = None # Collection being updated (if asked)
         self._local_collection = None # Collection from previous run manifest
         self._collection_lock = threading.Lock()
         self._remote_collection = None # Collection being updated (if asked)
         self._local_collection = None # Collection from previous run manifest
-        self._file_paths = [] # Files to be updated in remote collection
+        self._file_paths = set() # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
         self._files_to_upload = FileUploadList(dry_run=dry_run)
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
         self._files_to_upload = FileUploadList(dry_run=dry_run)
+        self._upload_started = False
         self.logger = logger
         self.dry_run = dry_run
 
         self.logger = logger
         self.dry_run = dry_run
 
@@ -433,6 +445,7 @@ class ArvPutUploadJob(object):
             # report initial progress.
             self._update()
             # Actual file upload
             # report initial progress.
             self._update()
             # Actual file upload
+            self._upload_started = True # Used by the update thread to start checkpointing
             self._upload_files()
         finally:
             if not self.dry_run:
             self._upload_files()
         finally:
             if not self.dry_run:
@@ -495,24 +508,31 @@ class ArvPutUploadJob(object):
         Periodically called support task. File uploading is
         asynchronous so we poll status from the collection.
         """
         Periodically called support task. File uploading is
         asynchronous so we poll status from the collection.
         """
-        while not self._stop_checkpointer.wait(self._update_task_time):
+        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
             self._update()
 
     def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
             self._update()
 
     def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._local_collection)
-            if self.use_cache:
-                # Update cache
-                with self._state_lock:
+        if self._upload_started:
+            with self._collection_lock:
+                self.bytes_written = self._collection_size(self._local_collection)
+                if self.use_cache:
                     if final:
                     if final:
-                        self._state['manifest'] = self._local_collection.manifest_text()
+                        manifest = self._local_collection.manifest_text()
                     else:
                         # Get the manifest text without committing pending blocks
                     else:
                         # Get the manifest text without committing pending blocks
-                        self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+                        manifest = self._local_collection.manifest_text(strip=False,
+                                                                        normalize=False,
+                                                                        only_committed=True)
+                    # Update cache
+                    with self._state_lock:
+                        self._state['manifest'] = manifest
+            if self.use_cache:
                 self._save_state()
                 self._save_state()
+        else:
+            self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
         self.report_progress()
 
         # Call the reporter, if any
         self.report_progress()
 
@@ -531,7 +551,7 @@ class ArvPutUploadJob(object):
         should_upload = False
         new_file_in_cache = False
         # Record file path for updating the remote collection before exiting
         should_upload = False
         new_file_in_cache = False
         # Record file path for updating the remote collection before exiting
-        self._file_paths.append(filename)
+        self._file_paths.add(filename)
 
         with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
 
         with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
@@ -663,7 +683,7 @@ class ArvPutUploadJob(object):
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
-            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
+            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
 
     def collection_file_paths(self, col, path_prefix='.'):
         """Return a list of file paths by recursively going through the entire collection `col`"""
 
     def collection_file_paths(self, col, path_prefix='.'):
         """Return a list of file paths by recursively going through the entire collection `col`"""
@@ -688,12 +708,15 @@ class ArvPutUploadJob(object):
         """
         try:
             with self._state_lock:
         """
         try:
             with self._state_lock:
-                state = copy.deepcopy(self._state)
+                # We're not using copy.deepcopy() here because it's a lot slower
+                # than json.dumps(), and we already need the state in JSON format
+                # to save it on disk.
+                state = json.dumps(self._state)
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
             new_cache = os.fdopen(new_cache_fd, 'r+')
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
             new_cache = os.fdopen(new_cache_fd, 'r+')
-            json.dump(state, new_cache)
+            new_cache.write(state)
             new_cache.flush()
             os.fsync(new_cache)
             os.rename(new_cache_name, self._cache_filename)
             new_cache.flush()
             os.fsync(new_cache)
             os.rename(new_cache_name, self._cache_filename)
@@ -801,6 +824,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
     logger = logging.getLogger('arvados.arv_put')
     global api_client
 
     logger = logging.getLogger('arvados.arv_put')
+    logger.setLevel(logging.INFO)
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
@@ -840,6 +864,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         reporter = None
 
     else:
         reporter = None
 
+    # If this is used by a human, and there's at least one directory to be
+    # uploaded, the expected bytes calculation can take a moment.
+    if args.progress and any([os.path.isdir(f) for f in args.paths]):
+        logger.info("Calculating upload size, this could take some time...")
     bytes_expected = expected_bytes_for(args.paths)
 
     try:
     bytes_expected = expected_bytes_for(args.paths)
 
     try:
@@ -851,6 +879,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  bytes_expected = bytes_expected,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
                                  bytes_expected = bytes_expected,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
+                                 put_threads = args.threads,
                                  name = collection_name,
                                  owner_uuid = project_uuid,
                                  ensure_unique_name = True,
                                  name = collection_name,
                                  owner_uuid = project_uuid,
                                  ensure_unique_name = True,