# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object

import arvados
import arvados.collection
import argparse
import base64
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")
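# Example (illustrative): given a directory 'photos' containing 'a.jpg',
# `arv-put photos/` stores 'a.jpg' at the collection root, while
# `arv-put photos` stores it as 'photos/a.jpg'.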
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)
_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
them.
""")
_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")
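# Example (illustrative shell usage): the exit code reports whether an upload
# would happen -- 2 means files are pending, 0 means there is nothing to do:
#   arv-put --dry-run mydir/ || echo "pending upload (exit code $?)"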
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")
_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")
_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")
upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")
upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")
upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")
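# Example (illustrative): give data read from stdin a proper manifest name:
#   cat image.dat | arv-put --filename image.dat -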
upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")
upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")
upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")
run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")
run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory (relative to any provided input directory) will be
excluded. When using a filename pattern like '*.txt', any text file will be
excluded no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")
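# Example (illustrative): skip temporary files anywhere in the tree, text
# files in the top-level 'logs' directory, and the 'cache' entry directly
# below the input directory:
#   arv-put --exclude '*.tmp' --exclude 'logs/*.txt' --exclude './cache' mydir/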
_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group.add_argument('--silent', action='store_true',
                    help="""
Do not print any debug messages to console. (Any error messages will still be displayed.)
""")
_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")

_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")
_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")

_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")
_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")

_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    collecting data from multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args
class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass
class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
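    # In dry-run mode the first append() raises ArvPutUploadIsPending, which
    # ArvPutUploadJob._check_file() lets propagate to signal that at least
    # one file would be uploaded.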
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name
    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())
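    # Sketch of the resulting key (illustrative, not actual output): the cache
    # path is ~/.cache/arvados/arv-put/<md5hex>, where <md5hex> digests the
    # API host plus the NUL-joined, sorted real paths (and --filename, when
    # given for a single-file upload).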
    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)
    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass
    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before we start counting, if no special files
        # are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names
        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()
    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding files and dirs
        as requested, and build the upload file list.
        """
        # If there are no special files to be read, reset the total bytes
        # count to zero to start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                       os.path.join(root_relpath, d), pat)
                                       for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                     os.path.join(root_relpath, f), pat)
                                     for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got up to this point, we should notify
        # that there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)
    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()
    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)
    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()
    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size
    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()
    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()
    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)
    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close()
    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # resume.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change
                # whenever we have an API for refreshing tokens.
                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode; close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise
    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)
    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)
    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection
    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths
    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))
    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we're already needing JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile creation failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache
    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()
    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh
    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)
    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None
    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
# Simulate glob.glob() matching behavior without the need to scan the filesystem
# Note: fnmatch() doesn't work correctly when used with pathnames. For example,
# the pattern 'tests/*.py' will match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
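# Behavior sketch (illustrative):
#   pathname_match('subdir/file1.txt', 'subdir/*.txt')   -> True
#   pathname_match('subdir/a/file1.txt', 'subdir/*.txt') -> False (extra component)
#   pathname_match('file1.gif', './file1.gif')           -> True ('.' components
#                                                           are dropped from the pattern)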
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
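# Sample output (illustrative; the prefix comes from _machine_format, so the
# program name and PID will vary):
#   "arv-put 1234: 524288 written 1048576 total\n"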
def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress
def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)
def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
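# Illustrative dispatch (UUIDs are made up): a user UUID such as
# 'zzzzz-tpzed-0123456789abcde' is resolved via users().get(), a group/project
# UUID such as 'zzzzz-j7d0g-0123456789abcde' via groups().get(), and an empty
# --project-uuid falls back to the current user (i.e. the Home project).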
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')
    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())
    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)
    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    # Setup exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative paths patterns allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))
    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals for restoration before exiting.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}
    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output
if __name__ == '__main__':
    main()