import arvados.commands._util as arv_cmd
-CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None
upload_opts = argparse.ArgumentParser(add_help=False)
block. Default is to use the server-provided default (if any) or 2.
""")
+upload_opts.add_argument('--storage-classes', help="""
+Specify comma separated list of storage classes to be used when saving data to Keep.
+""")
+
upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
help="""
Set the number of upload threads to be used. Take into account that
total data size).
""")
-_group.add_argument('--silent', action='store_true',
- help="""
-Don't produce any output unless an error happens.
+run_opts.add_argument('--silent', action='store_true',
+ help="""
+Do not print any debug messages to console. (Any error messages will
+still be displayed.)
""")
_group = run_opts.add_mutually_exclusive_group()
super(FileUploadList, self).append(other)
+# Log formatter that appends the X-Request-Id to the first DEBUG- or
+# ERROR-level message it formats, so the request id is reported once per run;
+# every other record uses the standard arvados log format.
+class ArvPutLogFormatter(logging.Formatter):
+    """Formatter that tags a single log message with the API request id.
+
+    The first record whose level is exactly DEBUG or ERROR is rendered with
+    ``arvados.log_format`` plus a `` (X-Request-Id: <id>)`` suffix.  All other
+    records, including later DEBUG/ERROR ones, use the plain arvados format.
+    """
+    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
+    err_fmtr = None
+    request_id_informed = False
+
+    def __init__(self, request_id):
+        # Initialise the base Formatter too, so inherited helpers
+        # (formatTime, usesTime, formatException, ...) have the attributes
+        # they rely on even though format() delegates to other formatters.
+        super(ArvPutLogFormatter, self).__init__(
+            arvados.log_format, arvados.log_date_format)
+        # Per-instance flag: has the request id been emitted yet?
+        self.request_id_informed = False
+        self.err_fmtr = logging.Formatter(
+            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
+            arvados.log_date_format)
+
+    def format(self, record):
+        # Only the first DEBUG/ERROR record carries the request id suffix.
+        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
+            self.request_id_informed = True
+            return self.err_fmtr.format(record)
+        return self.std_fmtr.format(record)
+
+
class ResumeCache(object):
CACHE_DIR = '.cache/arvados/arv-put'
}
def __init__(self, paths, resume=True, use_cache=True, reporter=None,
- name=None, owner_uuid=None,
+ name=None, owner_uuid=None, api_client=None,
ensure_unique_name=False, num_retries=None,
- put_threads=None, replication_desired=None,
- filename=None, update_time=60.0, update_collection=None,
+ put_threads=None, replication_desired=None, filename=None,
+ update_time=60.0, update_collection=None, storage_classes=None,
logger=logging.getLogger('arvados.arv_put'), dry_run=False,
follow_links=True, exclude_paths=[], exclude_names=None):
self.paths = paths
self.replication_desired = replication_desired
self.put_threads = put_threads
self.filename = filename
+ self.storage_classes = storage_classes
+ self._api_client = api_client
self._state_lock = threading.Lock()
self._state = None # Previous run state (file list & manifest)
self._current_files = [] # Current run file list
else:
# The file already exist on remote collection, skip it.
pass
- self._remote_collection.save(num_retries=self.num_retries)
+ self._remote_collection.save(storage_classes=self.storage_classes,
+ num_retries=self.num_retries)
else:
+ if self.storage_classes is None:
+ self.storage_classes = ['default']
self._local_collection.save_new(
name=self.name, owner_uuid=self.owner_uuid,
+ storage_classes=self.storage_classes,
ensure_unique_name=self.ensure_unique_name,
num_retries=self.num_retries)
elif file_in_local_collection.permission_expired():
# Permission token expired, re-upload file. This will change whenever
# we have a API for refreshing tokens.
+ self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
should_upload = True
self._local_collection.remove(filename)
elif cached_file_data['size'] == file_in_local_collection.size():
if update_collection and re.match(arvados.util.collection_uuid_pattern,
update_collection):
try:
- self._remote_collection = arvados.collection.Collection(update_collection)
+ self._remote_collection = arvados.collection.Collection(
+ update_collection, api_client=self._api_client)
except arvados.errors.ApiError as error:
raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
else:
# No cache file, set empty state
self._state = copy.deepcopy(self.EMPTY_STATE)
# Load the previous manifest so we can check if files were modified remotely.
- self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
+ self._local_collection = arvados.collection.Collection(
+ self._state['manifest'],
+ replication_desired=self.replication_desired,
+ put_threads=self.put_threads,
+ api_client=self._api_client)
def collection_file_paths(self, col, path_prefix='.'):
"""Return a list of file paths by recursively go through the entire collection `col`"""
m = self._my_collection().stripped_manifest().encode()
local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
if pdh != local_pdh:
- logger.warning("\n".join([
+ self.logger.warning("\n".join([
"arv-put: API server provided PDH differs from local manifest.",
" This should not happen; showing API server version."]))
return pdh
outfile.write(progress_func(bytes_written, bytes_expected))
return write_progress
-def exit_signal_handler(sigcode, frame):
- sys.exit(-sigcode)
-
def desired_project_uuid(api_client, project_uuid, num_retries):
if not project_uuid:
query = api_client.users().current()
raise ValueError("Not a valid project UUID: {}".format(project_uuid))
return query.execute(num_retries=num_retries)['uuid']
-def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
+ install_sig_handlers=True):
global api_client
args = parse_arguments(arguments)
else:
logger.setLevel(logging.INFO)
status = 0
+
+ request_id = arvados.util.new_request_id()
+
+ formatter = ArvPutLogFormatter(request_id)
+ logging.getLogger('arvados').handlers[0].setFormatter(formatter)
+
if api_client is None:
- api_client = arvados.api('v1')
+ api_client = arvados.api('v1', request_id=request_id)
+
+ if install_sig_handlers:
+ arv_cmd.install_signal_handlers()
# Determine the name to use
if args.name:
else:
reporter = None
+ # Split storage-classes argument
+ storage_classes = None
+ if args.storage_classes:
+ storage_classes = args.storage_classes.strip().split(',')
+ if len(storage_classes) > 1:
+ logger.error("Multiple storage classes are not supported currently.")
+ sys.exit(1)
+
+
# Setup exclude regex from all the --exclude arguments provided
name_patterns = []
exclude_paths = []
use_cache = args.use_cache,
filename = args.filename,
reporter = reporter,
+ api_client = api_client,
num_retries = args.retries,
replication_desired = args.replication,
put_threads = args.threads,
owner_uuid = project_uuid,
ensure_unique_name = True,
update_collection = args.update_collection,
+ storage_classes=storage_classes,
logger=logger,
dry_run=args.dry_run,
follow_links=args.follow_links,
"arv-put: %s" % str(error)]))
sys.exit(1)
- # Install our signal handler for each code in CAUGHT_SIGNALS, and save
- # the originals.
- orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
- for sigcode in CAUGHT_SIGNALS}
-
if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
logger.warning("\n".join([
"arv-put: Resuming previous upload from last checkpoint.",
if not output.endswith('\n'):
stdout.write('\n')
- for sigcode, orig_handler in listitems(orig_signal_handlers):
- signal.signal(sigcode, orig_handler)
+ if install_sig_handlers:
+ arv_cmd.restore_signal_handlers()
if status != 0:
sys.exit(status)