13108: Don't try to install signal handler in background thread
[arvados.git] / sdk / python / arvados / commands / put.py
index 68f63b1c261bcfcfde9dd6b42cbe6ca8a67e15ab..af8e243b849f10458cfe0ff058c9890649e53888 100644 (file)
@@ -193,6 +193,12 @@ Display machine-readable progress on stderr (bytes and, if known,
 total data size).
 """)
 
+run_opts.add_argument('--silent', action='store_true',
+                      help="""
+Do not print any debug messages to console. (Any error messages will
+still be displayed.)
+""")
+
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--resume', action='store_true', default=True,
                     help="""
@@ -243,7 +249,7 @@ def parse_arguments(arguments):
     """)
 
     # Turn on --progress by default if stderr is a tty.
-    if (not (args.batch_progress or args.no_progress)
+    if (not (args.batch_progress or args.no_progress or args.silent)
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
@@ -303,6 +309,24 @@ class FileUploadList(list):
         super(FileUploadList, self).append(other)
 
 
+# Appends the X-Request-Id only to the first log message emitted at ERROR or DEBUG level; every other message uses the standard format
+class ArvPutLogFormatter(logging.Formatter):
+    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
+    err_fmtr = None
+    request_id_informed = False
+
+    def __init__(self, request_id):
+        self.err_fmtr = logging.Formatter(
+            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
+            arvados.log_date_format)
+
+    def format(self, record):
+        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
+            self.request_id_informed = True
+            return self.err_fmtr.format(record)
+        return self.std_fmtr.format(record)
+
+
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
@@ -393,7 +417,7 @@ class ArvPutUploadJob(object):
     }
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
-                 name=None, owner_uuid=None,
+                 name=None, owner_uuid=None, api_client=None,
                  ensure_unique_name=False, num_retries=None,
                  put_threads=None, replication_desired=None,
                  filename=None, update_time=60.0, update_collection=None,
@@ -416,6 +440,7 @@ class ArvPutUploadJob(object):
         self.replication_desired = replication_desired
         self.put_threads = put_threads
         self.filename = filename
+        self._api_client = api_client
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
@@ -705,6 +730,7 @@ class ArvPutUploadJob(object):
             elif file_in_local_collection.permission_expired():
                 # Permission token expired, re-upload file. This will change whenever
                 # we have a API for refreshing tokens.
+                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                 should_upload = True
                 self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
@@ -769,7 +795,8 @@ class ArvPutUploadJob(object):
         if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                           update_collection):
             try:
-                self._remote_collection = arvados.collection.Collection(update_collection)
+                self._remote_collection = arvados.collection.Collection(
+                    update_collection, api_client=self._api_client)
             except arvados.errors.ApiError as error:
                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
@@ -816,7 +843,11 @@ class ArvPutUploadJob(object):
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
-            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
+            self._local_collection = arvados.collection.Collection(
+                self._state['manifest'],
+                replication_desired=self.replication_desired,
+                put_threads=self.put_threads,
+                api_client=self._api_client)
 
     def collection_file_paths(self, col, path_prefix='.'):
         """Return a list of file paths by recursively go through the entire collection `col`"""
@@ -874,7 +905,7 @@ class ArvPutUploadJob(object):
         m = self._my_collection().stripped_manifest().encode()
         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
         if pdh != local_pdh:
-            logger.warning("\n".join([
+            self.logger.warning("\n".join([
                 "arv-put: API server provided PDH differs from local manifest.",
                 "         This should not happen; showing API server version."]))
         return pdh
@@ -948,6 +979,7 @@ def progress_writer(progress_func, outfile=sys.stderr):
     return write_progress
 
 def exit_signal_handler(sigcode, frame):
+    logging.getLogger('arvados.arv_put').error("Caught signal {}, exiting.".format(sigcode))
     sys.exit(-sigcode)
 
 def desired_project_uuid(api_client, project_uuid, num_retries):
@@ -961,15 +993,32 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
     return query.execute(num_retries=num_retries)['uuid']
 
-def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
+         install_sig_handlers=True):
     global api_client
 
-    logger = logging.getLogger('arvados.arv_put')
-    logger.setLevel(logging.INFO)
     args = parse_arguments(arguments)
+    logger = logging.getLogger('arvados.arv_put')
+    if args.silent:
+        logger.setLevel(logging.WARNING)
+    else:
+        logger.setLevel(logging.INFO)
     status = 0
+
+    request_id = arvados.util.new_request_id()
+
+    formatter = ArvPutLogFormatter(request_id)
+    logging.getLogger('arvados').handlers[0].setFormatter(formatter)
+
     if api_client is None:
-        api_client = arvados.api('v1')
+        api_client = arvados.api('v1', request_id=request_id)
+
+    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
+    # the originals.
+    orig_signal_handlers = {}
+    if install_sig_handlers:
+        orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
+                                for sigcode in CAUGHT_SIGNALS}
 
     # Determine the name to use
     if args.name:
@@ -1054,6 +1103,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
+                                 api_client = api_client,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
                                  put_threads = args.threads,
@@ -1086,11 +1136,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             "arv-put: %s" % str(error)]))
         sys.exit(1)
 
-    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
-    # the originals.
-    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
-                            for sigcode in CAUGHT_SIGNALS}
-
     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
         logger.warning("\n".join([
             "arv-put: Resuming previous upload from last checkpoint.",
@@ -1135,7 +1180,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     # Print the locator (uuid) of the new collection.
     if output is None:
         status = status or 1
-    else:
+    elif not args.silent:
         stdout.write(output)
         if not output.endswith('\n'):
             stdout.write('\n')