#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import arvados.collection
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]

api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args
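
# A minimal usage sketch (hypothetical paths; comments only, nothing runs at
# import time). Reading from stdin implies --no-cache/--no-resume and a
# default filename of 'stdin':
#
#     args = parse_arguments(['/tmp/data'])
#     assert args.paths == ['/tmp/data'] and args.resume and args.use_cache
#
#     args = parse_arguments(['-'])
#     assert (args.filename, args.use_cache, args.resume) == ('stdin', False, False)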

class CollectionUpdateError(Exception):
    pass

class ResumeCacheConflict(Exception):
    pass

class ArvPutArgumentConflict(Exception):
    pass

class ArvPutUploadIsPending(Exception):
    pass

class ArvPutUploadNotPending(Exception):
    pass

class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
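
# Dry-run sketch: the first append aborts the file scan early, which is how
# --dry-run can report "upload pending" without touching Keep (the tuple
# contents below are illustrative):
#
#     files = FileUploadList(dry_run=True)
#     try:
#         files.append(('/tmp/file', 0, 'file'))
#     except ArvPutUploadIsPending:
#         pass  # main() turns this into exit code 2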

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
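
# Cache-key sketch: make_path() is deterministic, so two runs over the same
# API host and path set resolve to the same cache file, and the flock() above
# keeps concurrent runs from clobbering each other:
#
#     args = parse_arguments(['/tmp/data'])   # hypothetical path
#     assert ResumeCache.make_path(args) == ResumeCache.make_path(args)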

class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got up to this point, then we should
            # notify that there aren't any files to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and not f in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
            # we have a custom signal handler in place that raises SystemExit with
            # the caught signal's code.
            if not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
                if self.use_cache:
                    self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded"""
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected since last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))
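
    # Resume sketch: given an unchanged source file of cached size 100 whose
    # local-collection copy holds only 40 bytes, the tuple queued above is
    # (source, 40, filename); _upload_files() below then seeks to byte 40 and
    # appends the remainder instead of re-reading the whole file.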

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in col.items():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we're already needing JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile creation failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache
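
    # The write-temp-then-rename dance above leans on os.rename() being atomic
    # within a filesystem on POSIX: a concurrent reader sees either the old
    # complete state or the new one, never a half-written cache file.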

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest()
        local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None
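
    # Flattener sketch: the nested comprehension above turns a list of lists
    # into a single list, e.g.
    #
    #     [x for sublist in [[1, 2], [3]] for x in sublist]   # -> [1, 2, 3]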

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum
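
# Behavior sketch (paths are illustrative): regular files and directory trees
# sum to a byte total; any path that is neither makes the total unknowable, so
# the function returns None and the progress reporters skip percentages:
#
#     expected_bytes_for(['/etc/hostname'])   # -> size of that file in bytes
#     expected_bytes_for(['/dev/stdin'])      # -> None (not a regular file)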

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress
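
# Usage sketch: bind a format to a stream once, then call the result with
# progress numbers as the upload advances:
#
#     report = progress_writer(human_progress)
#     report(1 << 20, 4 << 20)   # writes '\r1M / 4M 25.0% ' to stderr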

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original signal handlers.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    return output

if __name__ == '__main__':
    main()
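
# Command-line sketch (flags defined above; paths and UUIDs hypothetical):
#
#     arv-put --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx --name 'backup' /data
#     arv-put --dry-run /data && echo "nothing new to upload"   # exit 0 = no pending files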