10968: Added a notification when uploading at least one directory, just to let know...
[arvados.git] / sdk / python / arvados / commands / put.py
index 2fbac22916acbacb6dad16e6005114ac09482782..9590f9d995808f89f0ed8df11e8e64891f4a466b 100644 (file)
@@ -382,12 +382,13 @@ class ArvPutUploadJob(object):
         self._collection_lock = threading.Lock()
         self._remote_collection = None # Collection being updated (if asked)
         self._local_collection = None # Collection from previous run manifest
         self._collection_lock = threading.Lock()
         self._remote_collection = None # Collection being updated (if asked)
         self._local_collection = None # Collection from previous run manifest
-        self._file_paths = [] # Files to be updated in remote collection
+        self._file_paths = set() # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
         self._files_to_upload = FileUploadList(dry_run=dry_run)
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
         self._files_to_upload = FileUploadList(dry_run=dry_run)
+        self._upload_started = False
         self.logger = logger
         self.dry_run = dry_run
 
         self.logger = logger
         self.dry_run = dry_run
 
@@ -436,16 +437,15 @@ class ArvPutUploadJob(object):
                 raise ArvPutUploadNotPending()
             # Remove local_collection's files that don't exist locally anymore, so the
             # bytes_written count is correct.
                 raise ArvPutUploadNotPending()
             # Remove local_collection's files that don't exist locally anymore, so the
             # bytes_written count is correct.
-            # Using a set because is lot faster than a list in this case
-            file_paths = set(self._file_paths)
             for f in self.collection_file_paths(self._local_collection,
                                                 path_prefix=""):
             for f in self.collection_file_paths(self._local_collection,
                                                 path_prefix=""):
-                if f != 'stdin' and f != self.filename and not f in file_paths:
+                if f != 'stdin' and f != self.filename and not f in self._file_paths:
                     self._local_collection.remove(f)
             # Update bytes_written from current local collection and
             # report initial progress.
             self._update()
             # Actual file upload
                     self._local_collection.remove(f)
             # Update bytes_written from current local collection and
             # report initial progress.
             self._update()
             # Actual file upload
+            self._upload_started = True # Used by the update thread to start checkpointing
             self._upload_files()
         finally:
             if not self.dry_run:
             self._upload_files()
         finally:
             if not self.dry_run:
@@ -508,28 +508,31 @@ class ArvPutUploadJob(object):
         Periodically called support task. File uploading is
         asynchronous so we poll status from the collection.
         """
         Periodically called support task. File uploading is
         asynchronous so we poll status from the collection.
         """
-        while not self._stop_checkpointer.wait(self._update_task_time):
+        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
             self._update()
 
     def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
             self._update()
 
     def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._local_collection)
+        if self._upload_started:
+            with self._collection_lock:
+                self.bytes_written = self._collection_size(self._local_collection)
+                if self.use_cache:
+                    if final:
+                        manifest = self._local_collection.manifest_text()
+                    else:
+                        # Get the manifest text without comitting pending blocks
+                        manifest = self._local_collection.manifest_text(strip=False,
+                                                                        normalize=False,
+                                                                        only_committed=True)
+                    # Update cache
+                    with self._state_lock:
+                        self._state['manifest'] = manifest
             if self.use_cache:
             if self.use_cache:
-                if final:
-                    manifest = self._local_collection.manifest_text()
-                else:
-                    # Get the manifest text without comitting pending blocks
-                    manifest = self._local_collection.manifest_text(strip=False,
-                                                                    normalize=False,
-                                                                    only_committed=True)
-                # Update cache
-                with self._state_lock:
-                    self._state['manifest'] = manifest
-        if self.use_cache:
-            self._save_state()
+                self._save_state()
+        else:
+            self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
         self.report_progress()
 
         # Call the reporter, if any
         self.report_progress()
 
@@ -548,7 +551,7 @@ class ArvPutUploadJob(object):
         should_upload = False
         new_file_in_cache = False
         # Record file path for updating the remote collection before exiting
         should_upload = False
         new_file_in_cache = False
         # Record file path for updating the remote collection before exiting
-        self._file_paths.append(filename)
+        self._file_paths.add(filename)
 
         with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
 
         with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
@@ -705,6 +708,9 @@ class ArvPutUploadJob(object):
         """
         try:
             with self._state_lock:
         """
         try:
             with self._state_lock:
+                # We're not using copy.deepcopy() here because it's a lot slower
+                # than json.dumps(), and we're already needing JSON format to be
+                # saved on disk.
                 state = json.dumps(self._state)
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
                 state = json.dumps(self._state)
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
@@ -818,6 +824,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
     logger = logging.getLogger('arvados.arv_put')
     global api_client
 
     logger = logging.getLogger('arvados.arv_put')
+    logger.setLevel(logging.INFO)
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
@@ -857,6 +864,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         reporter = None
 
     else:
         reporter = None
 
+    # If this is used by a human, and there's at least one directory to be
+    # uploaded, the expected bytes calculation can take a moment. 
+    if args.progress and any([os.path.isdir(f) for f in args.paths]):
+        logger.info("Calculating upload size, this could take some time...")
     bytes_expected = expected_bytes_for(args.paths)
 
     try:
     bytes_expected = expected_bytes_for(args.paths)
 
     try: