def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Command-line entry point for arv-put: store data in Keep.

    Uploads the paths named on the command line (or stdin, for the path
    '-') to Keep and, unless --stream or --raw was given, registers the
    result as a new Collection in Arvados.

    Arguments:
    * arguments: list of command-line argument strings, or None
      (parse_arguments decides the fallback -- presumably sys.argv;
      confirm against parse_arguments).
    * stdout, stderr: writable file objects for the result and for
      diagnostics.  NOTE: the defaults are evaluated once, at function
      definition time, so a later reassignment of sys.stdout/sys.stderr
      is not picked up by callers relying on the defaults.

    Returns the text that was written to stdout on success: manifest
    text (--stream), comma-separated block locators (--raw), or the new
    collection's portable data hash or UUID.  Calls sys.exit() with a
    nonzero status on argument errors, resume-cache conflicts, project
    lookup failure, or when no output could be produced.

    This is Python 2 code (print >>stderr statement syntax).
    """
    # Module-level API client: only constructed here when nothing has
    # installed one already -- presumably so tests can inject a stub
    # client before calling main().  TODO confirm against the test suite.
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        # A name is only meaningful when a collection will be created.
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        # Default name records when, who, and where the upload happened.
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))

    # Progress reporting: human-readable, machine-readable, or none.
    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    # Try to reuse a checkpoint from an interrupted earlier run of the
    # same upload; missing/unreadable caches are silently ignored, but a
    # cache held by a live concurrent upload is a hard error.
    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
            resume_cache.check_cache(api_client=api_client, num_retries=args.retries)
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file. Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    # Fresh writer when there is no usable cache; otherwise restore the
    # writer's checkpointed state from it.
    if resume_cache is None:
        writer = ArvPutCollectionWriter(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if path == '-':
            # Stdin is streamed in 64 KiB chunks into a single file.
            writer.start_new_stream()
            writer.start_new_file(args.filename)
            r = sys.stdin.read(64*1024)
            while r:
                # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
                # CollectionWriter.write().
                super(arvados.collection.ResumableCollectionWriter, writer).write(r)
                r = sys.stdin.read(64*1024)
        elif os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    # Produce the requested output: manifest text, raw block locators,
    # or (the default) a saved collection record.
    output = None
    if args.stream:
        output = writer.manifest_text()
        if args.normalize:
            output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            manifest_text = writer.manifest_text()
            if args.normalize:
                manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
            # Probe the server's schema for the attribute name it
            # understands, so one client works across API versions.
            replication_attr = 'replication_desired'
            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
                # API called it 'redundancy' before #3410.
                replication_attr = 'redundancy'
            # Register the resulting collection in Arvados.  Note this
            # passes args.replication (possibly None), not write_copies
            # -- see the comment above write_copies.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': manifest_text,
                    replication_attr: args.replication,
                },
                ensure_unique_name=True
            ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient_errors.Error as error:
            # Data blocks are already in Keep at this point; only the
            # collection record failed.  Report and exit nonzero below.
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1  # No output produced: make sure we exit nonzero.
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    # Put the original signal handlers back before leaving.
    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success: the checkpoint is no longer needed.  (On failure the
    # sys.exit above skips this, leaving the cache so a later run can
    # resume.)
    if resume_cache is not None:
        resume_cache.destroy()

    return output