13108: Don't try to install signal handler in background thread
[arvados.git] / sdk / python / arvados / commands / put.py
index b357bc94b8441380fffae5d41a96fa95c6bf5600..af8e243b849f10458cfe0ff058c9890649e53888 100644 (file)
@@ -193,9 +193,10 @@ Display machine-readable progress on stderr (bytes and, if known,
 total data size).
 """)
 
-_group.add_argument('--silent', action='store_true',
-                    help="""
-Do not produce any output unless an error happens.
+run_opts.add_argument('--silent', action='store_true',
+                      help="""
+Do not print any debug messages to console. (Any error messages will
+still be displayed.)
 """)
 
 _group = run_opts.add_mutually_exclusive_group()
@@ -308,6 +309,24 @@ class FileUploadList(list):
         super(FileUploadList, self).append(other)
 
 
+# Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
+class ArvPutLogFormatter(logging.Formatter):
+    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
+    err_fmtr = None
+    request_id_informed = False
+
+    def __init__(self, request_id):
+        self.err_fmtr = logging.Formatter(
+            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
+            arvados.log_date_format)
+
+    def format(self, record):
+        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
+            self.request_id_informed = True
+            return self.err_fmtr.format(record)
+        return self.std_fmtr.format(record)
+
+
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
@@ -398,7 +417,7 @@ class ArvPutUploadJob(object):
     }
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
-                 name=None, owner_uuid=None,
+                 name=None, owner_uuid=None, api_client=None,
                  ensure_unique_name=False, num_retries=None,
                  put_threads=None, replication_desired=None,
                  filename=None, update_time=60.0, update_collection=None,
@@ -421,6 +440,7 @@ class ArvPutUploadJob(object):
         self.replication_desired = replication_desired
         self.put_threads = put_threads
         self.filename = filename
+        self._api_client = api_client
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
@@ -710,6 +730,7 @@ class ArvPutUploadJob(object):
             elif file_in_local_collection.permission_expired():
                 # Permission token expired, re-upload file. This will change whenever
                 # we have a API for refreshing tokens.
+                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                 should_upload = True
                 self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
@@ -774,7 +795,8 @@ class ArvPutUploadJob(object):
         if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                           update_collection):
             try:
-                self._remote_collection = arvados.collection.Collection(update_collection)
+                self._remote_collection = arvados.collection.Collection(
+                    update_collection, api_client=self._api_client)
             except arvados.errors.ApiError as error:
                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
@@ -821,7 +843,11 @@ class ArvPutUploadJob(object):
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
-            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
+            self._local_collection = arvados.collection.Collection(
+                self._state['manifest'],
+                replication_desired=self.replication_desired,
+                put_threads=self.put_threads,
+                api_client=self._api_client)
 
     def collection_file_paths(self, col, path_prefix='.'):
         """Return a list of file paths by recursively go through the entire collection `col`"""
@@ -879,7 +905,7 @@ class ArvPutUploadJob(object):
         m = self._my_collection().stripped_manifest().encode()
         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
         if pdh != local_pdh:
-            logger.warning("\n".join([
+            self.logger.warning("\n".join([
                 "arv-put: API server provided PDH differs from local manifest.",
                 "         This should not happen; showing API server version."]))
         return pdh
@@ -953,6 +979,7 @@ def progress_writer(progress_func, outfile=sys.stderr):
     return write_progress
 
 def exit_signal_handler(sigcode, frame):
+    logging.getLogger('arvados.arv_put').error("Caught signal {}, exiting.".format(sigcode))
     sys.exit(-sigcode)
 
 def desired_project_uuid(api_client, project_uuid, num_retries):
@@ -966,7 +993,8 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
     return query.execute(num_retries=num_retries)['uuid']
 
-def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
+         install_sig_handlers=True):
     global api_client
 
     args = parse_arguments(arguments)
@@ -976,8 +1004,21 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         logger.setLevel(logging.INFO)
     status = 0
+
+    request_id = arvados.util.new_request_id()
+
+    formatter = ArvPutLogFormatter(request_id)
+    logging.getLogger('arvados').handlers[0].setFormatter(formatter)
+
     if api_client is None:
-        api_client = arvados.api('v1')
+        api_client = arvados.api('v1', request_id=request_id)
+
+    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
+    # the originals.
+    orig_signal_handlers = {}
+    if install_sig_handlers:
+        orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
+                                for sigcode in CAUGHT_SIGNALS}
 
     # Determine the name to use
     if args.name:
@@ -1062,6 +1103,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
+                                 api_client = api_client,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
                                  put_threads = args.threads,
@@ -1094,11 +1136,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             "arv-put: %s" % str(error)]))
         sys.exit(1)
 
-    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
-    # the originals.
-    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
-                            for sigcode in CAUGHT_SIGNALS}
-
     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
         logger.warning("\n".join([
             "arv-put: Resuming previous upload from last checkpoint.",