1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
11 import arvados.collection
32 from apiclient import errors as apiclient_errors
33 from arvados._version import __version__
35 import arvados.commands._util as arv_cmd
37 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
40 upload_opts = argparse.ArgumentParser(add_help=False)
42 upload_opts.add_argument('--version', action='version',
43 version="%s %s" % (sys.argv[0], __version__),
44 help='Print version and exit.')
45 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
47 Local file or directory. If path is a directory reference with a trailing
48 slash, then just upload the directory's contents; otherwise upload the
49 directory itself. Default: read from standard input.
52 _group = upload_opts.add_mutually_exclusive_group()
54 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
55 default=-1, help=argparse.SUPPRESS)
57 _group.add_argument('--normalize', action='store_true',
59 Normalize the manifest by re-ordering files and streams after writing
63 _group.add_argument('--dry-run', action='store_true', default=False,
65 Don't actually upload files, but only check if any file should be
66 uploaded. Exit with code=2 when files are pending for upload.
69 _group = upload_opts.add_mutually_exclusive_group()
71 _group.add_argument('--as-stream', action='store_true', dest='stream',
76 _group.add_argument('--stream', action='store_true',
78 Store the file content and display the resulting manifest on
79 stdout. Do not write the manifest to Keep or save a Collection object
83 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
85 Synonym for --manifest.
88 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
90 Synonym for --manifest.
93 _group.add_argument('--manifest', action='store_true',
95 Store the file data and resulting manifest in Keep, save a Collection
96 object in Arvados, and display the manifest locator (Collection uuid)
97 on stdout. This is the default behavior.
100 _group.add_argument('--as-raw', action='store_true', dest='raw',
105 _group.add_argument('--raw', action='store_true',
107 Store the file content and display the data block locators on stdout,
108 separated by commas, with a trailing newline. Do not store a
112 upload_opts.add_argument('--update-collection', type=str, default=None,
113 dest='update_collection', metavar="UUID", help="""
114 Update an existing collection identified by the given Arvados collection
115 UUID. All new local files will be uploaded.
118 upload_opts.add_argument('--use-filename', type=str, default=None,
119 dest='filename', help="""
120 Synonym for --filename.
123 upload_opts.add_argument('--filename', type=str, default=None,
125 Use the given filename in the manifest, instead of the name of the
126 local file. This is useful when "-" or "/dev/stdin" is given as an
127 input file. It can be used only if there is exactly one path given and
128 it is not a directory. Implies --manifest.
131 upload_opts.add_argument('--portable-data-hash', action='store_true',
133 Print the portable data hash instead of the Arvados UUID for the collection
134 created by the upload.
137 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
139 Set the replication level for the new collection: how many different
140 physical storage devices (e.g., disks) should have a copy of each data
141 block. Default is to use the server-provided default (if any) or 2.
144 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
146 Set the number of upload threads to be used. Take into account that
147 using lots of threads will increase the RAM requirements. Default is
149 On high latency installations, using a greater number will improve
153 run_opts = argparse.ArgumentParser(add_help=False)
155 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
156 Store the collection in the specified project, instead of your Home
160 run_opts.add_argument('--name', help="""
161 Save the collection with the specified name.
164 run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
165 action='append', help="""
166 Exclude files and directories whose names match the given glob pattern. When
167 using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
168 directory, relative to the provided input dirs will be excluded.
169 When using a filename pattern like '*.txt', any text file will be excluded
170 no matter where is placed.
171 For the special case of needing to exclude only files or dirs directly below
172 the given input directory, you can use a pattern like './exclude_this.gif'.
173 You can specify multiple patterns by using this argument more than once.
176 _group = run_opts.add_mutually_exclusive_group()
177 _group.add_argument('--progress', action='store_true',
179 Display human-readable progress on stderr (bytes and, if possible,
180 percentage of total data size). This is the default behavior when
184 _group.add_argument('--no-progress', action='store_true',
186 Do not display human-readable progress on stderr, even if stderr is a
190 _group.add_argument('--batch-progress', action='store_true',
192 Display machine-readable progress on stderr (bytes and, if known,
196 _group = run_opts.add_mutually_exclusive_group()
197 _group.add_argument('--resume', action='store_true', default=True,
199 Continue interrupted uploads from cached state (default).
201 _group.add_argument('--no-resume', action='store_false', dest='resume',
203 Do not continue interrupted uploads from cached state.
206 _group = run_opts.add_mutually_exclusive_group()
207 _group.add_argument('--follow-links', action='store_true', default=True,
208 dest='follow_links', help="""
209 Follow file and directory symlinks (default).
211 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
213 Do not follow file and directory symlinks.
216 _group = run_opts.add_mutually_exclusive_group()
217 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
219 Save upload state in a cache file for resuming (default).
221 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
223 Do not save upload state in a cache file for resuming.
226 arg_parser = argparse.ArgumentParser(
227 description='Copy data from the local filesystem to Keep.',
228 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
def parse_arguments(arguments):
    """Parse arv-put's command line and normalize the resulting Namespace.

    Normalizations applied:
    - No paths given means read from stdin ('-').
    - '/dev/stdin' is treated the same as '-'.
    - --progress defaults on when stderr is a tty and no other progress
      mode was requested; --no-cache implies --no-resume; reading from
      stdin disables both resume and cache.
    - Duplicate --exclude patterns are removed.

    NOTE(review): reconstructed from an elided dump; the exact whitespace
    inside the two parser.error() messages may differ from the original.
    """
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
--filename argument cannot be used when storing a directory or
multiple files.
""")

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
--update-collection cannot be used when reading from stdin.
""")
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args
class PathDoesNotExistError(Exception):
    """Raised when an input path given for upload does not exist locally."""
    pass


class CollectionUpdateError(Exception):
    """Raised when an existing collection cannot be read for update."""
    pass


class ResumeCacheConflict(Exception):
    """Raised when another arv-put process holds the lock on the cache file."""
    pass


class ArvPutArgumentConflict(Exception):
    """Raised for incompatible option combinations (e.g. resume without cache)."""
    pass


class ArvPutUploadIsPending(Exception):
    """Raised during --dry-run when at least one file would be uploaded."""
    pass


class ArvPutUploadNotPending(Exception):
    """Raised during --dry-run when no files are pending for upload."""
    pass
class FileUploadList(list):
    """List of files queued for upload.

    When dry_run is True, the first append aborts the caller by raising
    ArvPutUploadIsPending, signalling that at least one file would need
    to be uploaded.
    """
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
306 class ResumeCache(object):
307 CACHE_DIR = '.cache/arvados/arv-put'
309 def __init__(self, file_spec):
310 self.cache_file = open(file_spec, 'a+')
311 self._lock_file(self.cache_file)
312 self.filename = self.cache_file.name
315 def make_path(cls, args):
317 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
318 realpaths = sorted(os.path.realpath(path) for path in args.paths)
319 md5.update(b'\0'.join([p.encode() for p in realpaths]))
320 if any(os.path.isdir(path) for path in realpaths):
323 md5.update(args.filename.encode())
325 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
328 def _lock_file(self, fileobj):
330 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
332 raise ResumeCacheConflict("{} locked".format(fileobj.name))
335 self.cache_file.seek(0)
336 return json.load(self.cache_file)
338 def check_cache(self, api_client=None, num_retries=0):
343 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
344 locator = state["_finished_streams"][0][1][0]
345 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
346 locator = state["_current_stream_locators"][0]
347 if locator is not None:
348 kc = arvados.keep.KeepClient(api_client=api_client)
349 kc.head(locator, num_retries=num_retries)
350 except Exception as e:
355 def save(self, data):
357 new_cache_fd, new_cache_name = tempfile.mkstemp(
358 dir=os.path.dirname(self.filename))
359 self._lock_file(new_cache_fd)
360 new_cache = os.fdopen(new_cache_fd, 'r+')
361 json.dump(data, new_cache)
362 os.rename(new_cache_name, self.filename)
363 except (IOError, OSError, ResumeCacheConflict) as error:
365 os.unlink(new_cache_name)
366 except NameError: # mkstemp failed.
369 self.cache_file.close()
370 self.cache_file = new_cache
373 self.cache_file.close()
377 os.unlink(self.filename)
378 except OSError as error:
379 if error.errno != errno.ENOENT: # That's what we wanted anyway.
385 self.__init__(self.filename)
388 class ArvPutUploadJob(object):
389 CACHE_DIR = '.cache/arvados/arv-put'
391 'manifest' : None, # Last saved manifest checkpoint
392 'files' : {} # Previous run file list: {path : {size, mtime}}
395 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
396 name=None, owner_uuid=None,
397 ensure_unique_name=False, num_retries=None,
398 put_threads=None, replication_desired=None,
399 filename=None, update_time=60.0, update_collection=None,
400 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
401 follow_links=True, exclude_paths=[], exclude_names=None):
404 self.use_cache = use_cache
406 self.reporter = reporter
407 # This will set to 0 before start counting, if no special files are going
409 self.bytes_expected = None
410 self.bytes_written = 0
411 self.bytes_skipped = 0
413 self.owner_uuid = owner_uuid
414 self.ensure_unique_name = ensure_unique_name
415 self.num_retries = num_retries
416 self.replication_desired = replication_desired
417 self.put_threads = put_threads
418 self.filename = filename
419 self._state_lock = threading.Lock()
420 self._state = None # Previous run state (file list & manifest)
421 self._current_files = [] # Current run file list
422 self._cache_file = None
423 self._collection_lock = threading.Lock()
424 self._remote_collection = None # Collection being updated (if asked)
425 self._local_collection = None # Collection from previous run manifest
426 self._file_paths = set() # Files to be updated in remote collection
427 self._stop_checkpointer = threading.Event()
428 self._checkpointer = threading.Thread(target=self._update_task)
429 self._checkpointer.daemon = True
430 self._update_task_time = update_time # How many seconds wait between update runs
431 self._files_to_upload = FileUploadList(dry_run=dry_run)
432 self._upload_started = False
434 self.dry_run = dry_run
435 self._checkpoint_before_quit = True
436 self.follow_links = follow_links
437 self.exclude_paths = exclude_paths
438 self.exclude_names = exclude_names
440 if not self.use_cache and self.resume:
441 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
443 # Check for obvious dry-run responses
444 if self.dry_run and (not self.use_cache or not self.resume):
445 raise ArvPutUploadIsPending()
447 # Load cached data if any and if needed
448 self._setup_state(update_collection)
450 # Build the upload file list, excluding requested files and counting the
451 # bytes expected to be uploaded.
452 self._build_upload_list()
454 def _build_upload_list(self):
456 Scan the requested paths to count file sizes, excluding files & dirs if requested
457 and building the upload file list.
459 # If there aren't special files to be read, reset total bytes count to zero
461 if not any([p for p in self.paths
462 if not (os.path.isfile(p) or os.path.isdir(p))]):
463 self.bytes_expected = 0
465 for path in self.paths:
466 # Test for stdin first, in case some file named '-' exist
469 raise ArvPutUploadIsPending()
470 self._write_stdin(self.filename or 'stdin')
471 elif not os.path.exists(path):
472 raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
473 elif os.path.isdir(path):
474 # Use absolute paths on cache index so CWD doesn't interfere
475 # with the caching logic.
477 path = os.path.abspath(path)
478 if orig_path[-1:] == os.sep:
479 # When passing a directory reference with a trailing slash,
480 # its contents should be uploaded directly to the
484 # When passing a directory reference with no trailing slash,
485 # upload the directory to the collection's root.
486 prefixdir = os.path.dirname(path)
488 for root, dirs, files in os.walk(path,
489 followlinks=self.follow_links):
490 root_relpath = os.path.relpath(root, path)
491 if root_relpath == '.':
493 # Exclude files/dirs by full path matching pattern
494 if self.exclude_paths:
495 dirs[:] = [d for d in dirs
496 if not any(pathname_match(
497 os.path.join(root_relpath, d), pat)
498 for pat in self.exclude_paths)]
499 files = [f for f in files
500 if not any(pathname_match(
501 os.path.join(root_relpath, f), pat)
502 for pat in self.exclude_paths)]
503 # Exclude files/dirs by name matching pattern
504 if self.exclude_names is not None:
505 dirs[:] = [d for d in dirs
506 if not self.exclude_names.match(d)]
507 files = [f for f in files
508 if not self.exclude_names.match(f)]
509 # Make os.walk()'s dir traversing order deterministic
513 filepath = os.path.join(root, f)
514 # Add its size to the total bytes count (if applicable)
515 if self.follow_links or (not os.path.islink(filepath)):
516 if self.bytes_expected is not None:
517 self.bytes_expected += os.path.getsize(filepath)
518 self._check_file(filepath,
519 os.path.join(root[len(prefixdir):], f))
521 filepath = os.path.abspath(path)
522 # Add its size to the total bytes count (if applicable)
523 if self.follow_links or (not os.path.islink(filepath)):
524 if self.bytes_expected is not None:
525 self.bytes_expected += os.path.getsize(filepath)
526 self._check_file(filepath,
527 self.filename or os.path.basename(path))
528 # If dry-mode is on, and got up to this point, then we should notify that
529 # there aren't any file to upload.
531 raise ArvPutUploadNotPending()
532 # Remove local_collection's files that don't exist locally anymore, so the
533 # bytes_written count is correct.
534 for f in self.collection_file_paths(self._local_collection,
536 if f != 'stdin' and f != self.filename and not f in self._file_paths:
537 self._local_collection.remove(f)
539 def start(self, save_collection):
541 Start supporting thread & file uploading
543 self._checkpointer.start()
545 # Update bytes_written from current local collection and
546 # report initial progress.
549 self._upload_started = True # Used by the update thread to start checkpointing
551 except (SystemExit, Exception) as e:
552 self._checkpoint_before_quit = False
553 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
554 # Note: We're expecting SystemExit instead of
555 # KeyboardInterrupt because we have a custom signal
556 # handler in place that raises SystemExit with the catched
558 if isinstance(e, PathDoesNotExistError):
559 # We aren't interested in the traceback for this case
561 elif not isinstance(e, SystemExit) or e.code != -2:
562 self.logger.warning("Abnormal termination:\n{}".format(
563 traceback.format_exc()))
567 # Stop the thread before doing anything else
568 self._stop_checkpointer.set()
569 self._checkpointer.join()
570 if self._checkpoint_before_quit:
571 # Commit all pending blocks & one last _update()
572 self._local_collection.manifest_text()
573 self._update(final=True)
575 self.save_collection()
577 self._cache_file.close()
579 def save_collection(self):
581 # Check if files should be updated on the remote collection.
582 for fp in self._file_paths:
583 remote_file = self._remote_collection.find(fp)
585 # File don't exist on remote collection, copy it.
586 self._remote_collection.copy(fp, fp, self._local_collection)
587 elif remote_file != self._local_collection.find(fp):
588 # A different file exist on remote collection, overwrite it.
589 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
591 # The file already exist on remote collection, skip it.
593 self._remote_collection.save(num_retries=self.num_retries)
595 self._local_collection.save_new(
596 name=self.name, owner_uuid=self.owner_uuid,
597 ensure_unique_name=self.ensure_unique_name,
598 num_retries=self.num_retries)
600 def destroy_cache(self):
603 os.unlink(self._cache_filename)
604 except OSError as error:
605 # That's what we wanted anyway.
606 if error.errno != errno.ENOENT:
608 self._cache_file.close()
610 def _collection_size(self, collection):
612 Recursively get the total size of the collection
615 for item in listvalues(collection):
616 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
617 size += self._collection_size(item)
622 def _update_task(self):
624 Periodically called support task. File uploading is
625 asynchronous so we poll status from the collection.
627 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
630 def _update(self, final=False):
632 Update cached manifest text and report progress.
634 if self._upload_started:
635 with self._collection_lock:
636 self.bytes_written = self._collection_size(self._local_collection)
639 manifest = self._local_collection.manifest_text()
641 # Get the manifest text without comitting pending blocks
642 manifest = self._local_collection.manifest_text(strip=False,
646 with self._state_lock:
647 self._state['manifest'] = manifest
651 except Exception as e:
652 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
654 self.bytes_written = self.bytes_skipped
655 # Call the reporter, if any
656 self.report_progress()
658 def report_progress(self):
659 if self.reporter is not None:
660 self.reporter(self.bytes_written, self.bytes_expected)
662 def _write_stdin(self, filename):
663 output = self._local_collection.open(filename, 'wb')
664 self._write(sys.stdin, output)
667 def _check_file(self, source, filename):
669 Check if this file needs to be uploaded
671 # Ignore symlinks when requested
672 if (not self.follow_links) and os.path.islink(source):
675 should_upload = False
676 new_file_in_cache = False
677 # Record file path for updating the remote collection before exiting
678 self._file_paths.add(filename)
680 with self._state_lock:
681 # If no previous cached data on this file, store it for an eventual
683 if source not in self._state['files']:
684 self._state['files'][source] = {
685 'mtime': os.path.getmtime(source),
686 'size' : os.path.getsize(source)
688 new_file_in_cache = True
689 cached_file_data = self._state['files'][source]
691 # Check if file was already uploaded (at least partially)
692 file_in_local_collection = self._local_collection.find(filename)
694 # If not resuming, upload the full file.
697 # New file detected from last run, upload it.
698 elif new_file_in_cache:
700 # Local file didn't change from last run.
701 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
702 if not file_in_local_collection:
703 # File not uploaded yet, upload it completely
705 elif file_in_local_collection.permission_expired():
706 # Permission token expired, re-upload file. This will change whenever
707 # we have a API for refreshing tokens.
709 self._local_collection.remove(filename)
710 elif cached_file_data['size'] == file_in_local_collection.size():
711 # File already there, skip it.
712 self.bytes_skipped += cached_file_data['size']
713 elif cached_file_data['size'] > file_in_local_collection.size():
714 # File partially uploaded, resume!
715 resume_offset = file_in_local_collection.size()
716 self.bytes_skipped += resume_offset
719 # Inconsistent cache, re-upload the file
721 self._local_collection.remove(filename)
722 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
723 # Local file differs from cached data, re-upload it.
725 if file_in_local_collection:
726 self._local_collection.remove(filename)
731 self._files_to_upload.append((source, resume_offset, filename))
732 except ArvPutUploadIsPending:
733 # This could happen when running on dry-mode, close cache file to
734 # avoid locking issues.
735 self._cache_file.close()
738 def _upload_files(self):
739 for source, resume_offset, filename in self._files_to_upload:
740 with open(source, 'rb') as source_fd:
741 with self._state_lock:
742 self._state['files'][source]['mtime'] = os.path.getmtime(source)
743 self._state['files'][source]['size'] = os.path.getsize(source)
744 if resume_offset > 0:
745 # Start upload where we left off
746 output = self._local_collection.open(filename, 'ab')
747 source_fd.seek(resume_offset)
750 output = self._local_collection.open(filename, 'wb')
751 self._write(source_fd, output)
752 output.close(flush=False)
754 def _write(self, source_fd, output):
756 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
761 def _my_collection(self):
762 return self._remote_collection if self.update else self._local_collection
764 def _setup_state(self, update_collection):
766 Create a new cache file or load a previously existing one.
768 # Load an already existing collection for update
769 if update_collection and re.match(arvados.util.collection_uuid_pattern,
772 self._remote_collection = arvados.collection.Collection(update_collection)
773 except arvados.errors.ApiError as error:
774 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
777 elif update_collection:
778 # Collection locator provided, but unknown format
779 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
782 # Set up cache file name from input paths.
784 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
785 realpaths = sorted(os.path.realpath(path) for path in self.paths)
786 md5.update(b'\0'.join([p.encode() for p in realpaths]))
788 md5.update(self.filename.encode())
789 cache_filename = md5.hexdigest()
790 cache_filepath = os.path.join(
791 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
793 if self.resume and os.path.exists(cache_filepath):
794 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
795 self._cache_file = open(cache_filepath, 'a+')
797 # --no-resume means start with a empty cache file.
798 self.logger.info("Creating new cache file at {}".format(cache_filepath))
799 self._cache_file = open(cache_filepath, 'w+')
800 self._cache_filename = self._cache_file.name
801 self._lock_file(self._cache_file)
802 self._cache_file.seek(0)
804 with self._state_lock:
807 self._state = json.load(self._cache_file)
808 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
809 # Cache at least partially incomplete, set up new cache
810 self._state = copy.deepcopy(self.EMPTY_STATE)
812 # Cache file empty, set up new cache
813 self._state = copy.deepcopy(self.EMPTY_STATE)
815 self.logger.info("No cache usage requested for this run.")
816 # No cache file, set empty state
817 self._state = copy.deepcopy(self.EMPTY_STATE)
818 # Load the previous manifest so we can check if files were modified remotely.
819 self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
821 def collection_file_paths(self, col, path_prefix='.'):
822 """Return a list of file paths by recursively go through the entire collection `col`"""
824 for name, item in listitems(col):
825 if isinstance(item, arvados.arvfile.ArvadosFile):
826 file_paths.append(os.path.join(path_prefix, name))
827 elif isinstance(item, arvados.collection.Subcollection):
828 new_prefix = os.path.join(path_prefix, name)
829 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
832 def _lock_file(self, fileobj):
834 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
836 raise ResumeCacheConflict("{} locked".format(fileobj.name))
838 def _save_state(self):
840 Atomically save current state into cache.
842 with self._state_lock:
843 # We're not using copy.deepcopy() here because it's a lot slower
844 # than json.dumps(), and we're already needing JSON format to be
846 state = json.dumps(self._state)
848 new_cache = tempfile.NamedTemporaryFile(
850 dir=os.path.dirname(self._cache_filename), delete=False)
851 self._lock_file(new_cache)
852 new_cache.write(state)
855 os.rename(new_cache.name, self._cache_filename)
856 except (IOError, OSError, ResumeCacheConflict) as error:
857 self.logger.error("There was a problem while saving the cache file: {}".format(error))
859 os.unlink(new_cache_name)
860 except NameError: # mkstemp failed.
863 self._cache_file.close()
864 self._cache_file = new_cache
866 def collection_name(self):
867 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
869 def manifest_locator(self):
870 return self._my_collection().manifest_locator()
872 def portable_data_hash(self):
873 pdh = self._my_collection().portable_data_hash()
874 m = self._my_collection().stripped_manifest().encode()
875 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
877 logger.warning("\n".join([
878 "arv-put: API server provided PDH differs from local manifest.",
879 " This should not happen; showing API server version."]))
882 def manifest_text(self, stream_name=".", strip=False, normalize=False):
883 return self._my_collection().manifest_text(stream_name, strip, normalize)
885 def _datablocks_on_item(self, item):
887 Return a list of datablock locators, recursively navigating
888 through subcollections
890 if isinstance(item, arvados.arvfile.ArvadosFile):
893 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
896 for segment in item.segments():
897 loc = segment.locator
900 elif isinstance(item, arvados.collection.Collection):
901 l = [self._datablocks_on_item(x) for x in listvalues(item)]
902 # Fast list flattener method taken from:
903 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
904 return [loc for sublist in l for loc in sublist]
908 def data_locators(self):
909 with self._collection_lock:
910 # Make sure all datablocks are flushed before getting the locators
911 self._my_collection().manifest_text()
912 datablocks = self._datablocks_on_item(self._my_collection())
915 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
def pathname_match(pathname, pattern):
    """Return True when pathname matches the glob pattern, component-wise.

    Simulates glob.glob() matching behavior without the need to scan the
    filesystem. fnmatch() alone doesn't work correctly with pathnames
    (e.g. the pattern 'tests/*.py' would match both 'tests/run_test.py'
    and 'tests/subdir/run_test.py'), so each path component is matched
    separately and the component counts must be equal.
    """
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
def machine_progress(bytes_written, bytes_expected):
    """Format a machine-readable progress line; -1 stands for unknown total."""
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
def human_progress(bytes_written, bytes_expected):
    """Format a human-readable progress line.

    Shows megabytes and a percentage when the expected total is known
    (truthy); otherwise just the byte count. Leading '\r' rewrites the
    current terminal line in place.
    """
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
def progress_writer(progress_func, outfile=sys.stderr):
    """Return a reporter callback that formats progress with progress_func
    and writes the result to outfile (stderr by default)."""
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress
def exit_signal_handler(sigcode, frame):
    """Signal handler: exit with a negative status that encodes the signal.

    The negative exit code lets ArvPutUploadJob.start() distinguish a
    Ctrl-C / signal-driven SystemExit from other failures.
    """
    sys.exit(-sigcode)
def desired_project_uuid(api_client, project_uuid, num_retries):
    """Resolve project_uuid to the UUID that should own the new collection.

    With no project_uuid, the current user's UUID is used. Otherwise the
    locator must match the Arvados user or group UUID pattern; anything
    else raises ValueError. API errors from execute() propagate to the
    caller.
    """
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
964 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
967 logger = logging.getLogger('arvados.arv_put')
968 logger.setLevel(logging.INFO)
969 args = parse_arguments(arguments)
971 if api_client is None:
972 api_client = arvados.api('v1')
974 # Determine the name to use
976 if args.stream or args.raw:
977 logger.error("Cannot use --name with --stream or --raw")
979 elif args.update_collection:
980 logger.error("Cannot use --name with --update-collection")
982 collection_name = args.name
984 collection_name = "Saved at {} by {}@{}".format(
985 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
986 pwd.getpwuid(os.getuid()).pw_name,
987 socket.gethostname())
989 if args.project_uuid and (args.stream or args.raw):
990 logger.error("Cannot use --project-uuid with --stream or --raw")
993 # Determine the parent project
995 project_uuid = desired_project_uuid(api_client, args.project_uuid,
997 except (apiclient_errors.Error, ValueError) as error:
1002 reporter = progress_writer(human_progress)
1003 elif args.batch_progress:
1004 reporter = progress_writer(machine_progress)
1008 # Setup exclude regex from all the --exclude arguments provided
1011 exclude_names = None
1012 if len(args.exclude) > 0:
1013 # We're supporting 2 kinds of exclusion patterns:
1014 # 1) --exclude '*.jpg' (file/dir name patterns, will only match
1015 # the name, wherever the file is on the tree)
1016 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the
1017 # entire path, and should be relative to
1018 # any input dir argument)
1019 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs
1020 # placed directly underneath the input dir)
1021 for p in args.exclude:
1022 # Only relative paths patterns allowed
1023 if p.startswith(os.sep):
1024 logger.error("Cannot use absolute paths with --exclude")
1026 if os.path.dirname(p):
1027 # We don't support of path patterns with '..'
1028 p_parts = p.split(os.sep)
1031 "Cannot use path patterns that include or '..'")
1033 # Path search pattern
1034 exclude_paths.append(p)
1036 # Name-only search pattern
1037 name_patterns.append(p)
1038 # For name only matching, we can combine all patterns into a single
1039 # regexp, for better performance.
1040 exclude_names = re.compile('|'.join(
1041 [fnmatch.translate(p) for p in name_patterns]
1042 )) if len(name_patterns) > 0 else None
1043 # Show the user the patterns to be used, just in case they weren't
1044 # specified inside quotes and got changed by the shell expansion.
1045 logger.info("Exclude patterns: {}".format(args.exclude))
1047 # If this is used by a human, and there's at least one directory to be
1048 # uploaded, the expected bytes calculation can take a moment.
1049 if args.progress and any([os.path.isdir(f) for f in args.paths]):
1050 logger.info("Calculating upload size, this could take some time...")
1052 writer = ArvPutUploadJob(paths = args.paths,
1053 resume = args.resume,
1054 use_cache = args.use_cache,
1055 filename = args.filename,
1056 reporter = reporter,
1057 num_retries = args.retries,
1058 replication_desired = args.replication,
1059 put_threads = args.threads,
1060 name = collection_name,
1061 owner_uuid = project_uuid,
1062 ensure_unique_name = True,
1063 update_collection = args.update_collection,
1065 dry_run=args.dry_run,
1066 follow_links=args.follow_links,
1067 exclude_paths=exclude_paths,
1068 exclude_names=exclude_names)
1069 except ResumeCacheConflict:
1070 logger.error("\n".join([
1071 "arv-put: Another process is already uploading this data.",
1072 " Use --no-cache if this is really what you want."]))
1074 except CollectionUpdateError as error:
1075 logger.error("\n".join([
1076 "arv-put: %s" % str(error)]))
1078 except ArvPutUploadIsPending:
1079 # Dry run check successful, return proper exit code.
1081 except ArvPutUploadNotPending:
1082 # No files pending for upload
1084 except PathDoesNotExistError as error:
1085 logger.error("\n".join([
1086 "arv-put: %s" % str(error)]))
1089 # Install our signal handler for each code in CAUGHT_SIGNALS, and save
1091 orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
1092 for sigcode in CAUGHT_SIGNALS}
1094 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1095 logger.warning("\n".join([
1096 "arv-put: Resuming previous upload from last checkpoint.",
1097 " Use the --no-resume option to start over."]))
1099 if not args.dry_run:
1100 writer.report_progress()
1103 writer.start(save_collection=not(args.stream or args.raw))
1104 except arvados.errors.ApiError as error:
1105 logger.error("\n".join([
1106 "arv-put: %s" % str(error)]))
1109 if args.progress: # Print newline to split stderr from stdout for humans.
1114 output = writer.manifest_text(normalize=True)
1116 output = writer.manifest_text()
1118 output = ','.join(writer.data_locators())
1121 if args.update_collection:
1122 logger.info("Collection updated: '{}'".format(writer.collection_name()))
1124 logger.info("Collection saved as '{}'".format(writer.collection_name()))
1125 if args.portable_data_hash:
1126 output = writer.portable_data_hash()
1128 output = writer.manifest_locator()
1129 except apiclient_errors.Error as error:
1131 "arv-put: Error creating Collection on project: {}.".format(
1135 # Print the locator (uuid) of the new collection.
1137 status = status or 1
1139 stdout.write(output)
1140 if not output.endswith('\n'):
1143 for sigcode, orig_handler in listitems(orig_signal_handlers):
1144 signal.signal(sigcode, orig_handler)
1153 if __name__ == '__main__':