# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object

import argparse
import arvados
import arvados.collection
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])


def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args
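
# Example (illustrative): reading from stdin disables the resume cache and
# picks a default manifest filename, per the stdin branch above.
#
#   args = parse_arguments(['-'])
#   assert args.filename == 'stdin'
#   assert args.use_cache is False and args.resume is False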


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
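
# Example (illustrative): with dry_run=True the first append raises
# ArvPutUploadIsPending, which main() turns into exit code 2.
#
#   files = FileUploadList(dry_run=True)
#   try:
#       files.append(('/tmp/example.txt', 0, 'example.txt'))
#   except ArvPutUploadIsPending:
#       pass  # at least one file would be uploaded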


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())
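
    # Sketch of the cache key derivation above (assumes ARVADOS_API_HOST is
    # unset and two hypothetical input paths): the key is the MD5 of the API
    # host plus the NUL-joined, sorted real paths, so repeated invocations
    # with the same inputs map to the same cache file.
    #
    #   import hashlib
    #   md5 = hashlib.md5()
    #   md5.update(b'!nohost')
    #   md5.update(b'\0'.join([b'/data/a', b'/data/b']))
    #   md5.hexdigest()  # -> stable cache file name for these inputs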

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
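
# Example (illustrative, local-only): ResumeCache keeps JSON state behind an
# exclusive flock; a second process opening the same path would get
# ResumeCacheConflict from _lock_file().
#
#   cache = ResumeCache('/tmp/arv-put-demo.cache')  # hypothetical path
#   cache.save({'files': {}})
#   cache.load()     # -> {'files': {}}
#   cache.destroy()  # removes the file and closes the handle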


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses that don't need cache usage.
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists.
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif not os.path.exists(path):
                    raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    orig_path = path
                    path = os.path.abspath(path)
                    if orig_path[-1:] == os.sep:
                        # When passing a directory reference with a trailing slash,
                        # its contents should be uploaded directly to the collection's root.
                        prefixdir = path
                    else:
                        # When passing a directory reference with no trailing slash,
                        # upload the directory to the collection's root.
                        prefixdir = os.path.dirname(path)
                    prefixdir += os.sep
                    for root, dirs, files in os.walk(path, followlinks=self.follow_links):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-mode is on, and we got up to this point, then we should
            # notify that there aren't any files to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True  # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close(flush=False)

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))
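
    # Decision summary for the branches above (cached mtime/size vs. the
    # copy already in the local collection):
    #   --no-resume or cache miss          -> upload whole file
    #   unchanged, absent from collection  -> upload whole file
    #   unchanged, permission expired      -> remove & re-upload
    #   unchanged, sizes match             -> skip (count as bytes_skipped)
    #   unchanged, collection copy smaller -> resume from its size as offset
    #   unchanged, collection copy bigger  -> remove & re-upload from scratch
    #   mtime or size changed              -> remove & re-upload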

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths
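
    # Example (illustrative): for a collection containing 'a.txt' and
    # 'sub/b.txt', collection_file_paths(col, path_prefix="") returns
    # ['a.txt', 'sub/b.txt']; start() uses the "" prefix so the names match
    # the keys recorded in self._file_paths.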

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we're already needing JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile creation failed, nothing to clean up.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


def expected_bytes_for(pathlist, follow_links=True):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path, followlinks=follow_links):
                # Sum file sizes
                for f in files:
                    filepath = os.path.join(root, f)
                    # Ignore symlinked files when requested
                    if (not follow_links) and os.path.islink(filepath):
                        continue
                    bytesum += os.path.getsize(filepath)
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum
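
# Example (local-only): expected_bytes_for() adds up regular file sizes and
# returns None as soon as a path is neither a file nor a directory.
#
#   import os, tempfile
#   d = tempfile.mkdtemp()
#   with open(os.path.join(d, 'x.bin'), 'wb') as f:
#       f.write(b'12345')
#   expected_bytes_for([d])               # -> 5
#   expected_bytes_for(['/no/such/path']) # -> None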

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
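
# Example outputs (illustrative; the machine format also embeds argv[0] and
# the current PID):
#   human_progress(5 << 20, 10 << 20)  -> '\r5M / 10M 50.0% '
#   human_progress(1024, None)         -> '\r1024 '
#   machine_progress(1024, None)       -> 'arv-put <pid>: 1024 written -1 total\n'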

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
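
# Example (illustrative UUIDs): the UUID infix selects the API endpoint used
# for validation; 'tpzed' is the user infix and 'j7d0g' the group infix.
#   desired_project_uuid(api, 'zzzzz-tpzed-0123456789abcde', 0)  # users().get()
#   desired_project_uuid(api, 'zzzzz-j7d0g-0123456789abcde', 0)  # groups().get()
#   desired_project_uuid(api, None, 0)                           # users().current()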

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original signal handlers for later.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()

    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()