Merge branch 'master' into 10979-cancelled-job-nodes
[arvados.git] / sdk / python / arvados / commands / put.py
index 24cb987e5316afd47390b39ebe00dad407fcf15f..c26bb04f3200778bb1b06d033ae958ece01ba731 100644 (file)
@@ -388,6 +388,7 @@ class ArvPutUploadJob(object):
         self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
         self._files_to_upload = FileUploadList(dry_run=dry_run)
+        self._upload_started = False
         self.logger = logger
         self.dry_run = dry_run
 
@@ -444,6 +445,7 @@ class ArvPutUploadJob(object):
             # report initial progress.
             self._update()
             # Actual file upload
+            self._upload_started = True # Used by the update thread to start checkpointing
             self._upload_files()
         finally:
             if not self.dry_run:
@@ -506,28 +508,31 @@ class ArvPutUploadJob(object):
         Periodically called support task. File uploading is
         asynchronous so we poll status from the collection.
         """
-        while not self._stop_checkpointer.wait(self._update_task_time):
+        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
             self._update()
 
     def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._local_collection)
+        if self._upload_started:
+            with self._collection_lock:
+                self.bytes_written = self._collection_size(self._local_collection)
+                if self.use_cache:
+                    if final:
+                        manifest = self._local_collection.manifest_text()
+                    else:
+                        # Get the manifest text without comitting pending blocks
+                        manifest = self._local_collection.manifest_text(strip=False,
+                                                                        normalize=False,
+                                                                        only_committed=True)
+                    # Update cache
+                    with self._state_lock:
+                        self._state['manifest'] = manifest
             if self.use_cache:
-                if final:
-                    manifest = self._local_collection.manifest_text()
-                else:
-                    # Get the manifest text without comitting pending blocks
-                    manifest = self._local_collection.manifest_text(strip=False,
-                                                                    normalize=False,
-                                                                    only_committed=True)
-                # Update cache
-                with self._state_lock:
-                    self._state['manifest'] = manifest
-        if self.use_cache:
-            self._save_state()
+                self._save_state()
+        else:
+            self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
         self.report_progress()
 
@@ -732,7 +737,14 @@ class ArvPutUploadJob(object):
         return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
-        return self._my_collection().portable_data_hash()
+        pdh = self._my_collection().portable_data_hash()
+        m = self._my_collection().stripped_manifest()
+        local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
+        if pdh != local_pdh:
+            logger.warning("\n".join([
+                "arv-put: API server provided PDH differs from local manifest.",
+                "         This should not happen; showing API server version."]))
+        return pdh
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
         return self._my_collection().manifest_text(stream_name, strip, normalize)
@@ -819,6 +831,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
     logger = logging.getLogger('arvados.arv_put')
+    logger.setLevel(logging.INFO)
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
@@ -858,6 +871,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         reporter = None
 
+    # If this is used by a human, and there's at least one directory to be
+    # uploaded, the expected bytes calculation can take a moment.
+    if args.progress and any([os.path.isdir(f) for f in args.paths]):
+        logger.info("Calculating upload size, this could take some time...")
     bytes_expected = expected_bytes_for(args.paths)
 
     try: