from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object

import argparse
import arvados
import arvados.collection
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)
_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")
upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given pattern. You
can specify multiple patterns by using this argument more than once.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args
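# Example (illustrative): reading from stdin forces cache and resume off and
# picks a default in-manifest filename:
#
#   args = parse_arguments(['-'])
#   (args.filename, args.use_cache, args.resume)  # ('stdin', False, False)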
class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass
class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
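# Illustrative note: with dry_run=True the list refuses additions, which is
# how a dry run signals "at least one file would be uploaded":
#
#   files = FileUploadList(dry_run=True)
#   files.append(('foo.txt', 0, 'foo.txt'))  # raises ArvPutUploadIsPending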
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())
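    # Illustrative example: the cache path mixes the API host, the resolved
    # input paths and --filename, so re-running the same command maps to the
    # same cache file, e.g. (hypothetical digest):
    #   ~/.cache/arvados/arv-put/3f786850e387550fdab836ed7e6dc881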
    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)
    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass
    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
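# Minimal usage sketch (illustrative) of the cache machinery above:
#
#   cache = ResumeCache(ResumeCache.make_path(args))
#   state = cache.load()   # raises ValueError if the cache file is empty/corrupt
#   cache.save(state)      # atomic: writes a temp file, then renames it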
class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before we start counting, if there are no
        # special files to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()
    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding files & dirs if requested
        and building the upload file list.
        """
        # If there are no special files to be read, reset the total bytes
        # count to zero.
        if not any(filter(lambda p: not (os.path.isfile(p) or os.path.isdir(p)),
                          self.paths)):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists.
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path, followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = list(filter(
                            lambda d: not any([pathname_match(os.path.join(root_relpath, d), pat)
                                               for pat in self.exclude_paths]),
                            dirs))
                        files = list(filter(
                            lambda f: not any([pathname_match(os.path.join(root_relpath, f), pat)
                                               for pat in self.exclude_paths]),
                            files))
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = list(filter(lambda d: not self.exclude_names.match(d), dirs))
                        files = list(filter(lambda f: not self.exclude_names.match(f), files))
                    # Make os.walk()'s directory traversal order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we made it this far, there are no files
        # pending to be uploaded.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)
    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()
    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection; copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection; overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection; skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)
    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()
    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()
    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close(flush=False)
    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # resume.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected since last run; upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change since last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data; re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode; close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise
    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)
    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection
    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(
            self._state['manifest'],
            replication_desired=self.replication_desired,
            put_threads=self.put_threads)
    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))
    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we're already needing JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile() itself failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache
    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh
    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)
    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

# Simulate glob.glob() matching behavior without the need to scan the filesystem.
# Note: fnmatch() doesn't work correctly when used with pathnames. For example,
# the pattern 'tests/*.py' will match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
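# Examples (illustrative) of the per-component matching above:
#   pathname_match('tests/run_test.py', 'tests/*.py')          # True
#   pathname_match('tests/subdir/run_test.py', 'tests/*.py')   # False: depth differs
#   pathname_match('tests/subdir/run_test.py', 'tests/*/*.py') # True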
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
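# Example (illustrative): with sys.argv[0] == 'arv-put' and PID 1234,
# machine_progress(512, 1024) returns "arv-put 1234: 512 written 1024 total\n";
# an unknown total (bytes_expected is None) is reported as -1.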
def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)
def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')
    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)
    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    # Setup exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1) --exclude '*.jpg' (file/dir name patterns, will only match
        #                       the name)
        # 2) --exclude 'foo/bar' (file/dir path patterns, will match the
        #                         entire path, and should be relative to
        #                         any input dir argument)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '.' or '..'
                p_parts = p.split(os.sep)
                if '.' in p_parts or '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '.' or '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name only matching, we can combine all patterns into a single regexp,
        # for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't specified inside
        # quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))
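    # Example (illustrative): given --exclude '*.tmp' --exclude 'docs/*.pdf',
    # '*.tmp' becomes a name pattern (compiled into exclude_names and matched
    # against every file/dir name), while 'docs/*.pdf' goes into exclude_paths
    # and is matched with pathname_match() against paths relative to each
    # input directory.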
    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original signal handlers.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    return output


if __name__ == '__main__':
    main()