#!/usr/bin/env python
# TODO:
# --md5sum - display md5 of each file as read from disk
import argparse
import arvados
import arvados.collection
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")
upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
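
# A sketch of a typical invocation (hypothetical project UUID and paths),
# showing how the parent parsers above combine into the final CLI:
#
#   arv-put --project-uuid zzzzz-j7d0g-0123456789abcde \
#           --name 'sequencing run 42' --replication 2 data_dir/
#
# argparse merges upload_opts, run_opts and the shared retry option, so all
# of the flags defined above are accepted by the single arv-put command.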
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args
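
# Behavior sketch for the stdin handling above (values are illustrative):
#   parse_arguments(['/dev/stdin']).paths == ['-']   # normalized to '-'
#   parse_arguments([]).paths == ['-']               # default: read stdin
#   parse_arguments(['-']).filename == 'stdin'       # unless --filename given
# Reading from stdin also disables resuming and caching, since there is no
# stable file identity to resume against.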
class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass
class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())
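
    # Cache-key sketch: the path is derived from the API host plus the
    # NUL-joined, sorted real paths (after symlink resolution), so the same
    # inputs map to the same cache file across reruns, e.g.:
    #   md5('api.example.com' + '\0'.join(sorted(realpaths))).hexdigest()
    # ('api.example.com' is a hypothetical host used for illustration.)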
    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)
    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                locator = state["_finished_streams"][0][1][0]
            elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                locator = state["_current_stream_locators"][0]
            if locator is not None:
                kc = arvados.keep.KeepClient(api_client=api_client)
                kc.head(locator, num_retries=num_retries)
        except Exception as e:
            # Cache is unreadable or refers to data that is no longer in
            # Keep: start over.
            self.restart()
    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache
    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
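
    # A saved state file is plain JSON in this shape (a sketch with
    # hypothetical values, not a verbatim upstream example):
    # {
    #   "manifest": ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:data.txt\n",
    #   "files": {"/home/user/data.txt": {"mtime": 1490000000.0, "size": 10}}
    # }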
    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None  # Previous run state (file list & manifest)
        self._current_files = []  # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None  # Collection being updated (if asked)
        self._local_collection = None  # Collection from previous run manifest
        self._file_paths = set()  # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)
    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif not os.path.exists(path):
                    raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path, followlinks=self.follow_links):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got up to this point, notify that
            # there aren't any files to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True  # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT).
            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
            # we have a custom signal handler in place that raises SystemExit with
            # the caught signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()
    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)
    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()
    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size
    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()
    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()
    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()
    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))
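
    # Resume decision sketch for a cached, unchanged local file (mtime and
    # size match the cache):
    #   collection size == cached size -> skip, count bytes as skipped
    #   collection size <  cached size -> append from resume_offset
    #   collection size >  cached size -> inconsistent, re-upload from scratch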
    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)
    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)
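
    # Note: arvados.config.KEEP_BLOCK_SIZE is 64 MiB (2**26); reading in
    # block-sized chunks lets the collection writer fill whole Keep blocks
    # instead of buffering arbitrary amounts of data in memory.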
    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection
    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively walking the entire collection `col`"""
        file_paths = []
        for name, item in col.items():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths
    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))
    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we're already needing JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile creation failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache
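
    # Design note: writing the JSON to a NamedTemporaryFile in the same
    # directory and then os.rename()-ing it over the old cache makes the
    # update atomic on POSIX filesystems, so an interrupted run never sees
    # a half-written state file.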
    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()
    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest()
        local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh
    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)
    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None
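
    # Flattening sketch for the comprehension above:
    #   l = [['loc1', 'loc2'], ['loc3']]
    #   [loc for sublist in l for loc in sublist] == ['loc1', 'loc2', 'loc3']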
    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks
def expected_bytes_for(pathlist, follow_links=True):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path, followlinks=follow_links):
                for f in files:
                    filepath = os.path.join(root, f)
                    # Ignore symlinked files when requested
                    if (not follow_links) and os.path.islink(filepath):
                        continue
                    bytesum += os.path.getsize(filepath)
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
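
# Output sketch (values are illustrative):
#   human_progress(524288000, 1073741824) -> "\r500M / 1024M 48.8% "
#   human_progress(1024, None)            -> "\r1024 "
#   machine_progress(1024, None)          -> "<argv0> <pid>: 1024 written -1 total\n"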
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)
def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
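
# UUID routing sketch (hypothetical UUIDs): Arvados user UUIDs contain the
# 'tpzed' infix (e.g. zzzzz-tpzed-0123456789abcde) and group/project UUIDs
# the 'j7d0g' infix (e.g. zzzzz-j7d0g-0123456789abcde); an empty value falls
# back to the current user's Home project, anything else raises ValueError.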
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())
    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original signal handlers for later.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    # Restore the original signal handlers.
    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    return output
if __name__ == '__main__':
    main()
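
# Typical invocations (a sketch; all flags are defined above):
#   arv-put file1 dir2/      upload and save a collection, print its UUID
#   arv-put --stream file1   print the manifest on stdout, don't save
#   arv-put --dry-run dir2/  exit 2 if any file would be uploaded, 0 otherwise
#   cat log | arv-put -      upload stdin as a file named 'stdin'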