1 from __future__ import division
2 from future.utils import listitems, listvalues
3 from builtins import str
4 from builtins import object
7 import arvados.collection
27 from apiclient import errors as apiclient_errors
28 from arvados._version import __version__
30 import arvados.commands._util as arv_cmd
# Signals for which main() installs a custom handler so an interrupted
# upload can checkpoint before exiting; see exit_signal_handler() below.
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]

# NOTE(review): this excerpt elides many lines (mostly help-string bodies,
# `help="""` openers and `""")` closers); argument definitions below are
# reproduced verbatim.

# --- Options describing what to upload and how to store it ---------------
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
Local file or directory. Default: read from standard input.

# Mutually exclusive manifest-shaping options.
_group = upload_opts.add_mutually_exclusive_group()

# Hidden option kept only for command-line compatibility.
_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
Normalize the manifest by re-ordering files and streams after writing

_group.add_argument('--dry-run', action='store_true', default=False,
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.

# Mutually exclusive output-mode options (stream / manifest / raw).
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',

_group.add_argument('--stream', action='store_true',
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
Synonym for --manifest.

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
Synonym for --manifest.

_group.add_argument('--manifest', action='store_true',
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.

_group.add_argument('--as-raw', action='store_true', dest='raw',

_group.add_argument('--raw', action='store_true',
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.

upload_opts.add_argument('--filename', type=str, default=None,
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.

upload_opts.add_argument('--portable-data-hash', action='store_true',
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
On high latency installations, using a greater number will improve

# --- Options controlling a normal command-line run -----------------------
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home

run_opts.add_argument('--name', help="""
Save the collection with the specified name.

# Mutually exclusive progress-reporting options.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when

_group.add_argument('--no-progress', action='store_true',
Do not display human-readable progress on stderr, even if stderr is a

_group.add_argument('--batch-progress', action='store_true',
Display machine-readable progress on stderr (bytes and, if known,

# Mutually exclusive resume options.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
Continue interrupted uploads from cached state (default).
_group.add_argument('--no-resume', action='store_false', dest='resume',
Do not continue interrupted uploads from cached state.

# Mutually exclusive symlink-handling options.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
Do not follow file and directory symlinks.

# Mutually exclusive cache options.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
Save upload state in a cache file for resuming (default).
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
Do not save upload state in a cache file for resuming.

# Combined parser: upload options + run options + the shared --retries
# option provided by arvados.commands._util.
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
def parse_arguments(arguments):
    """Parse the arv-put command line into an argparse Namespace.

    Also normalizes and validates flag combinations (stdin handling,
    progress defaults, cache/resume interaction).  NOTE(review): several
    body lines (branch bodies and the final return) are elided in this
    excerpt; indentation is reconstructed.
    """
    args = arg_parser.parse_args(arguments)

    # With no paths given, fall back to reading standard input (branch
    # body elided here).
    if len(args.paths) == 0:

    # Treat "/dev/stdin" exactly like the conventional "-" marker.
    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    # --filename only makes sense for a single, non-directory input
    # (the enclosing guard and error call are elided in this excerpt).
    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
--filename argument cannot be used when storing a directory or

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
            and os.isatty(sys.stderr.fileno())):

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:

    # Reading from stdin: cannot update an existing collection, cannot
    # cache, and the manifest filename defaults to 'stdin'.
    if args.paths == ['-']:
        if args.update_collection:
--update-collection cannot be used when reading from stdin.
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'
class PathDoesNotExistError(Exception):
    """Raised by ArvPutUploadJob.start() when an upload path does not exist."""


class CollectionUpdateError(Exception):
    """Raised when --update-collection names a collection that cannot be read."""


class ResumeCacheConflict(Exception):
    """Raised when another arv-put process holds the lock on a cache file."""


class ArvPutArgumentConflict(Exception):
    """Raised for incompatible options (e.g. resume=True with use_cache=False)."""


class ArvPutUploadIsPending(Exception):
    """Dry-run signal: at least one file would be uploaded."""


class ArvPutUploadNotPending(Exception):
    """Dry-run signal: no files are pending for upload."""
class FileUploadList(list):
    """List of files queued for upload.

    In dry-run mode, appending any entry raises ArvPutUploadIsPending so
    the caller can report that an upload would take place.
    """
    def __init__(self, dry_run=False):
        self.dry_run = dry_run

    def append(self, other):
        # NOTE(review): the guard line is elided in this excerpt;
        # presumably `if self.dry_run:` precedes the raise — confirm
        # against the full source.
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
class ResumeCache(object):
    """JSON state cache guarded by an exclusive file lock.

    NOTE(review): many lines are elided in this excerpt (a decorator, some
    `def` lines, try/except scaffolding); indentation is reconstructed and
    method boundaries are marked with comments where the `def` is missing.
    """

    # Cache directory, relative to the user's home directory.
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        # 'a+' preserves an existing cache; lock immediately so two
        # arv-put processes never share one cache file.
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    # (classmethod — decorator elided) Derive a cache-file path by hashing
    # the API host, the real input paths and --filename together with MD5.
    def make_path(cls, args):
        # NOTE(review): the md5 initialization line is elided here.
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(args.filename.encode())
            # Fragment of the final os.path.join(...) return expression.
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),

    def _lock_file(self, fileobj):
        # Non-blocking exclusive lock: failure means another process owns
        # the cache.  NOTE(review): try/except lines are elided here.
        fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        raise ResumeCacheConflict("{} locked".format(fileobj.name))

    # (load — `def` line elided) Rewind and parse the cached JSON state.
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        # Sanity-check cached state by HEAD-requesting one known block
        # locator through Keep.  NOTE(review): surrounding scaffolding and
        # the handler body are elided in this excerpt.
        if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
            locator = state["_finished_streams"][0][1][0]
        elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
            locator = state["_current_stream_locators"][0]
        if locator is not None:
            kc = arvados.keep.KeepClient(api_client=api_client)
            kc.head(locator, num_retries=num_retries)
        except Exception as e:

    def save(self, data):
        # Write state to a locked temp file in the cache directory, then
        # rename it over the old cache so the update is atomic.
        # NOTE(review): try/except scaffolding is elided in this excerpt.
        new_cache_fd, new_cache_name = tempfile.mkstemp(
            dir=os.path.dirname(self.filename))
        self._lock_file(new_cache_fd)
        new_cache = os.fdopen(new_cache_fd, 'r+')
        json.dump(data, new_cache)
        os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            os.unlink(new_cache_name)
        except NameError:  # mkstemp failed.
            # On success: switch the instance over to the new cache file.
            self.cache_file.close()
            self.cache_file = new_cache

    # (close — `def` line elided)
        self.cache_file.close()

    # (destroy — `def` line elided) Remove the cache file, tolerating its
    # absence.
        os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.

    # (restart — `def` line elided) Re-create the cache from scratch.
        self.__init__(self.filename)
class ArvPutUploadJob(object):
    """Uploader that writes local files into an Arvados collection, with a
    background checkpointing thread so interrupted runs can resume."""

    # Cache directory, relative to the user's home directory.
    CACHE_DIR = '.cache/arvados/arv-put'
    # NOTE(review): the `EMPTY_STATE = {` opener and closing brace are
    # elided in this excerpt; only the dict entries are visible.
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
        # NOTE(review): the tail of the signature (e.g. follow_links) and a
        # few leading assignments are elided in this excerpt.
        self.use_cache = use_cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected  # total size, for progress %
        self.bytes_written = 0
        self.bytes_skipped = 0  # bytes already present in Keep (resume)
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True  # don't block interpreter exit
        self._update_task_time = update_time  # How many seconds wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links

        # Resuming requires the on-disk cache to exist.
        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)
    def start(self, save_collection):
        """
        Start supporting thread & file uploading.

        NOTE(review): try/except scaffolding and several guard lines are
        elided in this excerpt; indentation is reconstructed.
        """
        self._checkpointer.start()
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exist
                    raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif not os.path.exists(path):
                    raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    for root, dirs, files in os.walk(path, followlinks=self.follow_links):
                        # Make os.walk()'s dir traversing order deterministic
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-mode is on, and got up to this point, then we should notify that
            # there aren't any file to upload.
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                if f != 'stdin' and f != self.filename and not f in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._upload_started = True # Used by the update thread to start checkpointing
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the catched
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            # Stop the thread before doing anything else
            self._stop_checkpointer.set()
            self._checkpointer.join()
            if self._checkpoint_before_quit:
                # Commit all pending blocks & one last _update()
                self._local_collection.manifest_text()
                self._update(final=True)
                    self.save_collection()
                self._cache_file.close()
    def save_collection(self):
        """Persist the collection: sync changed files into the remote
        collection when updating, otherwise save the local one as new.

        NOTE(review): the update/else scaffolding is partially elided in
        this excerpt — presumably branching on `self.update`; confirm.
        """
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                    # File don't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exist on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                    # The file already exist on remote collection, skip it.
            self._remote_collection.save(num_retries=self.num_retries)
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)
    def destroy_cache(self):
        """Delete and close the upload cache file, tolerating its absence.

        NOTE(review): the use_cache guard and try scaffolding are elided
        in this excerpt.
        """
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
            self._cache_file.close()
    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection

        NOTE(review): the size accumulator init/update and the return are
        elided in this excerpt.
        """
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        # Poll every second until uploading starts, then every
        # _update_task_time seconds; exits when _stop_checkpointer is set.
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
    def _update(self, final=False):
        """
        Update cached manifest text and report progress.

        When *final* is true the manifest commits pending blocks; otherwise
        they are left uncommitted.  NOTE(review): some branch/guard lines
        are elided in this excerpt.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                    manifest = self._local_collection.manifest_text()
                    # Get the manifest text without comitting pending blocks
                    manifest = self._local_collection.manifest_text(strip=False,
            # Checkpoint the manifest into the cached state.
            with self._state_lock:
                self._state['manifest'] = manifest
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
            # Upload not started yet: only skipped bytes are accounted for.
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()
577 def report_progress(self):
578 if self.reporter is not None:
579 self.reporter(self.bytes_written, self.bytes_expected)
    def _write_stdin(self, filename):
        # Stream standard input into a new file in the local collection.
        # NOTE(review): the closing of `output` is elided in this excerpt.
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded

        Decides between skipping, resuming at an offset, or a full upload,
        comparing cached size/mtime with the local file and with what the
        local collection already holds.  NOTE(review): several lines
        (`should_upload` flips, the resume_offset default, the final queue
        guard) are elided in this excerpt.
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        # New file detected from last run, upload it.
        elif new_file_in_cache:
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have a API for refreshing tokens.
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                # Inconsistent cache, re-upload the file
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
            if file_in_local_collection:
                self._local_collection.remove(filename)

            self._files_to_upload.append((source, resume_offset, filename))
    def _upload_files(self):
        """Drain the pending-upload queue, refreshing each file's cached
        size/mtime just before writing so saved state matches the upload.

        NOTE(review): the `else:` arm of the resume branch is elided in
        this excerpt.
        """
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                # NOTE(review): flush=False presumably defers block commit
                # to the checkpointer/_update — confirm in Collection API.
                output.close(flush=False)
    def _write(self, source_fd, output):
        # Copy data in Keep-block-sized chunks.  NOTE(review): the
        # surrounding read/write loop lines are elided in this excerpt.
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
674 def _my_collection(self):
675 return self._remote_collection if self.update else self._local_collection
    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.

        Also loads the remote collection when --update-collection was
        given.  NOTE(review): try/except scaffolding and a few lines are
        elided in this excerpt; indentation is reconstructed.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

            # Set up cache file name from input paths.
            # NOTE(review): the md5 initialization line is elided here.
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
                # --no-resume means start with a empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
                self._state = json.load(self._cache_file)
                # Reject caches missing the expected top-level keys.
                if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                    # Cache at least partially incomplete, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
                # Cache file empty, set up new cache
                self._state = copy.deepcopy(self.EMPTY_STATE)
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively go through the entire collection `col`"""
        # NOTE(review): the `file_paths` accumulator init and the final
        # return are elided in this excerpt.
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                # Recurse into subdirectories, extending the prefix.
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
    def _lock_file(self, fileobj):
        # Same non-blocking exclusive lock as ResumeCache._lock_file.
        # NOTE(review): try/except lines are elided in this excerpt.
        fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        raise ResumeCacheConflict("{} locked".format(fileobj.name))
    def _save_state(self):
        """
        Atomically save current state into cache.

        NOTE(review): try/except scaffolding is elided in this excerpt.
        Also note the except path references `new_cache_name`, which this
        NamedTemporaryFile-based version never defines (the path here is
        `new_cache.name`) — looks like a leftover from an older
        mkstemp-based implementation; confirm and fix upstream.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we're already needing JSON format to be
            state = json.dumps(self._state)
            # Write to a locked temp file in the same directory, then
            # rename over the old cache so the update is atomic.
            new_cache = tempfile.NamedTemporaryFile(
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
            # On success: switch the instance over to the new cache file.
            self._cache_file.close()
            self._cache_file = new_cache
779 def collection_name(self):
780 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
782 def manifest_locator(self):
783 return self._my_collection().manifest_locator()
    def portable_data_hash(self):
        """Return the server-reported portable data hash, warning when it
        differs from a locally computed hash of the stripped manifest.

        NOTE(review): the comparison guard and final return are elided in
        this excerpt.
        """
        pdh = self._my_collection().portable_data_hash()
        # PDH format is md5-of-manifest + "+" + manifest length.
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        # NOTE(review): uses the module-level `logger`, not self.logger —
        # confirm this is intentional.
        logger.warning("\n".join([
            "arv-put: API server provided PDH differs from local manifest.",
            " This should not happen; showing API server version."]))
795 def manifest_text(self, stream_name=".", strip=False, normalize=False):
796 return self._my_collection().manifest_text(stream_name, strip, normalize)
    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections

        NOTE(review): several lines (locator accumulation, the empty-file
        guard, the file-case return) are elided in this excerpt.
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
                # Empty file: the well-known MD5-of-empty-string locator.
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            for segment in item.segments():
                loc = segment.locator
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
    def data_locators(self):
        """Return the locators of all data blocks in the collection.

        NOTE(review): the final `return datablocks` is elided in this
        excerpt.
        """
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
def expected_bytes_for(pathlist, follow_links=True):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    # NOTE(review): the accumulator init, the symlink `continue`, and the
    # final return are elided in this excerpt; indentation reconstructed.
    for path in pathlist:
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path, followlinks=follow_links):
                    filepath = os.path.join(root, f)
                    # Ignore symlinked files when requested
                    if (not follow_links) and os.path.islink(filepath):
                    bytesum += os.path.getsize(filepath)
        elif not os.path.isfile(path):
            # Plain file case (else branch elided above this line).
            bytesum += os.path.getsize(path)
# Template for --batch-progress lines; the leading "{} {}" are filled here
# and the remaining "{{}}" placeholders are filled per report by
# machine_progress().  NOTE(review): the continuation line supplying the
# second .format() argument is elided in this excerpt.
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
def machine_progress(bytes_written, bytes_expected):
    """Render one machine-readable progress line; -1 marks an unknown total."""
    total = -1 if bytes_expected is None else bytes_expected
    return _machine_format.format(bytes_written, total)
def human_progress(bytes_written, bytes_expected):
    # NOTE(review): the guard choosing between the two formats is elided;
    # with a known total it shows MiB written/total plus a percentage,
    # otherwise only the raw byte count.
    return "\r{}M / {}M {:.1%} ".format(
        bytes_written >> 20, bytes_expected >> 20,
        float(bytes_written) / bytes_expected)
    return "\r{} ".format(bytes_written)
def progress_writer(progress_func, outfile=sys.stderr):
    """Wrap *progress_func* so its formatted output goes to *outfile*.

    Returns a callable taking (bytes_written, bytes_expected), suitable
    as a reporter callback.
    """
    def write_progress(bytes_written, bytes_expected):
        # Render first, then emit in a single write.
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
def exit_signal_handler(sigcode, frame):
    """Handler installed by main() for CAUGHT_SIGNALS; body elided in this
    excerpt.

    NOTE(review): per the comments in ArvPutUploadJob.start(), it
    presumably raises SystemExit carrying the signal code — confirm in
    the full source.
    """
def desired_project_uuid(api_client, project_uuid, num_retries):
    """Resolve *project_uuid* to a concrete UUID via the Arvados API.

    Matches user or group UUID patterns, raising ValueError for anything
    else.  NOTE(review): the no-project guard (falling back to the current
    user) and the final `else:` are elided in this excerpt.
    """
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """CLI entry point: parse arguments, build an ArvPutUploadJob, run it,
    and print the resulting manifest, locators, or collection locator.

    NOTE(review): many lines (status/exit handling, several guards and
    `else:` branches, try openers) are elided in this excerpt;
    indentation is reconstructed.
    """
    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    # NOTE(review): `api_client` is referenced but not visibly defined in
    # this excerpt — presumably a global or elided parameter; confirm.
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
        collection_name = args.name
        # Default name records when, by whom, and where the upload ran.
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")

    # Determine the parent project
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
    except (apiclient_errors.Error, ValueError) as error:

    # Pick the progress reporter matching the CLI flags.
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
        bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)

        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            " Use --no-cache if this is really what you want."]))
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
    except ArvPutUploadNotPending:
        # No files pending for upload

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    # Warn when a previous checkpoint is being resumed.
    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            " Use the --no-resume option to start over."]))

        writer.report_progress()
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
    except ArvPutUploadNotPending:
        # No files pending for upload
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))

    if args.progress:  # Print newline to split stderr from stdout for humans.

    # Select what to print: manifest, raw locators, or collection info.
        output = writer.manifest_text(normalize=True)
        output = writer.manifest_text()
        output = ','.join(writer.data_locators())
        if args.update_collection:
            logger.info("Collection updated: '{}'".format(writer.collection_name()))
            logger.info("Collection saved as '{}'".format(writer.collection_name()))
        if args.portable_data_hash:
            output = writer.portable_data_hash()
            output = writer.manifest_locator()
    except apiclient_errors.Error as error:
        "arv-put: Error creating Collection on project: {}.".format(
        # Print the locator (uuid) of the new collection.
        status = status or 1
        stdout.write(output)
        if not output.endswith('\n'):

    # Restore the original signal handlers before returning.
    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)
# Script entry point; the call into main() is elided in this excerpt.
if __name__ == '__main__':