# TODO:
# --md5sum - display md5 of each file as read from disk

from __future__ import division
from builtins import str
from past.utils import old_div
from builtins import object

import argparse
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import traceback

import arvados
import arvados.collection
from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
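
# Typical invocations (an illustrative sketch; the collection UUID below is a
# made-up example, and a configured ARVADOS_API_HOST/ARVADOS_API_TOKEN pair is
# assumed):
#
#   arv-put file1.txt dir1/                  # upload, save a Collection, print its UUID
#   arv-put --portable-data-hash file1.txt   # print the portable data hash instead
#   arv-put --stream file1.txt               # print the manifest; don't save a Collection
#   cat file1.txt | arv-put                  # read from stdin (stored as 'stdin')
#   arv-put --update-collection zzzzz-4zz18-aaaaaaaaaaaaaaa dir1/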

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args
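
# For instance (a sketch of the rules above): parse_arguments(['/dev/stdin'])
# first normalizes the path to '-', then disables resume and caching and sets
# args.filename to 'stdin', so the resulting manifest names its single file.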

class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass

class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
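
# A minimal sketch of the dry-run plumbing: with dry_run=True, the first
# append() means at least one file would be uploaded, so the check can bail
# out immediately instead of building the whole list:
#
#   pending = FileUploadList(dry_run=True)
#   pending.append(('/tmp/f', 0, 'f'))   # raises ArvPutUploadIsPending
#
# main() turns ArvPutUploadIsPending into exit code 2 (see --dry-run above).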

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
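
# A sketch of the cache key scheme used by make_path(): the API host name and
# the sorted real paths are hashed together, so re-running the same upload
# against the same cluster maps to the same file under ~/.cache/arvados/arv-put/
# (hypothetical example):
#
#   args.paths = ['/data/reads']  ->  ~/.cache/arvados/arv-put/<md5 hexdigest>
#
# Changing either the host or the path list selects a different cache file.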

class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None  # Previous run state (file list & manifest)
        self._current_files = []  # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None  # Collection being updated (if asked)
        self._local_collection = None  # Collection from previous run manifest
        self._file_paths = set()  # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start the supporting thread & file uploading.
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists.
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got up to this point, then we should
            # notify that there aren't any files to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True  # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
            # we have a custom signal handler in place that raises SystemExit with
            # the caught signal's code.
            if not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection.
        """
        size = 0
        for item in list(collection.values()):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()
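
    # Checkpoint cadence (a sketch of the wait() logic above): the task polls
    # every second until the upload actually starts, then every update_time
    # seconds (60.0 by default per __init__), so an interrupted run loses at
    # most roughly one checkpoint interval of progress.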

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                self._save_state()
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded"""
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)
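
    # Note: arvados.config.KEEP_BLOCK_SIZE is the Keep block size (64 MiB in
    # the Python SDK), so each read above maps onto at most one Keep data
    # block; committing buffered blocks is left to the collection writer.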

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume:
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
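
    # Illustrative cached state matching EMPTY_STATE above (hypothetical
    # locator, path and numbers):
    #
    #   {"manifest": ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:data.txt\n",
    #    "files": {"/home/user/data.txt": {"mtime": 1500000000.0, "size": 10}}}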

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in list(col.items()):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        try:
            with self._state_lock:
                # We're not using copy.deepcopy() here because it's a lot slower
                # than json.dumps(), and we already need the state serialized as
                # JSON to save it on disk.
                state = json.dumps(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest()
        local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections.
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in list(item.values())]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None
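
    # The flattener above turns the per-item lists into a single list, e.g.
    # (illustrative) [['locA'], ['locB', 'locC'], []] -> ['locA', 'locB', 'locC'].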

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum
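
# Note the None contract (a sketch): expected_bytes_for(['-']) returns None
# because stdin is neither a directory nor a regular file, which is why
# human_progress() below falls back to a plain byte count in that case.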

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            old_div(float(bytes_written), bytes_expected))
    else:
        return "\r{} ".format(bytes_written)
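
# Sample outputs (illustrative): with 5 MiB of 100 MiB written,
# human_progress(5 << 20, 100 << 20) returns "\r5M / 100M 5.0% ", while
# machine_progress(1024, None) yields "<argv[0]> <pid>: 1024 written -1 total\n".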

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
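
# Arvados UUIDs embed the object type in their middle segment: a user looks
# like "zzzzz-tpzed-..." and a group/project like "zzzzz-j7d0g-..." (with a
# hypothetical "zzzzz" cluster prefix), which is what the two pattern checks
# above distinguish.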

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original handlers for later.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not (args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in list(orig_signal_handlers.items()):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    return output

if __name__ == '__main__':
    main()