#!/usr/bin/env python
# TODO:
# --md5sum - display md5 of each file as read from disk

# Standard-library imports used throughout this module.
import argparse
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import traceback

import arvados
import arvados.collection
import arvados.keep
import arvados.util
from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
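# Each of these signals is re-raised as SystemExit by exit_signal_handler()
# (installed in main() below), so the cleanup code in ArvPutUploadJob.start()
# still runs on interruption.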

api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
to Keep.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Traverse directory symlinks (default).
Multiple symlinks pointing to the same directory will only be followed once.
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not traverse directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
--filename argument cannot be used when storing a directory or
multiple files.
""")

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
--update-collection cannot be used when reading from stdin.
""")
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args
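
# Example (hypothetical invocation):
#   parse_arguments(['--no-progress', 'data/'])
# returns an argparse.Namespace with paths=['data/'], resume=True,
# use_cache=True, and progress left to the tty check above.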


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass
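

# List subclass used to collect pending uploads; in --dry-run mode the first
# append() aborts the run by raising ArvPutUploadIsPending.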
class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
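    # Illustrative cached-state value (hypothetical path and numbers):
    #   {"manifest": "<last checkpointed manifest text>",
    #    "files": {"/tmp/foo": {"size": 3, "mtime": 1500000000.0}}}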

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None  # Previous run state (file list & manifest)
        self._current_files = []  # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None  # Collection being updated (if asked)
        self._local_collection = None  # Collection from previous run manifest
        self._file_paths = set()  # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self._traversed_links = set()

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def _check_traversed_dir_links(self, root, dirs):
        """
        Remove from the 'dirs' list the already traversed directory symlinks,
        register the new dir symlinks as traversed.
        """
        for d in [d for d in dirs if os.path.isdir(os.path.join(root, d)) and
                  os.path.islink(os.path.join(root, d))]:
            real_dirpath = os.path.realpath(os.path.join(root, d))
            if real_dirpath in self._traversed_links:
                dirs.remove(d)
                self.logger.warning("Skipping '{}' symlink to directory '{}' because it was already uploaded".format(os.path.join(root, d), real_dirpath))
            else:
                self._traversed_links.add(real_dirpath)
        return dirs

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    # If following symlinks, avoid recursive traversals
                    if self.follow_links and os.path.islink(path):
                        self._traversed_links.add(os.path.realpath(path))
                    for root, dirs, files in os.walk(path, followlinks=self.follow_links):
                        if self.follow_links:
                            dirs = self._check_traversed_dir_links(root, dirs)
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got up to this point, then we should
            # notify that there aren't any files to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True  # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
            # we have a custom signal handler in place that raises SystemExit with
            # the caught signal's code.
            if not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close(flush=False)

    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded"""
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))
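
    # Summary of the resume logic above, for files whose mtime/size match the
    # cache: missing from the local collection or an expired permission token
    # means a full upload; equal sizes mean skip; a smaller collection copy is
    # resumed at its current size; a bigger one signals an inconsistent cache
    # and triggers a full re-upload.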

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        # Copy the source in Keep-block-sized chunks.
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in col.items():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))
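
    # Note: flock() above is advisory; LOCK_NB makes the call raise IOError
    # immediately instead of blocking when another arv-put process already
    # holds the lock on the same cache file.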

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we're already needing JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile creation failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache
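
    # The write-to-temp-file-then-rename dance above keeps the cache
    # consistent: os.rename() over an existing file is atomic on POSIX, so a
    # concurrent reader sees either the old complete state or the new one.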

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest()
        local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            # e.g. [['a', 'b'], ['c']] -> ['a', 'b', 'c']
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


def expected_bytes_for(pathlist, follow_links=True):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    linked_dirs = set()
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path, followlinks=follow_links):
                if follow_links:
                    # Skip those linked dirs that were visited more than once.
                    for d in [x for x in dirs if os.path.islink(os.path.join(root, x))]:
                        d_realpath = os.path.realpath(os.path.join(root, d))
                        if d_realpath in linked_dirs:
                            # Linked dir already visited, skip it.
                            dirs.remove(d)
                        else:
                            # Will only visit this dir once
                            linked_dirs.add(d_realpath)
                for f in files:
                    bytesum += os.path.getsize(os.path.join(root, f))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum
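
# Example: expected_bytes_for(['./data', 'notes.txt']) sums sizes over the
# directory tree and the file; it returns None if any path is neither a
# regular file nor a directory, in which case percentage progress can't be
# shown.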

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
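
# e.g. human_progress(10 << 20, 100 << 20) returns "\r10M / 100M 10.0% "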

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
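
# The UUID patterns above distinguish users from groups/projects by the
# Arvados object-type infix (assumption: "tpzed" for users and "j7d0g" for
# groups, e.g. zzzzz-tpzed-xxxxxxxxxxxxxxx).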

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(0)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(2)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original handlers.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(0)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(2)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    return output


if __name__ == '__main__':
    main()