# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading

import arvados
import arvados.collection
from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
them.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args

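# A hedged example of the normalization above (flag values follow from the
# parser defaults): `arv-put /dev/stdin` is rewritten to the stdin path '-',
# which in turn disables resume/caching and names the file 'stdin'.
#
#     args = parse_arguments(['/dev/stdin'])
#     assert list(args.paths) == ['-']
#     assert args.resume is False and args.use_cache is False
#     assert args.filename == 'stdin'
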
class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)

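# A minimal sketch of the dry-run behavior (the tuple layout matches what
# _check_file() queues below; the path and name are made up):
#
#     pending = FileUploadList(dry_run=True)
#     try:
#         pending.append(('/tmp/example.txt', 0, 'example.txt'))
#     except ArvPutUploadIsPending:
#         pass  # at least one file would be uploaded; arv-put exits with 2
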
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # ENOENT means it was already gone, which is what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)

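# A hedged usage sketch of ResumeCache (`args` is assumed to come from
# parse_arguments(); the stored dict is illustrative):
#
#     cache_path = ResumeCache.make_path(args)
#     try:
#         cache = ResumeCache(cache_path)  # takes an exclusive flock
#     except ResumeCacheConflict:
#         pass  # another arv-put already holds this cache
#     cache.save({'_finished_streams': []})  # atomically replaces the file
#     state = cache.load()
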
class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case a file named '-' exists
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got this far, there are no files
            # pending upload, so notify the caller.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True  # Used by the update thread to start checkpointing
            self._upload_files()
        finally:
            # Stop the thread before doing anything else
            self._stop_checkpointer.set()
            self._checkpointer.join()
            # Commit all pending blocks & one last _update()
            self._local_collection.manifest_text()
            self._update(final=True)
            if save_collection:
                self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # ENOENT means the file was already gone, which is what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, (arvados.collection.Collection,
                                 arvados.collection.Subcollection)):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                self._save_state()
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close(flush=False)

    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded"""
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # resume.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected since last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change since last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            should_upload = True
            if file_in_local_collection:
                self._local_collection.remove(filename)

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))

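    # Summary of the decision logic above (a descriptive note, not new behavior):
    #   --no-resume / new file / changed mtime or size -> full upload
    #   unchanged, absent from local collection        -> full upload
    #   unchanged, permission token expired            -> remove + full upload
    #   unchanged, same size in collection             -> skip (bytes_skipped)
    #   unchanged, collection copy smaller             -> resume at its size
    #   unchanged, collection copy bigger              -> remove + full upload
    # Each queued item is a (source, resume_offset, filename) tuple consumed
    # by _upload_files() below.
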
    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume:
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache is at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(
            self._state['manifest'],
            replication_desired=self.replication_desired,
            put_threads=self.put_threads)

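    # The cache is plain JSON matching EMPTY_STATE. A hedged example of what
    # it might look like on disk (path, mtime, and manifest are made up):
    #
    #   {"manifest": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n",
    #    "files": {"/home/user/data/empty.txt": {"mtime": 1480000000.0,
    #                                            "size": 0}}}
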
    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in col.items():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        try:
            with self._state_lock:
                # We're not using copy.deepcopy() here because it's a lot slower
                # than json.dumps(), and we're already needing JSON format to be
                # saved on disk.
                state = json.dumps(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache.fileno())
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest()
        local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            # Special or missing file: no reliable total estimate.
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

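# Sample reporter output (illustrative values; "<argv0> <pid>" stand in for
# the real program name and process id baked into _machine_format):
#
#     human_progress(5 << 20, 10 << 20)  -> "\r5M / 10M 50.0% "
#     human_progress(100, None)          -> "\r100 "
#     machine_progress(100, None)        -> "<argv0> <pid>: 100 written -1 total\n"
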
def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

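# A hedged usage sketch (the group UUID is made up): resolve where the new
# collection should live before creating it. An empty project_uuid falls
# back to the current user's own UUID.
#
#     api = arvados.api('v1')
#     owner = desired_project_uuid(api, 'zzzzz-j7d0g-0123456789abcde', 3)
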
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger = logger,
                                 dry_run=args.dry_run)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original signal handlers.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        stderr.write("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    return output

if __name__ == '__main__':
    main()