#!/usr/bin/env python
# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import arvados.collection
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args

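# A minimal sketch (not executed) of how the defaults above interact; the
# path is hypothetical:
#
#   args = parse_arguments(['--no-cache', '/tmp/data'])
#   assert args.resume is False          # --no-cache implies --no-resume
#
#   args = parse_arguments([])           # no paths: read from stdin
#   assert args.paths == ['-'] and args.filename == 'stdin'
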
class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass

class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)

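# A minimal sketch (not executed) of the dry-run contract: the first append
# attempt reports that there is something left to upload.
#
#   files = FileUploadList(dry_run=True)
#   files.append(('/tmp/foo', 0, 'foo'))   # raises ArvPutUploadIsPending
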
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)

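# A minimal usage sketch (not executed). The cache path is derived from the
# API host and the input paths, so repeated invocations with the same
# arguments find the same state file:
#
#   cache = ResumeCache(ResumeCache.make_path(args))
#   state = cache.load()          # raises ValueError if the file is empty
#   cache.save({'_finished_streams': []})
#   cache.close()
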
class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

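    # A populated state, as stored in the cache file, might look like this
    # (hypothetical path, size and mtime):
    #
    #   {
    #     "manifest": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n",
    #     "files": {"/tmp/data/empty.txt": {"mtime": 1480000000.0, "size": 0}}
    #   }
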
    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = [] # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self.logger = logger
        self.dry_run = dry_run

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists.
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got up to this point, then we
            # should notify that there aren't any files to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and not f in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_files()
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                # Commit all pending blocks & do one last _update()
                self._local_collection.manifest_text()
                self._update(final=True)
                if save_collection:
                    self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        with self._collection_lock:
            self.bytes_written = self._collection_size(self._local_collection)
            if self.use_cache:
                if final:
                    manifest = self._local_collection.manifest_text()
                else:
                    # Get the manifest text without committing pending blocks
                    manifest = self._local_collection.manifest_text(".", strip=False,
                                                                    normalize=False,
                                                                    only_committed=True)
                # Update cache
                with self._state_lock:
                    self._state['manifest'] = manifest
                self._save_state()
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded"""
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.append(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an
            # eventual repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))

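    # Illustrative outcomes of the checks above (not executed), assuming
    # resume is enabled and the cache has an entry for a hypothetical
    # '/tmp/a.bin':
    #
    #   cache says 10 MiB, collection has 10 MiB -> skip, count bytes_skipped
    #   cache says 10 MiB, collection has  4 MiB -> resume from offset 4 MiB
    #   cache says 10 MiB, collection has 12 MiB -> inconsistent, re-upload
    #   mtime or size changed since last run     -> re-upload from scratch
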
    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

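    # A minimal sketch (not executed) of a resumed entry from
    # _files_to_upload, with hypothetical numbers: ('/tmp/big.dat', 67108864,
    # 'big.dat') opens 'big.dat' in append mode, seeks past the 64 MiB
    # already in the collection, and _write() streams the rest in
    # KEEP_BLOCK_SIZE chunks.
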
    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume:
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in col.items():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        try:
            with self._state_lock:
                state = copy.deepcopy(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(state, new_cache)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

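    # Note on the pattern above: writing to a tempfile in the same directory
    # and then os.rename()-ing it over the old cache is what makes the save
    # atomic; a crash mid-save leaves either the old state or the new one,
    # never a truncated JSON file.
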
    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        return self._my_collection().portable_data_hash()

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

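    # A minimal sketch (not executed) of the flattener used above:
    #
    #   l = [['loc1', 'loc2'], ['loc3']]
    #   [loc for sublist in l for loc in sublist]   # -> ['loc1', 'loc2', 'loc3']
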
def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

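# Sample outputs (not executed), with hypothetical byte counts:
#
#   human_progress(33554432, 67108864)   # -> '\r32M / 64M 50.0% '
#   machine_progress(33554432, None)     # -> '<argv0> <pid>: 33554432 written -1 total\n'
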
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

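# A minimal sketch (not executed), with a hypothetical group UUID:
#
#   desired_project_uuid(api_client, 'zzzzz-j7d0g-0123456789abcde', 2)
#   # -> the same UUID, verified to exist server-side
#   desired_project_uuid(api_client, None, 2)
#   # -> the current user's UUID, so the collection lands in Home
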
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger = logger,
                                 dry_run = args.dry_run)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    return output

if __name__ == '__main__':
    main()

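# Illustrative command lines (hypothetical paths):
#
#   arv-put my_dataset/                   # upload a directory, print the collection UUID
#   arv-put --portable-data-hash big.fa   # print the PDH instead of the UUID
#   arv-put --dry-run my_dataset/         # exit 2 if anything still needs uploading
#   cat log.txt | arv-put --filename log.txt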