1 from __future__ import division
2 from future.utils import listitems, listvalues
3 from builtins import str
4 from builtins import object
7 import arvados.collection
27 from apiclient import errors as apiclient_errors
28 from arvados._version import __version__
30 import arvados.commands._util as arv_cmd
32 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Command-line option definitions for arv-put.
# NOTE(review): this rendering embeds the original file's line numbers and is
# missing interior lines (e.g. the help="""...""" openers and closing """ of
# several arguments); the visible text is kept byte-identical.
# upload_opts: options shared with other tools that upload data.
35 upload_opts = argparse.ArgumentParser(add_help=False)
37 upload_opts.add_argument('--version', action='version',
38 version="%s %s" % (sys.argv[0], __version__),
39 help='Print version and exit.')
40 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
42 Local file or directory. Default: read from standard input.
# Mutually exclusive group: manifest post-processing / dry-run mode.
45 _group = upload_opts.add_mutually_exclusive_group()
# --max-manifest-depth is retained only for backward compatibility (help suppressed).
47 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
48 default=-1, help=argparse.SUPPRESS)
50 _group.add_argument('--normalize', action='store_true',
52 Normalize the manifest by re-ordering files and streams after writing
56 _group.add_argument('--dry-run', action='store_true', default=False,
58 Don't actually upload files, but only check if any file should be
59 uploaded. Exit with code=2 when files are pending for upload.
# Mutually exclusive group: output mode (stream / manifest / raw).
62 _group = upload_opts.add_mutually_exclusive_group()
64 _group.add_argument('--as-stream', action='store_true', dest='stream',
69 _group.add_argument('--stream', action='store_true',
71 Store the file content and display the resulting manifest on
72 stdout. Do not write the manifest to Keep or save a Collection object
76 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
78 Synonym for --manifest.
81 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
83 Synonym for --manifest.
86 _group.add_argument('--manifest', action='store_true',
88 Store the file data and resulting manifest in Keep, save a Collection
89 object in Arvados, and display the manifest locator (Collection uuid)
90 on stdout. This is the default behavior.
93 _group.add_argument('--as-raw', action='store_true', dest='raw',
98 _group.add_argument('--raw', action='store_true',
100 Store the file content and display the data block locators on stdout,
101 separated by commas, with a trailing newline. Do not store a
105 upload_opts.add_argument('--update-collection', type=str, default=None,
106 dest='update_collection', metavar="UUID", help="""
107 Update an existing collection identified by the given Arvados collection
108 UUID. All new local files will be uploaded.
111 upload_opts.add_argument('--use-filename', type=str, default=None,
112 dest='filename', help="""
113 Synonym for --filename.
116 upload_opts.add_argument('--filename', type=str, default=None,
118 Use the given filename in the manifest, instead of the name of the
119 local file. This is useful when "-" or "/dev/stdin" is given as an
120 input file. It can be used only if there is exactly one path given and
121 it is not a directory. Implies --manifest.
124 upload_opts.add_argument('--portable-data-hash', action='store_true',
126 Print the portable data hash instead of the Arvados UUID for the collection
127 created by the upload.
130 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
132 Set the replication level for the new collection: how many different
133 physical storage devices (e.g., disks) should have a copy of each data
134 block. Default is to use the server-provided default (if any) or 2.
137 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
139 Set the number of upload threads to be used. Take into account that
140 using lots of threads will increase the RAM requirements. Default is
142 On high latency installations, using a greater number will improve
# run_opts: options meaningful only when run interactively as arv-put.
146 run_opts = argparse.ArgumentParser(add_help=False)
148 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
149 Store the collection in the specified project, instead of your Home
153 run_opts.add_argument('--name', help="""
154 Save the collection with the specified name.
# Mutually exclusive group: progress reporting style.
157 _group = run_opts.add_mutually_exclusive_group()
158 _group.add_argument('--progress', action='store_true',
160 Display human-readable progress on stderr (bytes and, if possible,
161 percentage of total data size). This is the default behavior when
165 _group.add_argument('--no-progress', action='store_true',
167 Do not display human-readable progress on stderr, even if stderr is a
171 _group.add_argument('--batch-progress', action='store_true',
173 Display machine-readable progress on stderr (bytes and, if known,
# Mutually exclusive group: resume behavior.
177 _group = run_opts.add_mutually_exclusive_group()
178 _group.add_argument('--resume', action='store_true', default=True,
180 Continue interrupted uploads from cached state (default).
182 _group.add_argument('--no-resume', action='store_false', dest='resume',
184 Do not continue interrupted uploads from cached state.
# Mutually exclusive group: on-disk state cache behavior.
187 _group = run_opts.add_mutually_exclusive_group()
188 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
190 Save upload state in a cache file for resuming (default).
192 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
194 Do not save upload state in a cache file for resuming.
# Combined parser: upload_opts + run_opts + the shared --retries option.
197 arg_parser = argparse.ArgumentParser(
198 description='Copy data from the local filesystem to Keep.',
199 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
# Parse and post-process command-line arguments: default path handling,
# --filename validation, tty-based --progress default, cache/resume
# interaction, and stdin special-casing.
# NOTE(review): interior lines are elided in this rendering (e.g. the error
# branches after each check); visible text kept byte-identical.
201 def parse_arguments(arguments):
202 args = arg_parser.parse_args(arguments)
# No paths given: fall back to reading from standard input.
204 if len(args.paths) == 0:
# Treat "/dev/stdin" exactly like "-" so the stdin logic below applies.
207 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
# --filename only makes sense for a single non-directory path.
209 if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
212 --filename argument cannot be used when storing a directory or
216 # Turn on --progress by default if stderr is a tty.
217 if (not (args.batch_progress or args.no_progress)
218 and os.isatty(sys.stderr.fileno())):
221 # Turn off --resume (default) if --no-cache is used.
222 if not args.use_cache:
# Reading from stdin: updates are unsupported, caching is disabled, and a
# default manifest filename of 'stdin' is used.
225 if args.paths == ['-']:
226 if args.update_collection:
228 --update-collection cannot be used when reading from stdin.
231 args.use_cache = False
232 if not args.filename:
233 args.filename = 'stdin'
# Module exception hierarchy (class bodies elided in this rendering).
# CollectionUpdateError: the --update-collection target could not be read/used.
238 class CollectionUpdateError(Exception):
# ResumeCacheConflict: another arv-put process holds the cache file lock.
242 class ResumeCacheConflict(Exception):
# ArvPutArgumentConflict: mutually incompatible constructor arguments.
246 class ArvPutArgumentConflict(Exception):
# ArvPutUploadIsPending: dry-run found at least one file needing upload (exit 2).
250 class ArvPutUploadIsPending(Exception):
# ArvPutUploadNotPending: dry-run found nothing to upload.
254 class ArvPutUploadNotPending(Exception):
# A list of pending uploads that, in dry-run mode, raises
# ArvPutUploadIsPending as soon as anything is appended — used to
# short-circuit the file scan when we only need a yes/no answer.
# NOTE(review): interior lines elided (the dry_run check before the raise).
258 class FileUploadList(list):
259 def __init__(self, dry_run=False):
261 self.dry_run = dry_run
# append raises in dry-run mode instead of accumulating entries.
263 def append(self, other):
265 raise ArvPutUploadIsPending()
266 super(FileUploadList, self).append(other)
# Legacy resume-state cache keyed by API host + input paths + filename.
# Holds an exclusive flock on the cache file for the process lifetime and
# saves JSON state atomically via mkstemp + rename.
# NOTE(review): many interior lines are elided in this rendering (method
# headers such as load()/restart(), try/except openers, etc.); visible text
# kept byte-identical.
269 class ResumeCache(object):
# Cache directory, relative to the user's home configuration directory.
270 CACHE_DIR = '.cache/arvados/arv-put'
272 def __init__(self, file_spec):
# 'a+' creates the file if absent without truncating existing state.
273 self.cache_file = open(file_spec, 'a+')
274 self._lock_file(self.cache_file)
275 self.filename = self.cache_file.name
# Build a cache file path that is unique per (API host, input paths, filename).
278 def make_path(cls, args):
280 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
281 realpaths = sorted(os.path.realpath(path) for path in args.paths)
282 md5.update(b'\0'.join([p.encode() for p in realpaths]))
283 if any(os.path.isdir(path) for path in realpaths):
286 md5.update(args.filename.encode())
288 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
# Take a non-blocking exclusive lock; conflict means another arv-put owns it.
291 def _lock_file(self, fileobj):
293 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
295 raise ResumeCacheConflict("{} locked".format(fileobj.name))
# (load) Rewind and parse the whole cache file as JSON.
298 self.cache_file.seek(0)
299 return json.load(self.cache_file)
# Sanity-check cached state: HEAD one known block locator in Keep so a stale
# cache (expired tokens / garbage-collected data) is detected up front.
301 def check_cache(self, api_client=None, num_retries=0):
306 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
307 locator = state["_finished_streams"][0][1][0]
308 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
309 locator = state["_current_stream_locators"][0]
310 if locator is not None:
311 kc = arvados.keep.KeepClient(api_client=api_client)
312 kc.head(locator, num_retries=num_retries)
313 except Exception as e:
# Atomically replace the cache contents: write to a locked temp file in the
# same directory, then rename over the old file.
318 def save(self, data):
320 new_cache_fd, new_cache_name = tempfile.mkstemp(
321 dir=os.path.dirname(self.filename))
322 self._lock_file(new_cache_fd)
323 new_cache = os.fdopen(new_cache_fd, 'r+')
324 json.dump(data, new_cache)
325 os.rename(new_cache_name, self.filename)
326 except (IOError, OSError, ResumeCacheConflict) as error:
# Best-effort cleanup of the temp file on failure.
328 os.unlink(new_cache_name)
329 except NameError: # mkstemp failed.
# Swap the live handle to the newly-renamed cache file.
332 self.cache_file.close()
333 self.cache_file = new_cache
# (close) Release the lock by closing the file handle.
336 self.cache_file.close()
# (destroy) Remove the cache file; ENOENT is fine — it is the goal state.
340 os.unlink(self.filename)
341 except OSError as error:
342 if error.errno != errno.ENOENT: # That's what we wanted anyway.
# (restart) Re-open from scratch by re-running __init__ on the same path.
348 self.__init__(self.filename)
# Threaded upload job: walks local paths, resumes from cached state, keeps a
# local Collection in sync, and periodically checkpoints progress.
# NOTE(review): the EMPTY_STATE assignment line itself is elided; only the
# dict entries are visible here.
351 class ArvPutUploadJob(object):
352 CACHE_DIR = '.cache/arvados/arv-put'
354 'manifest' : None, # Last saved manifest checkpoint
355 'files' : {} # Previous run file list: {path : {size, mtime}}
# Initialize job configuration, inter-thread state and the checkpointer
# thread, validate argument combinations, then load any cached state.
# NOTE(review): a few assignment lines are elided in this rendering
# (e.g. self.paths / self.resume / self.name); visible text kept byte-identical.
358 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
359 bytes_expected=None, name=None, owner_uuid=None,
360 ensure_unique_name=False, num_retries=None,
361 put_threads=None, replication_desired=None,
362 filename=None, update_time=60.0, update_collection=None,
363 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
366 self.use_cache = use_cache
368 self.reporter = reporter
369 self.bytes_expected = bytes_expected
370 self.bytes_written = 0
371 self.bytes_skipped = 0
373 self.owner_uuid = owner_uuid
374 self.ensure_unique_name = ensure_unique_name
375 self.num_retries = num_retries
376 self.replication_desired = replication_desired
377 self.put_threads = put_threads
378 self.filename = filename
# _state is shared with the checkpointer thread; guard with _state_lock.
379 self._state_lock = threading.Lock()
380 self._state = None # Previous run state (file list & manifest)
381 self._current_files = [] # Current run file list
382 self._cache_file = None
383 self._collection_lock = threading.Lock()
384 self._remote_collection = None # Collection being updated (if asked)
385 self._local_collection = None # Collection from previous run manifest
386 self._file_paths = set() # Files to be updated in remote collection
387 self._stop_checkpointer = threading.Event()
# Daemon thread so a crashed main thread doesn't hang on join at interpreter exit.
388 self._checkpointer = threading.Thread(target=self._update_task)
389 self._checkpointer.daemon = True
390 self._update_task_time = update_time # How many seconds wait between update runs
391 self._files_to_upload = FileUploadList(dry_run=dry_run)
392 self._upload_started = False
394 self.dry_run = dry_run
395 self._checkpoint_before_quit = True
# Resuming requires the on-disk cache; reject the contradictory combination.
397 if not self.use_cache and self.resume:
398 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
400 # Check for obvious dry-run responses
401 if self.dry_run and (not self.use_cache or not self.resume):
402 raise ArvPutUploadIsPending()
404 # Load cached data if any and if needed
405 self._setup_state(update_collection)
# Run the upload: scan paths (stdin / directories / files), upload pending
# files, then stop the checkpointer, do a final checkpoint, and optionally
# save the collection.
# NOTE(review): several lines are elided in this rendering (the try: opener,
# stdin test, dirs.sort()/files.sort(), the _upload_files() call, the
# finally: block, etc.); visible text kept byte-identical.
407 def start(self, save_collection):
409 Start supporting thread & file uploading
412 self._checkpointer.start()
414 for path in self.paths:
415 # Test for stdin first, in case some file named '-' exist
# In dry-run mode stdin always counts as pending (it can't be checksummed ahead).
418 raise ArvPutUploadIsPending()
419 self._write_stdin(self.filename or 'stdin')
420 elif os.path.isdir(path):
421 # Use absolute paths on cache index so CWD doesn't interfere
422 # with the caching logic.
423 prefixdir = path = os.path.abspath(path)
426 for root, dirs, files in os.walk(path):
427 # Make os.walk()'s dir traversing order deterministic
# Store paths relative to the argument root inside the collection.
431 self._check_file(os.path.join(root, f),
432 os.path.join(root[len(prefixdir):], f))
434 self._check_file(os.path.abspath(path),
435 self.filename or os.path.basename(path))
436 # If dry-mode is on, and got up to this point, then we should notify that
437 # there aren't any file to upload.
439 raise ArvPutUploadNotPending()
440 # Remove local_collection's files that don't exist locally anymore, so the
441 # bytes_written count is correct.
442 for f in self.collection_file_paths(self._local_collection,
444 if f != 'stdin' and f != self.filename and not f in self._file_paths:
445 self._local_collection.remove(f)
446 # Update bytes_written from current local collection and
447 # report initial progress.
450 self._upload_started = True # Used by the update thread to start checkpointing
452 except (SystemExit, Exception) as e:
# Don't checkpoint on abnormal exit: partially-written state could corrupt
# the cache.
453 self._checkpoint_before_quit = False
454 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
455 # Note: We're expecting SystemExit instead of KeyboardInterrupt because
456 # we have a custom signal handler in place that raises SystemExit with
457 # the catched signal's code.
458 if not isinstance(e, SystemExit) or e.code != -2:
459 self.logger.warning("Abnormal termination:\n{}".format(
460 traceback.format_exc()))
464 # Stop the thread before doing anything else
465 self._stop_checkpointer.set()
466 self._checkpointer.join()
467 if self._checkpoint_before_quit:
468 # Commit all pending blocks & one last _update()
469 self._local_collection.manifest_text()
470 self._update(final=True)
472 self.save_collection()
474 self._cache_file.close()
# Persist the upload: when updating, copy new/changed files into the remote
# collection and save it; otherwise save the local collection as a new one.
# NOTE(review): the `if self.update:` / `else:` branch lines are elided in
# this rendering; visible text kept byte-identical.
476 def save_collection(self):
478 # Check if files should be updated on the remote collection.
479 for fp in self._file_paths:
480 remote_file = self._remote_collection.find(fp)
482 # File don't exist on remote collection, copy it.
483 self._remote_collection.copy(fp, fp, self._local_collection)
484 elif remote_file != self._local_collection.find(fp):
485 # A different file exist on remote collection, overwrite it.
486 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
488 # The file already exist on remote collection, skip it.
490 self._remote_collection.save(num_retries=self.num_retries)
492 self._local_collection.save_new(
493 name=self.name, owner_uuid=self.owner_uuid,
494 ensure_unique_name=self.ensure_unique_name,
495 num_retries=self.num_retries)
# Delete the on-disk cache file (if caching is enabled) and release the lock
# by closing the handle; a missing file is the desired outcome.
# NOTE(review): the `if self.use_cache:` / try: lines are elided here.
497 def destroy_cache(self):
500 os.unlink(self._cache_filename)
501 except OSError as error:
502 # That's what we wanted anyway.
503 if error.errno != errno.ENOENT:
505 self._cache_file.close()
507 def _collection_size(self, collection):
509 Recursively get the total size of the collection
# Sum sizes of all items, descending into subcollections.
# NOTE(review): the size accumulator init, the else-branch (plain file
# size()) and the return line are elided in this rendering.
512 for item in listvalues(collection):
513 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
514 size += self._collection_size(item)
519 def _update_task(self):
521 Periodically called support task. File uploading is
522 asynchronous so we poll status from the collection.
# Poll every second until the upload starts, then every _update_task_time
# seconds; the wait doubles as the stop signal check.
# NOTE(review): the self._update() call inside the loop is elided here.
524 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
527 def _update(self, final=False):
529 Update cached manifest text and report progress.
# On the final pass commit pending blocks (full manifest_text()); otherwise
# take a non-committing snapshot. Then persist state and report progress.
# NOTE(review): several lines are elided (the if final:/else: markers, the
# only_committed=True kwarg, the _save_state() call, and the pre-upload
# else-branch); visible text kept byte-identical.
531 if self._upload_started:
532 with self._collection_lock:
533 self.bytes_written = self._collection_size(self._local_collection)
536 manifest = self._local_collection.manifest_text()
538 # Get the manifest text without comitting pending blocks
539 manifest = self._local_collection.manifest_text(strip=False,
543 with self._state_lock:
544 self._state['manifest'] = manifest
# Before upload starts, only skipped bytes are known.
548 self.bytes_written = self.bytes_skipped
549 # Call the reporter, if any
550 self.report_progress()
def report_progress(self):
    """Invoke the configured progress reporter, if any, with the current
    byte counts (written so far, total expected)."""
    reporter = self.reporter
    if reporter is None:
        return
    reporter(self.bytes_written, self.bytes_expected)
# Stream standard input into the named file inside the local collection.
# NOTE(review): the output.close(...) line is elided in this rendering.
556 def _write_stdin(self, filename):
557 output = self._local_collection.open(filename, 'wb')
558 self._write(sys.stdin, output)
561 def _check_file(self, source, filename):
562 """Check if this file needs to be uploaded"""
# Decide, based on cached mtime/size and the partially-uploaded local
# collection, whether `source` must be uploaded fully, resumed from an
# offset, or skipped; queue the decision on self._files_to_upload.
# NOTE(review): several lines are elided (resume_offset init, the
# should_upload = True assignments in each branch, the `if not resume:`
# opener, the final `if should_upload:`); visible text kept byte-identical.
564 should_upload = False
565 new_file_in_cache = False
566 # Record file path for updating the remote collection before exiting
567 self._file_paths.add(filename)
569 with self._state_lock:
570 # If no previous cached data on this file, store it for an eventual
572 if source not in self._state['files']:
573 self._state['files'][source] = {
574 'mtime': os.path.getmtime(source),
575 'size' : os.path.getsize(source)
577 new_file_in_cache = True
578 cached_file_data = self._state['files'][source]
580 # Check if file was already uploaded (at least partially)
581 file_in_local_collection = self._local_collection.find(filename)
583 # If not resuming, upload the full file.
586 # New file detected from last run, upload it.
587 elif new_file_in_cache:
589 # Local file didn't change from last run.
590 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
591 if not file_in_local_collection:
592 # File not uploaded yet, upload it completely
594 elif file_in_local_collection.permission_expired():
595 # Permission token expired, re-upload file. This will change whenever
596 # we have a API for refreshing tokens.
598 self._local_collection.remove(filename)
599 elif cached_file_data['size'] == file_in_local_collection.size():
600 # File already there, skip it.
601 self.bytes_skipped += cached_file_data['size']
602 elif cached_file_data['size'] > file_in_local_collection.size():
603 # File partially uploaded, resume!
604 resume_offset = file_in_local_collection.size()
605 self.bytes_skipped += resume_offset
608 # Inconsistent cache, re-upload the file
610 self._local_collection.remove(filename)
611 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
612 # Local file differs from cached data, re-upload it.
614 if file_in_local_collection:
615 self._local_collection.remove(filename)
# Queue (source path, resume offset, collection filename) for upload.
619 self._files_to_upload.append((source, resume_offset, filename))
# Upload every queued file, refreshing cached mtime/size first and resuming
# from resume_offset when a partial upload exists.
# NOTE(review): the else:-branch marker before the 'wb' open and its comment
# are elided in this rendering; visible text kept byte-identical.
621 def _upload_files(self):
622 for source, resume_offset, filename in self._files_to_upload:
623 with open(source, 'rb') as source_fd:
# Record current file metadata so the next run can detect changes.
624 with self._state_lock:
625 self._state['files'][source]['mtime'] = os.path.getmtime(source)
626 self._state['files'][source]['size'] = os.path.getsize(source)
627 if resume_offset > 0:
628 # Start upload where we left off
629 output = self._local_collection.open(filename, 'ab')
630 source_fd.seek(resume_offset)
633 output = self._local_collection.open(filename, 'wb')
634 self._write(source_fd, output)
# flush=False: leave the final (possibly small) block pending so it can be
# packed with other data; _update(final=True) commits it later.
635 output.close(flush=False)
# Copy source_fd to the collection file in KEEP_BLOCK_SIZE chunks.
# NOTE(review): the read loop (while/if not data: break/output.write) is
# elided in this rendering; visible text kept byte-identical.
637 def _write(self, source_fd, output):
639 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
def _my_collection(self):
    """Return the collection this job operates on: the remote collection
    when running in update mode, otherwise the local one."""
    if self.update:
        return self._remote_collection
    return self._local_collection
647 def _setup_state(self, update_collection):
649 Create a new cache file or load a previously existing one.
# Resolve --update-collection (UUID only), then open/lock the cache file
# whose name hashes API host + real paths + filename, load JSON state (or
# EMPTY_STATE), and rebuild the local collection from the cached manifest.
# NOTE(review): several lines are elided (try:, self.update flags, the
# `if self.use_cache:` opener, md5 init, the cache filename argument to
# os.path.join, the resume check, and the json decode error handler);
# visible text kept byte-identical.
651 # Load an already existing collection for update
652 if update_collection and re.match(arvados.util.collection_uuid_pattern,
655 self._remote_collection = arvados.collection.Collection(update_collection)
656 except arvados.errors.ApiError as error:
657 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
660 elif update_collection:
661 # Collection locator provided, but unknown format
662 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
665 # Set up cache file name from input paths.
667 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
668 realpaths = sorted(os.path.realpath(path) for path in self.paths)
669 md5.update(b'\0'.join([p.encode() for p in realpaths]))
671 md5.update(self.filename.encode())
672 cache_filename = md5.hexdigest()
673 cache_filepath = os.path.join(
674 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
# 'a+' keeps existing state when resuming…
677 self._cache_file = open(cache_filepath, 'a+')
679 # --no-resume means start with a empty cache file.
680 self._cache_file = open(cache_filepath, 'w+')
681 self._cache_filename = self._cache_file.name
682 self._lock_file(self._cache_file)
683 self._cache_file.seek(0)
685 with self._state_lock:
688 self._state = json.load(self._cache_file)
689 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
690 # Cache at least partially incomplete, set up new cache
691 self._state = copy.deepcopy(self.EMPTY_STATE)
693 # Cache file empty, set up new cache
694 self._state = copy.deepcopy(self.EMPTY_STATE)
696 # No cache file, set empty state
697 self._state = copy.deepcopy(self.EMPTY_STATE)
698 # Load the previous manifest so we can check if files were modified remotely.
699 self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
701 def collection_file_paths(self, col, path_prefix='.'):
702 """Return a list of file paths by recursively go through the entire collection `col`"""
# NOTE(review): the file_paths accumulator init and the return statement
# are elided in this rendering; visible text kept byte-identical.
704 for name, item in listitems(col):
705 if isinstance(item, arvados.arvfile.ArvadosFile):
706 file_paths.append(os.path.join(path_prefix, name))
707 elif isinstance(item, arvados.collection.Subcollection):
# Recurse into subcollections, extending the prefix with their name.
708 new_prefix = os.path.join(path_prefix, name)
709 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
# Take a non-blocking exclusive flock on the cache file; a held lock means
# another arv-put is uploading the same data → ResumeCacheConflict.
# NOTE(review): the try:/except IOError: lines are elided in this rendering.
712 def _lock_file(self, fileobj):
714 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
716 raise ResumeCacheConflict("{} locked".format(fileobj.name))
718 def _save_state(self):
720 Atomically save current state into cache.
# Serialize state under the lock, write to a locked temp file in the cache
# directory, then rename over the live cache file and swap the open handle.
# NOTE(review): a few lines are elided (try:, the flush before rename, and
# the inner try/except around unlink); visible text kept byte-identical.
723 with self._state_lock:
724 # We're not using copy.deepcopy() here because it's a lot slower
725 # than json.dumps(), and we're already needing JSON format to be
727 state = json.dumps(self._state)
728 new_cache_fd, new_cache_name = tempfile.mkstemp(
729 dir=os.path.dirname(self._cache_filename))
730 self._lock_file(new_cache_fd)
731 new_cache = os.fdopen(new_cache_fd, 'r+')
732 new_cache.write(state)
735 os.rename(new_cache_name, self._cache_filename)
736 except (IOError, OSError, ResumeCacheConflict) as error:
737 self.logger.error("There was a problem while saving the cache file: {}".format(error))
# Best-effort removal of the temp file; NameError means mkstemp itself failed.
739 os.unlink(new_cache_name)
740 except NameError: # mkstemp failed.
# On success, the new (locked) handle becomes the live cache file.
743 self._cache_file.close()
744 self._cache_file = new_cache
def collection_name(self):
    """Return the saved collection's name from its API response, or None
    when the collection has no API response yet."""
    response = self._my_collection().api_response()
    if not response:
        return None
    return response['name']
def manifest_locator(self):
    """Return the manifest locator of the collection this job operates on."""
    collection = self._my_collection()
    return collection.manifest_locator()
# Return the collection's PDH, cross-checking it against a locally computed
# md5 of the stripped manifest and warning on mismatch.
# NOTE(review): the comparison line and the return are elided in this
# rendering. Also note this calls module-global `logger`, not self.logger —
# no module-level `logger` is visible in this file; verify it exists or this
# warning path would raise NameError.
752 def portable_data_hash(self):
753 pdh = self._my_collection().portable_data_hash()
754 m = self._my_collection().stripped_manifest().encode()
755 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
757 logger.warning("\n".join([
758 "arv-put: API server provided PDH differs from local manifest.",
759 " This should not happen; showing API server version."]))
def manifest_text(self, stream_name=".", strip=False, normalize=False):
    """Return the manifest text of the underlying collection, forwarding
    the stream name and the strip/normalize flags unchanged."""
    collection = self._my_collection()
    return collection.manifest_text(stream_name, strip, normalize)
765 def _datablocks_on_item(self, item):
767 Return a list of datablock locators, recursively navigating
768 through subcollections
# NOTE(review): several lines are elided (the zero-segment check opener, the
# locators accumulator init/append/return for the file branch, and the
# trailing else branch); visible text kept byte-identical.
770 if isinstance(item, arvados.arvfile.ArvadosFile):
# Empty file: represented by the well-known md5-of-empty-string locator.
773 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
776 for segment in item.segments():
777 loc = segment.locator
780 elif isinstance(item, arvados.collection.Collection):
781 l = [self._datablocks_on_item(x) for x in listvalues(item)]
782 # Fast list flattener method taken from:
783 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
784 return [loc for sublist in l for loc in sublist]
# Return the list of data block locators for the whole collection, after
# flushing pending blocks so every locator is final.
# NOTE(review): the return statement is elided in this rendering.
788 def data_locators(self):
789 with self._collection_lock:
790 # Make sure all datablocks are flushed before getting the locators
791 self._my_collection().manifest_text()
792 datablocks = self._datablocks_on_item(self._my_collection())
796 def expected_bytes_for(pathlist):
797 # Walk the given directory trees and stat files, adding up file sizes,
798 # so we can display progress as percent
# NOTE(review): the accumulator init, the non-file early return (stdin case
# returns an unknown total) and the final return are elided in this
# rendering; visible text kept byte-identical.
800 for path in pathlist:
801 if os.path.isdir(path):
802 for filename in arvados.util.listdir_recursive(path):
803 bytesum += os.path.getsize(os.path.join(path, filename))
804 elif not os.path.isfile(path):
807 bytesum += os.path.getsize(path)
# Machine-readable progress line: "<argv0> <pid?>: <written> written <total> total".
# NOTE(review): the continuation of the .format(...) call (line 811) is
# elided in this rendering, so the second placeholder's value is not visible.
810 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
# Render one machine-readable progress record; -1 signals unknown total size.
812 def machine_progress(bytes_written, bytes_expected):
813 return _machine_format.format(
814 bytes_written, -1 if (bytes_expected is None) else bytes_expected)
# Render a human-readable progress line (carriage-return overwrite style):
# MiB counts plus percentage when the expected total is known, otherwise
# just the raw byte count.
# NOTE(review): the branch condition on bytes_expected and the else: line
# are elided in this rendering; visible text kept byte-identical.
816 def human_progress(bytes_written, bytes_expected):
818 return "\r{}M / {}M {:.1%} ".format(
819 bytes_written >> 20, bytes_expected >> 20,
820 float(bytes_written) / bytes_expected)
822 return "\r{} ".format(bytes_written)
def progress_writer(progress_func, outfile=sys.stderr):
    """Build a progress callback that formats byte counts with
    *progress_func* and writes the result to *outfile* (stderr by default).

    Returns a function of (bytes_written, bytes_expected).
    """
    def write_progress(bytes_written, bytes_expected):
        rendered = progress_func(bytes_written, bytes_expected)
        outfile.write(rendered)
    return write_progress
# Signal handler installed for CAUGHT_SIGNALS; body (which exits with the
# negated signal code, per the comment in start()) is elided in this rendering.
829 def exit_signal_handler(sigcode, frame):
# Resolve the destination project: no UUID → the current user's home
# project; a user or group UUID → that object; anything else is rejected.
# NOTE(review): the `if not project_uuid:` opener and the final else: line
# are elided in this rendering; visible text kept byte-identical.
832 def desired_project_uuid(api_client, project_uuid, num_retries):
834 query = api_client.users().current()
835 elif arvados.util.user_uuid_pattern.match(project_uuid):
836 query = api_client.users().get(uuid=project_uuid)
837 elif arvados.util.group_uuid_pattern.match(project_uuid):
838 query = api_client.groups().get(uuid=project_uuid)
840 raise ValueError("Not a valid project UUID: {}".format(project_uuid))
841 return query.execute(num_retries=num_retries)['uuid']
# Command entry point: parse args, resolve name/project, build the upload
# job, install signal handlers, run the upload and print the requested
# output (manifest / locators / uuid / pdh).
# NOTE(review): many interior lines are elided in this rendering (the
# api_client parameter, status variable and sys.exit()/return paths, several
# if/else markers, the default reporter = None branch, try: openers, the
# output write, and the final return); visible text kept byte-identical.
843 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
846 logger = logging.getLogger('arvados.arv_put')
847 logger.setLevel(logging.INFO)
848 args = parse_arguments(arguments)
850 if api_client is None:
851 api_client = arvados.api('v1')
853 # Determine the name to use
# --name is incompatible with output modes that don't save a collection.
855 if args.stream or args.raw:
856 logger.error("Cannot use --name with --stream or --raw")
858 elif args.update_collection:
859 logger.error("Cannot use --name with --update-collection")
861 collection_name = args.name
# Default collection name: timestamp + user@host.
863 collection_name = "Saved at {} by {}@{}".format(
864 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
865 pwd.getpwuid(os.getuid()).pw_name,
866 socket.gethostname())
868 if args.project_uuid and (args.stream or args.raw):
869 logger.error("Cannot use --project-uuid with --stream or --raw")
872 # Determine the parent project
874 project_uuid = desired_project_uuid(api_client, args.project_uuid,
876 except (apiclient_errors.Error, ValueError) as error:
# Pick the progress reporter implementation from the CLI flags.
881 reporter = progress_writer(human_progress)
882 elif args.batch_progress:
883 reporter = progress_writer(machine_progress)
887 # If this is used by a human, and there's at least one directory to be
888 # uploaded, the expected bytes calculation can take a moment.
889 if args.progress and any([os.path.isdir(f) for f in args.paths]):
890 logger.info("Calculating upload size, this could take some time...")
891 bytes_expected = expected_bytes_for(args.paths)
894 writer = ArvPutUploadJob(paths = args.paths,
895 resume = args.resume,
896 use_cache = args.use_cache,
897 filename = args.filename,
899 bytes_expected = bytes_expected,
900 num_retries = args.retries,
901 replication_desired = args.replication,
902 put_threads = args.threads,
903 name = collection_name,
904 owner_uuid = project_uuid,
905 ensure_unique_name = True,
906 update_collection = args.update_collection,
908 dry_run=args.dry_run)
909 except ResumeCacheConflict:
910 logger.error("\n".join([
911 "arv-put: Another process is already uploading this data.",
912 " Use --no-cache if this is really what you want."]))
914 except CollectionUpdateError as error:
915 logger.error("\n".join([
916 "arv-put: %s" % str(error)]))
918 except ArvPutUploadIsPending:
919 # Dry run check successful, return proper exit code.
921 except ArvPutUploadNotPending:
922 # No files pending for upload
925 # Install our signal handler for each code in CAUGHT_SIGNALS, and save
927 orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
928 for sigcode in CAUGHT_SIGNALS}
930 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
931 logger.warning("\n".join([
932 "arv-put: Resuming previous upload from last checkpoint.",
933 " Use the --no-resume option to start over."]))
936 writer.report_progress()
# Only save a collection when the manifest output modes aren't requested.
939 writer.start(save_collection=not(args.stream or args.raw))
940 except arvados.errors.ApiError as error:
941 logger.error("\n".join([
942 "arv-put: %s" % str(error)]))
944 except ArvPutUploadIsPending:
945 # Dry run check successful, return proper exit code.
947 except ArvPutUploadNotPending:
948 # No files pending for upload
951 if args.progress: # Print newline to split stderr from stdout for humans.
# Choose the textual output for the selected mode.
956 output = writer.manifest_text(normalize=True)
958 output = writer.manifest_text()
960 output = ','.join(writer.data_locators())
963 if args.update_collection:
964 logger.info("Collection updated: '{}'".format(writer.collection_name()))
966 logger.info("Collection saved as '{}'".format(writer.collection_name()))
967 if args.portable_data_hash:
968 output = writer.portable_data_hash()
970 output = writer.manifest_locator()
971 except apiclient_errors.Error as error:
973 "arv-put: Error creating Collection on project: {}.".format(
977 # Print the locator (uuid) of the new collection.
982 if not output.endswith('\n'):
# Restore whatever signal handlers were in place before we started.
985 for sigcode, orig_handler in listitems(orig_signal_handlers):
986 signal.signal(sigcode, orig_handler)
# Script entry guard; the main() call on the following original line is
# elided in this rendering.
995 if __name__ == '__main__':