from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object

import argparse
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import traceback

import arvados
import arvados.collection
from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

api_client = None

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")

_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")

_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")

_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
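
# Typical invocations (a sketch based on the options above; flags and
# defaults are defined by the parsers, not by these examples):
#
#   arv-put file1.txt dir1/                  # save a Collection, print its locator
#   arv-put --stream dir1/                   # print the manifest, don't save a Collection
#   arv-put --update-collection UUID dir1/   # add new local files to an existing collection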


def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
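
# In dry-run mode the first append() aborts the run by raising
# ArvPutUploadIsPending, which main() turns into exit code 2 (see the
# --dry-run help text). A minimal sketch:
#
#   files = FileUploadList(dry_run=True)
#   files.append(('foo.txt', 0, 'foo.txt'))  # raises ArvPutUploadIsPending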


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                locator = state["_finished_streams"][0][1][0]
            elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                locator = state["_current_stream_locators"][0]
            if locator is not None:
                kc = arvados.keep.KeepClient(api_client=api_client)
                kc.head(locator, num_retries=num_retries)
        except Exception as e:
            self.restart()

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # ENOENT means it was already gone; that's fine.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
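
# Minimal ResumeCache usage sketch (the cache path is derived from the API
# host, the input paths and --filename, so different uploads get different
# cache files):
#
#   cache = ResumeCache(ResumeCache.make_path(args))
#   state = cache.load()        # previous state; raises ValueError if empty
#   cache.save({'example': 1})  # atomically replace the cache contents
#   cache.restart()             # wipe the cache and start over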


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # Seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif not os.path.exists(path):
                    raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    orig_path = path
                    path = os.path.abspath(path)
                    if orig_path[-1:] == os.sep:
                        # When passing a directory reference with a trailing slash,
                        # its contents should be uploaded directly to the collection's root.
                        prefixdir = path
                    else:
                        # When passing a directory reference with no trailing slash,
                        # upload the directory to the collection's root.
                        prefixdir = os.path.dirname(path)
                    prefixdir += os.sep
                    for root, dirs, files in os.walk(path, followlinks=self.follow_links):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got up to this point, then we should
            # notify that there aren't any files to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True  # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & do one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # ENOENT means the cache file was already gone; that's fine.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)
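
    # The reporter is any callable accepting (bytes_written, bytes_expected);
    # the CLI wires in one built by progress_writer() further below, e.g.:
    #
    #   job = ArvPutUploadJob(paths, reporter=progress_writer(human_progress))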

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If there's no previous cached data on this file, store it for an
            # eventual resume.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))
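
    # Net effect of the checks above: unchanged files that are fully present
    # in the local collection are skipped; a file whose upload was cut short
    # resumes from the last uploaded byte; anything new, modified, expired or
    # inconsistent with the cache is queued as (source, resume_offset, filename).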

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the state serialized as
            # JSON to write it to disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile creation failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache
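
    # The write-temp-then-rename sequence above works because os.rename() is
    # atomic within a filesystem: readers only ever see the old or the new
    # cache file, never a half-written one, and the flock taken in
    # _lock_file() guards against a concurrent arv-put using the same cache.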

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Locator of the empty block (md5 of zero bytes)
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None
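
    # The flattener above turns a list of per-item locator lists into a single
    # flat list, e.g.:
    #
    #   >>> l = [['loc1', 'loc2'], ['loc3'], []]
    #   >>> [loc for sublist in l for loc in sublist]
    #   ['loc1', 'loc2', 'loc3']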

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


def expected_bytes_for(pathlist, follow_links=True):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path, followlinks=follow_links):
                # Sum file sizes
                for f in files:
                    filepath = os.path.join(root, f)
                    # Ignore symlinked files when requested
                    if (not follow_links) and os.path.islink(filepath):
                        continue
                    bytesum += os.path.getsize(filepath)
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum
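
# Note that expected_bytes_for() returns None when some path is neither a
# file nor a directory (e.g. a named pipe); the progress reporters below
# handle that by omitting the percentage / total.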

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
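
# Example machine-readable progress line (program name and PID vary per run):
#
#   arv-put 12345: 1048576 written 2097152 total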

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
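
# Example outputs (doctest-style sketch):
#
#   >>> human_progress(512 << 20, 1 << 30)
#   '\r512M / 1024M 50.0% '
#   >>> human_progress(1024, None)
#   '\r1024 '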

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
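
# Dispatch sketch: with no project_uuid the caller's own (Home) project is
# used via users().current(); a user UUID (infix 'tpzed') or a group UUID
# (infix 'j7d0g') is matched against the corresponding pattern and looked
# up directly.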


def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger = logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original handlers.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    # Restore the original signal handlers.
    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    return output


if __name__ == '__main__':
    main()