11308: Futurize.
[arvados.git] / sdk / python / arvados / commands / put.py
index 5b46ba75b70d864589f681f1619500def41781d5..32d5fef6a8588e1f2785517435949082dd5b3534 100644 (file)
@@ -23,6 +23,8 @@ import sys
 import tempfile
 import threading
 import time
 import tempfile
 import threading
 import time
+import traceback
+
 from apiclient import errors as apiclient_errors
 from arvados._version import __version__
 
 from apiclient import errors as apiclient_errors
 from arvados._version import __version__
 
@@ -382,14 +384,16 @@ class ArvPutUploadJob(object):
         self._collection_lock = threading.Lock()
         self._remote_collection = None # Collection being updated (if asked)
         self._local_collection = None # Collection from previous run manifest
         self._collection_lock = threading.Lock()
         self._remote_collection = None # Collection being updated (if asked)
         self._local_collection = None # Collection from previous run manifest
-        self._file_paths = [] # Files to be updated in remote collection
+        self._file_paths = set() # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
         self._files_to_upload = FileUploadList(dry_run=dry_run)
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
         self._files_to_upload = FileUploadList(dry_run=dry_run)
+        self._upload_started = False
         self.logger = logger
         self.dry_run = dry_run
         self.logger = logger
         self.dry_run = dry_run
+        self._checkpoint_before_quit = True
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
@@ -444,17 +448,28 @@ class ArvPutUploadJob(object):
             # report initial progress.
             self._update()
             # Actual file upload
             # report initial progress.
             self._update()
             # Actual file upload
+            self._upload_started = True # Used by the update thread to start checkpointing
             self._upload_files()
             self._upload_files()
+        except (SystemExit, Exception) as e:
+            self._checkpoint_before_quit = False
+            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
+            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
+            #   we have a custom signal handler in place that raises SystemExit with
+            #   the caught signal's code.
+            if not isinstance(e, SystemExit) or e.code != -2:
+                self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc(e)))
+            raise
         finally:
             if not self.dry_run:
                 # Stop the thread before doing anything else
                 self._stop_checkpointer.set()
                 self._checkpointer.join()
         finally:
             if not self.dry_run:
                 # Stop the thread before doing anything else
                 self._stop_checkpointer.set()
                 self._checkpointer.join()
-                # Commit all pending blocks & one last _update()
-                self._local_collection.manifest_text()
-                self._update(final=True)
-                if save_collection:
-                    self.save_collection()
+                if self._checkpoint_before_quit:
+                    # Commit all pending blocks & one last _update()
+                    self._local_collection.manifest_text()
+                    self._update(final=True)
+                    if save_collection:
+                        self.save_collection()
             if self.use_cache:
                 self._cache_file.close()
 
             if self.use_cache:
                 self._cache_file.close()
 
@@ -506,28 +521,31 @@ class ArvPutUploadJob(object):
         Periodically called support task. File uploading is
         asynchronous so we poll status from the collection.
         """
         Periodically called support task. File uploading is
         asynchronous so we poll status from the collection.
         """
-        while not self._stop_checkpointer.wait(self._update_task_time):
+        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
             self._update()
 
     def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
             self._update()
 
     def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._local_collection)
+        if self._upload_started:
+            with self._collection_lock:
+                self.bytes_written = self._collection_size(self._local_collection)
+                if self.use_cache:
+                    if final:
+                        manifest = self._local_collection.manifest_text()
+                    else:
+                        # Get the manifest text without committing pending blocks
+                        manifest = self._local_collection.manifest_text(strip=False,
+                                                                        normalize=False,
+                                                                        only_committed=True)
+                    # Update cache
+                    with self._state_lock:
+                        self._state['manifest'] = manifest
             if self.use_cache:
             if self.use_cache:
-                if final:
-                    manifest = self._local_collection.manifest_text()
-                else:
-                    # Get the manifest text without comitting pending blocks
-                    manifest = self._local_collection.manifest_text(strip=False,
-                                                                    normalize=False,
-                                                                    only_committed=True)
-                # Update cache
-                with self._state_lock:
-                    self._state['manifest'] = manifest
-        if self.use_cache:
-            self._save_state()
+                self._save_state()
+        else:
+            self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
         self.report_progress()
 
         # Call the reporter, if any
         self.report_progress()
 
@@ -546,7 +564,7 @@ class ArvPutUploadJob(object):
         should_upload = False
         new_file_in_cache = False
         # Record file path for updating the remote collection before exiting
         should_upload = False
         new_file_in_cache = False
         # Record file path for updating the remote collection before exiting
-        self._file_paths.append(filename)
+        self._file_paths.add(filename)
 
         with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
 
         with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
@@ -703,12 +721,15 @@ class ArvPutUploadJob(object):
         """
         try:
             with self._state_lock:
         """
         try:
             with self._state_lock:
-                state = copy.deepcopy(self._state)
+                # We're not using copy.deepcopy() here because it's a lot slower
+                # than json.dumps(), and we're already needing JSON format to be
+                # saved on disk.
+                state = json.dumps(self._state)
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
             new_cache = os.fdopen(new_cache_fd, 'r+')
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
             new_cache = os.fdopen(new_cache_fd, 'r+')
-            json.dump(state, new_cache)
+            new_cache.write(state)
             new_cache.flush()
             os.fsync(new_cache)
             os.rename(new_cache_name, self._cache_filename)
             new_cache.flush()
             os.fsync(new_cache)
             os.rename(new_cache_name, self._cache_filename)
@@ -729,7 +750,14 @@ class ArvPutUploadJob(object):
         return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
         return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
-        return self._my_collection().portable_data_hash()
+        pdh = self._my_collection().portable_data_hash()
+        m = self._my_collection().stripped_manifest()
+        local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
+        if pdh != local_pdh:
+            logger.warning("\n".join([
+                "arv-put: API server provided PDH differs from local manifest.",
+                "         This should not happen; showing API server version."]))
+        return pdh
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
         return self._my_collection().manifest_text(stream_name, strip, normalize)
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
         return self._my_collection().manifest_text(stream_name, strip, normalize)
@@ -816,6 +844,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
     logger = logging.getLogger('arvados.arv_put')
     global api_client
 
     logger = logging.getLogger('arvados.arv_put')
+    logger.setLevel(logging.INFO)
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
@@ -855,6 +884,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         reporter = None
 
     else:
         reporter = None
 
+    # If this is used by a human, and there's at least one directory to be
+    # uploaded, the expected bytes calculation can take a moment.
+    if args.progress and any([os.path.isdir(f) for f in args.paths]):
+        logger.info("Calculating upload size, this could take some time...")
     bytes_expected = expected_bytes_for(args.paths)
 
     try:
     bytes_expected = expected_bytes_for(args.paths)
 
     try: