1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
11 import arvados.collection
32 from apiclient import errors as apiclient_errors
33 from arvados._version import __version__
34 from arvados.util import keep_locator_pattern
36 import arvados.commands._util as arv_cmd
40 upload_opts = argparse.ArgumentParser(add_help=False)
42 upload_opts.add_argument('--version', action='version',
43 version="%s %s" % (sys.argv[0], __version__),
44 help='Print version and exit.')
45 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
47 Local file or directory. If path is a directory reference with a trailing
48 slash, then just upload the directory's contents; otherwise upload the
49 directory itself. Default: read from standard input.
52 _group = upload_opts.add_mutually_exclusive_group()
54 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
55 default=-1, help=argparse.SUPPRESS)
57 _group.add_argument('--normalize', action='store_true',
59 Normalize the manifest by re-ordering files and streams after writing
63 _group.add_argument('--dry-run', action='store_true', default=False,
65 Don't actually upload files, but only check if any file should be
66 uploaded. Exit with code=2 when files are pending for upload.
69 _group = upload_opts.add_mutually_exclusive_group()
71 _group.add_argument('--as-stream', action='store_true', dest='stream',
76 _group.add_argument('--stream', action='store_true',
78 Store the file content and display the resulting manifest on
79 stdout. Do not write the manifest to Keep or save a Collection object
83 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
85 Synonym for --manifest.
88 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
90 Synonym for --manifest.
93 _group.add_argument('--manifest', action='store_true',
95 Store the file data and resulting manifest in Keep, save a Collection
96 object in Arvados, and display the manifest locator (Collection uuid)
97 on stdout. This is the default behavior.
100 _group.add_argument('--as-raw', action='store_true', dest='raw',
105 _group.add_argument('--raw', action='store_true',
107 Store the file content and display the data block locators on stdout,
108 separated by commas, with a trailing newline. Do not store a
112 upload_opts.add_argument('--update-collection', type=str, default=None,
113 dest='update_collection', metavar="UUID", help="""
114 Update an existing collection identified by the given Arvados collection
115 UUID. All new local files will be uploaded.
118 upload_opts.add_argument('--use-filename', type=str, default=None,
119 dest='filename', help="""
120 Synonym for --filename.
123 upload_opts.add_argument('--filename', type=str, default=None,
125 Use the given filename in the manifest, instead of the name of the
126 local file. This is useful when "-" or "/dev/stdin" is given as an
127 input file. It can be used only if there is exactly one path given and
128 it is not a directory. Implies --manifest.
131 upload_opts.add_argument('--portable-data-hash', action='store_true',
133 Print the portable data hash instead of the Arvados UUID for the collection
134 created by the upload.
137 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
139 Set the replication level for the new collection: how many different
140 physical storage devices (e.g., disks) should have a copy of each data
141 block. Default is to use the server-provided default (if any) or 2.
144 upload_opts.add_argument('--storage-classes', help="""
145 Specify comma separated list of storage classes to be used when saving data to Keep.
148 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
150 Set the number of upload threads to be used. Take into account that
151 using lots of threads will increase the RAM requirements. Default is
153 On high latency installations, using a greater number will improve
157 run_opts = argparse.ArgumentParser(add_help=False)
159 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
160 Store the collection in the specified project, instead of your Home
164 run_opts.add_argument('--name', help="""
165 Save the collection with the specified name.
168 run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
169 action='append', help="""
170 Exclude files and directories whose names match the given glob pattern. When
171 using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
172 directory, relative to the provided input dirs will be excluded.
173 When using a filename pattern like '*.txt', any text file will be excluded
174 no matter where is placed.
175 For the special case of needing to exclude only files or dirs directly below
176 the given input directory, you can use a pattern like './exclude_this.gif'.
177 You can specify multiple patterns by using this argument more than once.
180 _group = run_opts.add_mutually_exclusive_group()
181 _group.add_argument('--progress', action='store_true',
183 Display human-readable progress on stderr (bytes and, if possible,
184 percentage of total data size). This is the default behavior when
188 _group.add_argument('--no-progress', action='store_true',
190 Do not display human-readable progress on stderr, even if stderr is a
194 _group.add_argument('--batch-progress', action='store_true',
196 Display machine-readable progress on stderr (bytes and, if known,
200 run_opts.add_argument('--silent', action='store_true',
202 Do not print any debug messages to console. (Any error messages will
206 _group = run_opts.add_mutually_exclusive_group()
207 _group.add_argument('--resume', action='store_true', default=True,
209 Continue interrupted uploads from cached state (default).
211 _group.add_argument('--no-resume', action='store_false', dest='resume',
213 Do not continue interrupted uploads from cached state.
216 _group = run_opts.add_mutually_exclusive_group()
217 _group.add_argument('--follow-links', action='store_true', default=True,
218 dest='follow_links', help="""
219 Follow file and directory symlinks (default).
221 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
223 Do not follow file and directory symlinks.
226 _group = run_opts.add_mutually_exclusive_group()
227 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
229 Save upload state in a cache file for resuming (default).
231 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
233 Do not save upload state in a cache file for resuming.
236 arg_parser = argparse.ArgumentParser(
237 description='Copy data from the local filesystem to Keep.',
238 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
240 def parse_arguments(arguments):
241 args = arg_parser.parse_args(arguments)
243 if len(args.paths) == 0:
246 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
248 if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
251 --filename argument cannot be used when storing a directory or
255 # Turn on --progress by default if stderr is a tty.
256 if (not (args.batch_progress or args.no_progress or args.silent)
257 and os.isatty(sys.stderr.fileno())):
260 # Turn off --resume (default) if --no-cache is used.
261 if not args.use_cache:
264 if args.paths == ['-']:
265 if args.update_collection:
267 --update-collection cannot be used when reading from stdin.
270 args.use_cache = False
271 if not args.filename:
272 args.filename = 'stdin'
274 # Remove possible duplicated patterns
275 if len(args.exclude) > 0:
276 args.exclude = list(set(args.exclude))
281 class PathDoesNotExistError(Exception):
285 class CollectionUpdateError(Exception):
289 class ResumeCacheConflict(Exception):
293 class ResumeCacheInvalidError(Exception):
296 class ArvPutArgumentConflict(Exception):
300 class ArvPutUploadIsPending(Exception):
304 class ArvPutUploadNotPending(Exception):
308 class FileUploadList(list):
309 def __init__(self, dry_run=False):
311 self.dry_run = dry_run
313 def append(self, other):
315 raise ArvPutUploadIsPending()
316 super(FileUploadList, self).append(other)
319 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
320 class ArvPutLogFormatter(logging.Formatter):
321 std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
323 request_id_informed = False
325 def __init__(self, request_id):
326 self.err_fmtr = logging.Formatter(
327 arvados.log_format+' (X-Request-Id: {})'.format(request_id),
328 arvados.log_date_format)
330 def format(self, record):
331 if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
332 self.request_id_informed = True
333 return self.err_fmtr.format(record)
334 return self.std_fmtr.format(record)
337 class ResumeCache(object):
338 CACHE_DIR = '.cache/arvados/arv-put'
340 def __init__(self, file_spec):
341 self.cache_file = open(file_spec, 'a+')
342 self._lock_file(self.cache_file)
343 self.filename = self.cache_file.name
346 def make_path(cls, args):
348 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
349 realpaths = sorted(os.path.realpath(path) for path in args.paths)
350 md5.update(b'\0'.join([p.encode() for p in realpaths]))
351 if any(os.path.isdir(path) for path in realpaths):
354 md5.update(args.filename.encode())
356 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
359 def _lock_file(self, fileobj):
361 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
363 raise ResumeCacheConflict("{} locked".format(fileobj.name))
366 self.cache_file.seek(0)
367 return json.load(self.cache_file)
369 def check_cache(self, api_client=None, num_retries=0):
374 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
375 locator = state["_finished_streams"][0][1][0]
376 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
377 locator = state["_current_stream_locators"][0]
378 if locator is not None:
379 kc = arvados.keep.KeepClient(api_client=api_client)
380 kc.head(locator, num_retries=num_retries)
381 except Exception as e:
386 def save(self, data):
388 new_cache_fd, new_cache_name = tempfile.mkstemp(
389 dir=os.path.dirname(self.filename))
390 self._lock_file(new_cache_fd)
391 new_cache = os.fdopen(new_cache_fd, 'r+')
392 json.dump(data, new_cache)
393 os.rename(new_cache_name, self.filename)
394 except (IOError, OSError, ResumeCacheConflict):
396 os.unlink(new_cache_name)
397 except NameError: # mkstemp failed.
400 self.cache_file.close()
401 self.cache_file = new_cache
404 self.cache_file.close()
408 os.unlink(self.filename)
409 except OSError as error:
410 if error.errno != errno.ENOENT: # That's what we wanted anyway.
416 self.__init__(self.filename)
419 class ArvPutUploadJob(object):
420 CACHE_DIR = '.cache/arvados/arv-put'
422 'manifest' : None, # Last saved manifest checkpoint
423 'files' : {} # Previous run file list: {path : {size, mtime}}
426 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
427 name=None, owner_uuid=None, api_client=None,
428 ensure_unique_name=False, num_retries=None,
429 put_threads=None, replication_desired=None, filename=None,
430 update_time=60.0, update_collection=None, storage_classes=None,
431 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
432 follow_links=True, exclude_paths=[], exclude_names=None):
435 self.use_cache = use_cache
437 self.reporter = reporter
438 # This will be set to 0 before counting starts, if no special files are going
440 self.bytes_expected = None
441 self.bytes_written = 0
442 self.bytes_skipped = 0
444 self.owner_uuid = owner_uuid
445 self.ensure_unique_name = ensure_unique_name
446 self.num_retries = num_retries
447 self.replication_desired = replication_desired
448 self.put_threads = put_threads
449 self.filename = filename
450 self.storage_classes = storage_classes
451 self._api_client = api_client
452 self._state_lock = threading.Lock()
453 self._state = None # Previous run state (file list & manifest)
454 self._current_files = [] # Current run file list
455 self._cache_file = None
456 self._collection_lock = threading.Lock()
457 self._remote_collection = None # Collection being updated (if asked)
458 self._local_collection = None # Collection from previous run manifest
459 self._file_paths = set() # Files to be updated in remote collection
460 self._stop_checkpointer = threading.Event()
461 self._checkpointer = threading.Thread(target=self._update_task)
462 self._checkpointer.daemon = True
463 self._update_task_time = update_time # How many seconds wait between update runs
464 self._files_to_upload = FileUploadList(dry_run=dry_run)
465 self._upload_started = False
467 self.dry_run = dry_run
468 self._checkpoint_before_quit = True
469 self.follow_links = follow_links
470 self.exclude_paths = exclude_paths
471 self.exclude_names = exclude_names
473 if not self.use_cache and self.resume:
474 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
476 # Check for obvious dry-run responses
477 if self.dry_run and (not self.use_cache or not self.resume):
478 raise ArvPutUploadIsPending()
480 # Load cached data if any and if needed
481 self._setup_state(update_collection)
483 # Build the upload file list, excluding requested files and counting the
484 # bytes expected to be uploaded.
485 self._build_upload_list()
487 def _build_upload_list(self):
489 Scan the requested paths to count file sizes, excluding requested files
490 and dirs and building the upload file list.
492 # If there aren't special files to be read, reset total bytes count to zero
494 if not any([p for p in self.paths
495 if not (os.path.isfile(p) or os.path.isdir(p))]):
496 self.bytes_expected = 0
498 for path in self.paths:
499 # Test for stdin first, in case some file named '-' exists
502 raise ArvPutUploadIsPending()
503 self._write_stdin(self.filename or 'stdin')
504 elif not os.path.exists(path):
505 raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
506 elif os.path.isdir(path):
507 # Use absolute paths on cache index so CWD doesn't interfere
508 # with the caching logic.
510 path = os.path.abspath(path)
511 if orig_path[-1:] == os.sep:
512 # When passing a directory reference with a trailing slash,
513 # its contents should be uploaded directly to the
517 # When passing a directory reference with no trailing slash,
518 # upload the directory to the collection's root.
519 prefixdir = os.path.dirname(path)
521 for root, dirs, files in os.walk(path,
522 followlinks=self.follow_links):
523 root_relpath = os.path.relpath(root, path)
524 if root_relpath == '.':
526 # Exclude files/dirs by full path matching pattern
527 if self.exclude_paths:
528 dirs[:] = [d for d in dirs
529 if not any(pathname_match(
530 os.path.join(root_relpath, d), pat)
531 for pat in self.exclude_paths)]
532 files = [f for f in files
533 if not any(pathname_match(
534 os.path.join(root_relpath, f), pat)
535 for pat in self.exclude_paths)]
536 # Exclude files/dirs by name matching pattern
537 if self.exclude_names is not None:
538 dirs[:] = [d for d in dirs
539 if not self.exclude_names.match(d)]
540 files = [f for f in files
541 if not self.exclude_names.match(f)]
542 # Make os.walk()'s dir traversing order deterministic
546 filepath = os.path.join(root, f)
547 # Add its size to the total bytes count (if applicable)
548 if self.follow_links or (not os.path.islink(filepath)):
549 if self.bytes_expected is not None:
550 self.bytes_expected += os.path.getsize(filepath)
551 self._check_file(filepath,
552 os.path.join(root[len(prefixdir):], f))
554 filepath = os.path.abspath(path)
555 # Add its size to the total bytes count (if applicable)
556 if self.follow_links or (not os.path.islink(filepath)):
557 if self.bytes_expected is not None:
558 self.bytes_expected += os.path.getsize(filepath)
559 self._check_file(filepath,
560 self.filename or os.path.basename(path))
561 # If dry-mode is on, and got up to this point, then we should notify that
562 # there aren't any files to upload.
564 raise ArvPutUploadNotPending()
565 # Remove local_collection's files that don't exist locally anymore, so the
566 # bytes_written count is correct.
567 for f in self.collection_file_paths(self._local_collection,
569 if f != 'stdin' and f != self.filename and not f in self._file_paths:
570 self._local_collection.remove(f)
572 def start(self, save_collection):
574 Start supporting thread & file uploading
576 self._checkpointer.start()
578 # Update bytes_written from current local collection and
579 # report initial progress.
582 self._upload_started = True # Used by the update thread to start checkpointing
584 except (SystemExit, Exception) as e:
585 self._checkpoint_before_quit = False
586 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
587 # Note: We're expecting SystemExit instead of
588 # KeyboardInterrupt because we have a custom signal
589 # handler in place that raises SystemExit with the caught
591 if isinstance(e, PathDoesNotExistError):
592 # We aren't interested in the traceback for this case
594 elif not isinstance(e, SystemExit) or e.code != -2:
595 self.logger.warning("Abnormal termination:\n{}".format(
596 traceback.format_exc()))
600 # Stop the thread before doing anything else
601 self._stop_checkpointer.set()
602 self._checkpointer.join()
603 if self._checkpoint_before_quit:
604 # Commit all pending blocks & one last _update()
605 self._local_collection.manifest_text()
606 self._update(final=True)
608 self.save_collection()
610 self._cache_file.close()
612 def save_collection(self):
614 # Check if files should be updated on the remote collection.
615 for fp in self._file_paths:
616 remote_file = self._remote_collection.find(fp)
618 # File doesn't exist on remote collection, copy it.
619 self._remote_collection.copy(fp, fp, self._local_collection)
620 elif remote_file != self._local_collection.find(fp):
621 # A different file exists on remote collection, overwrite it.
622 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
624 # The file already exists on remote collection, skip it.
626 self._remote_collection.save(storage_classes=self.storage_classes,
627 num_retries=self.num_retries)
629 if self.storage_classes is None:
630 self.storage_classes = ['default']
631 self._local_collection.save_new(
632 name=self.name, owner_uuid=self.owner_uuid,
633 storage_classes=self.storage_classes,
634 ensure_unique_name=self.ensure_unique_name,
635 num_retries=self.num_retries)
637 def destroy_cache(self):
640 os.unlink(self._cache_filename)
641 except OSError as error:
642 # That's what we wanted anyway.
643 if error.errno != errno.ENOENT:
645 self._cache_file.close()
647 def _collection_size(self, collection):
649 Recursively get the total size of the collection
652 for item in listvalues(collection):
653 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
654 size += self._collection_size(item)
659 def _update_task(self):
661 Periodically called support task. File uploading is
662 asynchronous so we poll status from the collection.
664 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
667 def _update(self, final=False):
669 Update cached manifest text and report progress.
671 if self._upload_started:
672 with self._collection_lock:
673 self.bytes_written = self._collection_size(self._local_collection)
676 manifest = self._local_collection.manifest_text()
678 # Get the manifest text without committing pending blocks
679 manifest = self._local_collection.manifest_text(strip=False,
683 with self._state_lock:
684 self._state['manifest'] = manifest
688 except Exception as e:
689 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
691 self.bytes_written = self.bytes_skipped
692 # Call the reporter, if any
693 self.report_progress()
def report_progress(self):
    """Pass the current byte counters to the progress reporter, if any.

    Calls ``self.reporter(self.bytes_written, self.bytes_expected)``.
    Does nothing when ``self.reporter`` is None.
    """
    reporter = self.reporter
    if reporter is None:
        return
    reporter(self.bytes_written, self.bytes_expected)
699 def _write_stdin(self, filename):
700 output = self._local_collection.open(filename, 'wb')
701 self._write(sys.stdin, output)
704 def _check_file(self, source, filename):
706 Check if this file needs to be uploaded
708 # Ignore symlinks when requested
709 if (not self.follow_links) and os.path.islink(source):
712 should_upload = False
713 new_file_in_cache = False
714 # Record file path for updating the remote collection before exiting
715 self._file_paths.add(filename)
717 with self._state_lock:
718 # If no previous cached data on this file, store it for an eventual
720 if source not in self._state['files']:
721 self._state['files'][source] = {
722 'mtime': os.path.getmtime(source),
723 'size' : os.path.getsize(source)
725 new_file_in_cache = True
726 cached_file_data = self._state['files'][source]
728 # Check if file was already uploaded (at least partially)
729 file_in_local_collection = self._local_collection.find(filename)
731 # If not resuming, upload the full file.
734 # New file detected from last run, upload it.
735 elif new_file_in_cache:
737 # Local file didn't change from last run.
738 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
739 if not file_in_local_collection:
740 # File not uploaded yet, upload it completely
742 elif file_in_local_collection.permission_expired():
743 # Permission token expired, re-upload file. This will change whenever
744 # we have an API for refreshing tokens.
745 self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
747 self._local_collection.remove(filename)
748 elif cached_file_data['size'] == file_in_local_collection.size():
749 # File already there, skip it.
750 self.bytes_skipped += cached_file_data['size']
751 elif cached_file_data['size'] > file_in_local_collection.size():
752 # File partially uploaded, resume!
753 resume_offset = file_in_local_collection.size()
754 self.bytes_skipped += resume_offset
757 # Inconsistent cache, re-upload the file
759 self._local_collection.remove(filename)
760 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
761 # Local file differs from cached data, re-upload it.
763 if file_in_local_collection:
764 self._local_collection.remove(filename)
769 self._files_to_upload.append((source, resume_offset, filename))
770 except ArvPutUploadIsPending:
771 # This could happen when running on dry-mode, close cache file to
772 # avoid locking issues.
773 self._cache_file.close()
776 def _upload_files(self):
777 for source, resume_offset, filename in self._files_to_upload:
778 with open(source, 'rb') as source_fd:
779 with self._state_lock:
780 self._state['files'][source]['mtime'] = os.path.getmtime(source)
781 self._state['files'][source]['size'] = os.path.getsize(source)
782 if resume_offset > 0:
783 # Start upload where we left off
784 output = self._local_collection.open(filename, 'ab')
785 source_fd.seek(resume_offset)
788 output = self._local_collection.open(filename, 'wb')
789 self._write(source_fd, output)
790 output.close(flush=False)
792 def _write(self, source_fd, output):
794 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
799 def _my_collection(self):
800 return self._remote_collection if self.update else self._local_collection
802 def _get_cache_filepath(self):
803 # Set up cache file name from input paths.
805 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
806 realpaths = sorted(os.path.realpath(path) for path in self.paths)
807 md5.update(b'\0'.join([p.encode() for p in realpaths]))
809 md5.update(self.filename.encode())
810 cache_filename = md5.hexdigest()
811 cache_filepath = os.path.join(
812 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
814 return cache_filepath
816 def _setup_state(self, update_collection):
818 Create a new cache file or load a previously existing one.
820 # Load an already existing collection for update
821 if update_collection and re.match(arvados.util.collection_uuid_pattern,
824 self._remote_collection = arvados.collection.Collection(
825 update_collection, api_client=self._api_client)
826 except arvados.errors.ApiError as error:
827 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
830 elif update_collection:
831 # Collection locator provided, but unknown format
832 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
835 cache_filepath = self._get_cache_filepath()
836 if self.resume and os.path.exists(cache_filepath):
837 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
838 self._cache_file = open(cache_filepath, 'a+')
840 # --no-resume means start with an empty cache file.
841 self.logger.info("Creating new cache file at {}".format(cache_filepath))
842 self._cache_file = open(cache_filepath, 'w+')
843 self._cache_filename = self._cache_file.name
844 self._lock_file(self._cache_file)
845 self._cache_file.seek(0)
847 with self._state_lock:
850 self._state = json.load(self._cache_file)
851 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
852 # Cache at least partially incomplete, set up new cache
853 self._state = copy.deepcopy(self.EMPTY_STATE)
855 # Cache file empty, set up new cache
856 self._state = copy.deepcopy(self.EMPTY_STATE)
858 self.logger.info("No cache usage requested for this run.")
859 # No cache file, set empty state
860 self._state = copy.deepcopy(self.EMPTY_STATE)
861 if not self._cached_manifest_valid():
862 raise ResumeCacheInvalidError()
863 # Load the previous manifest so we can check if files were modified remotely.
864 self._local_collection = arvados.collection.Collection(
865 self._state['manifest'],
866 replication_desired=self.replication_desired,
867 put_threads=self.put_threads,
868 api_client=self._api_client)
870 def _cached_manifest_valid(self):
872 Validate the oldest non-expired block signature to check if cached manifest
873 is usable: checking if the cached manifest was not created with a different
876 if self._state.get('manifest', None) is None:
877 # No cached manifest yet, all good.
879 now = datetime.datetime.utcnow()
883 for m in keep_locator_pattern.finditer(self._state['manifest']):
886 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
888 # Locator without signature
891 if exp > now and (oldest_exp is None or exp < oldest_exp):
895 # No block signatures found => no invalid block signatures.
897 if oldest_loc is None:
898 # Locator signatures found, but all have expired.
899 # Reset the cache and move on.
900 self.logger.info('Cache expired, starting from scratch.')
901 self._state['manifest'] = ''
903 kc = arvados.KeepClient(api_client=self._api_client,
904 num_retries=self.num_retries)
907 except arvados.errors.KeepRequestError:
908 # Something is wrong, cached manifest is not valid.
912 def collection_file_paths(self, col, path_prefix='.'):
913 """Return a list of file paths by recursively go through the entire collection `col`"""
915 for name, item in listitems(col):
916 if isinstance(item, arvados.arvfile.ArvadosFile):
917 file_paths.append(os.path.join(path_prefix, name))
918 elif isinstance(item, arvados.collection.Subcollection):
919 new_prefix = os.path.join(path_prefix, name)
920 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
923 def _lock_file(self, fileobj):
925 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
927 raise ResumeCacheConflict("{} locked".format(fileobj.name))
929 def _save_state(self):
931 Atomically save current state into cache.
933 with self._state_lock:
934 # We're not using copy.deepcopy() here because it's a lot slower
935 # than json.dumps(), and we're already needing JSON format to be
937 state = json.dumps(self._state)
939 new_cache = tempfile.NamedTemporaryFile(
941 dir=os.path.dirname(self._cache_filename), delete=False)
942 self._lock_file(new_cache)
943 new_cache.write(state)
946 os.rename(new_cache.name, self._cache_filename)
947 except (IOError, OSError, ResumeCacheConflict) as error:
948 self.logger.error("There was a problem while saving the cache file: {}".format(error))
950 os.unlink(new_cache_name)
951 except NameError: # mkstemp failed.
954 self._cache_file.close()
955 self._cache_file = new_cache
def collection_name(self):
    """Return the 'name' field of the active collection's API record.

    Returns None when the collection's ``api_response()`` is falsy.
    """
    # Fetch the record once; the one-liner this replaces called
    # _my_collection() and api_response() twice each.
    response = self._my_collection().api_response()
    return response['name'] if response else None
def manifest_locator(self):
    """Return ``manifest_locator()`` of the active collection."""
    collection = self._my_collection()
    return collection.manifest_locator()
963 def portable_data_hash(self):
964 pdh = self._my_collection().portable_data_hash()
965 m = self._my_collection().stripped_manifest().encode()
966 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
968 self.logger.warning("\n".join([
969 "arv-put: API server provided PDH differs from local manifest.",
970 " This should not happen; showing API server version."]))
def manifest_text(self, stream_name=".", strip=False, normalize=False):
    """Return the manifest text of the active collection.

    The three arguments are forwarded positionally, unchanged, to the
    collection's own ``manifest_text()``.
    """
    collection = self._my_collection()
    return collection.manifest_text(stream_name, strip, normalize)
976 def _datablocks_on_item(self, item):
978 Return a list of datablock locators, recursively navigating
979 through subcollections
981 if isinstance(item, arvados.arvfile.ArvadosFile):
984 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
987 for segment in item.segments():
988 loc = segment.locator
991 elif isinstance(item, arvados.collection.Collection):
992 l = [self._datablocks_on_item(x) for x in listvalues(item)]
993 # Fast list flattener method taken from:
994 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
995 return [loc for sublist in l for loc in sublist]
999 def data_locators(self):
1000 with self._collection_lock:
1001 # Make sure all datablocks are flushed before getting the locators
1002 self._my_collection().manifest_text()
1003 datablocks = self._datablocks_on_item(self._my_collection())
1006 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1009 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1010 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1011 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1012 # so instead we're using it on every path component.
1013 def pathname_match(pathname, pattern):
1014 name = pathname.split(os.sep)
1015 # Fix patterns like 'some/subdir/' or 'some//subdir'
1016 pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1017 if len(name) != len(pat):
1019 for i in range(len(name)):
1020 if not fnmatch.fnmatch(name[i], pat[i]):
def machine_progress(bytes_written, bytes_expected):
    """Render a progress report using the module-level ``_machine_format``.

    bytes_written -- first value substituted into the format.
    bytes_expected -- second value; reported as -1 when it is None.

    Returns the formatted string.
    """
    total = bytes_expected if bytes_expected is not None else -1
    return _machine_format.format(bytes_written, total)
1028 def human_progress(bytes_written, bytes_expected):
1030 return "\r{}M / {}M {:.1%} ".format(
1031 bytes_written >> 20, bytes_expected >> 20,
1032 float(bytes_written) / bytes_expected)
1034 return "\r{} ".format(bytes_written)
def progress_writer(progress_func, outfile=sys.stderr):
    """Build a progress callback that writes formatted reports to *outfile*.

    progress_func -- callable taking (bytes_written, bytes_expected) and
                     returning the text to write.
    outfile -- object with a write() method; the default is sys.stderr as
               bound when this function was defined.

    Returns a function taking (bytes_written, bytes_expected) that writes
    the text produced by progress_func to outfile.
    """
    def write_progress(bytes_written, bytes_expected):
        text = progress_func(bytes_written, bytes_expected)
        outfile.write(text)
    return write_progress
1041 def desired_project_uuid(api_client, project_uuid, num_retries):
1042 if not project_uuid:
1043 query = api_client.users().current()
1044 elif arvados.util.user_uuid_pattern.match(project_uuid):
1045 query = api_client.users().get(uuid=project_uuid)
1046 elif arvados.util.group_uuid_pattern.match(project_uuid):
1047 query = api_client.groups().get(uuid=project_uuid)
1049 raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1050 return query.execute(num_retries=num_retries)['uuid']
# Command-line entry point for arv-put. From the statements visible here it:
# parses the arguments, configures logging, obtains an API client, validates
# option combinations, builds an ArvPutUploadJob, starts it, and writes the
# resulting output string to `stdout`.
#
# Parameters (from the signature):
#   arguments            -- handed unchanged to parse_arguments().
#   stdout               -- stream that receives the final output string.
#   stderr               -- not referenced in the lines visible here.
#   install_sig_handlers -- when true, arv_cmd.install_signal_handlers() is
#                           called before the upload and
#                           arv_cmd.restore_signal_handlers() after it.
#
# NOTE(review): this listing has lost its indentation and several lines (the
# embedded numbering skips, e.g. 1053 -> 1056, 1057 -> 1059). The missing
# lines presumably hold `if`/`else`/`try` headers and the exit/return
# statements; the comments below describe only what is visible -- confirm
# against the complete file.
1052 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1053 install_sig_handlers=True):
1056 args = parse_arguments(arguments)
# Logger level is set to WARNING or INFO; the condition choosing between the
# two is on a line not shown here.
1057 logger = logging.getLogger('arvados.arv_put')
1059 logger.setLevel(logging.WARNING)
1061 logger.setLevel(logging.INFO)
# A single request id is shared by the log formatter and by the API client
# created below.
1064 request_id = arvados.util.new_request_id()
1066 formatter = ArvPutLogFormatter(request_id)
1067 logging.getLogger('arvados').handlers[0].setFormatter(formatter)
# `api_client` is not a parameter of main(); presumably a module-level global
# declared on a line not shown -- TODO confirm. A client is only created when
# none exists yet.
1069 if api_client is None:
1070 api_client = arvados.api('v1', request_id=request_id)
1072 if install_sig_handlers:
1073 arv_cmd.install_signal_handlers()
1075 # Determine the name to use
# --name is rejected (error logged) together with --stream/--raw or
# --update-collection; otherwise args.name becomes the collection name.
1077 if args.stream or args.raw:
1078 logger.error("Cannot use --name with --stream or --raw")
1080 elif args.update_collection:
1081 logger.error("Cannot use --name with --update-collection")
1083 collection_name = args.name
# Generated name: UTC timestamp plus the local user name and host name.
1085 collection_name = "Saved at {} by {}@{}".format(
1086 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1087 pwd.getpwuid(os.getuid()).pw_name,
1088 socket.gethostname())
1090 if args.project_uuid and (args.stream or args.raw):
1091 logger.error("Cannot use --project-uuid with --stream or --raw")
1094 # Determine the parent project
# desired_project_uuid() may raise ValueError for a malformed UUID, or an
# apiclient error from the lookup; both are caught here.
1096 project_uuid = desired_project_uuid(api_client, args.project_uuid,
1098 except (apiclient_errors.Error, ValueError) as error:
# Progress reporter: human_progress formatting for args.progress,
# machine_progress formatting for args.batch_progress.
1103 reporter = progress_writer(human_progress)
1104 elif args.batch_progress:
1105 reporter = progress_writer(machine_progress)
1109 # Split storage-classes argument
# The comma-separated value is split into a list, but more than one entry is
# reported as an error.
1110 storage_classes = None
1111 if args.storage_classes:
1112 storage_classes = args.storage_classes.strip().split(',')
1113 if len(storage_classes) > 1:
1114 logger.error("Multiple storage classes are not supported currently.")
1118 # Setup exclude regex from all the --exclude arguments provided
1121 exclude_names = None
1122 if len(args.exclude) > 0:
1123 # We're supporting 2 kinds of exclusion patterns:
1124 # 1) --exclude '*.jpg' (file/dir name patterns, will only match
1125 # the name, wherever the file is on the tree)
1126 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the
1127 # entire path, and should be relative to
1128 # any input dir argument)
1129 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs
1130 # placed directly underneath the input dir)
1131 for p in args.exclude:
1132 # Only relative paths patterns allowed
1133 if p.startswith(os.sep):
1134 logger.error("Cannot use absolute paths with --exclude")
# A pattern with a directory part is treated as a path pattern; one without
# is treated as a name-only pattern.
1136 if os.path.dirname(p):
1137 # We don't support path patterns with '..'
1138 p_parts = p.split(os.sep)
1141 "Cannot use path patterns that include or '..'")
1143 # Path search pattern
1144 exclude_paths.append(p)
1146 # Name-only search pattern
1147 name_patterns.append(p)
1148 # For name only matching, we can combine all patterns into a single
1149 # regexp, for better performance.
1150 exclude_names = re.compile('|'.join(
1151 [fnmatch.translate(p) for p in name_patterns]
1152 )) if len(name_patterns) > 0 else None
1153 # Show the user the patterns to be used, just in case they weren't
1154 # specified inside quotes and got changed by the shell expansion.
1155 logger.info("Exclude patterns: {}".format(args.exclude))
1157 # If this is used by a human, and there's at least one directory to be
1158 # uploaded, the expected bytes calculation can take a moment.
1159 if args.progress and any([os.path.isdir(f) for f in args.paths]):
1160 logger.info("Calculating upload size, this could take some time...")
# Build the upload job. Each exception handler that follows corresponds to one
# failure mode of the constructor; what each handler does after logging is on
# lines not shown here.
1162 writer = ArvPutUploadJob(paths = args.paths,
1163 resume = args.resume,
1164 use_cache = args.use_cache,
1165 filename = args.filename,
1166 reporter = reporter,
1167 api_client = api_client,
1168 num_retries = args.retries,
1169 replication_desired = args.replication,
1170 put_threads = args.threads,
1171 name = collection_name,
1172 owner_uuid = project_uuid,
1173 ensure_unique_name = True,
1174 update_collection = args.update_collection,
1175 storage_classes=storage_classes,
1177 dry_run=args.dry_run,
1178 follow_links=args.follow_links,
1179 exclude_paths=exclude_paths,
1180 exclude_names=exclude_names)
1181 except ResumeCacheConflict:
1182 logger.error("\n".join([
1183 "arv-put: Another process is already uploading this data.",
1184 " Use --no-cache if this is really what you want."]))
1186 except ResumeCacheInvalidError:
1187 logger.error("\n".join([
1188 "arv-put: Resume cache contains invalid signature: it may have expired",
1189 " or been created with another Arvados user's credentials.",
1190 " Switch user or use one of the following options to restart upload:",
1191 " --no-resume to start a new resume cache.",
1192 " --no-cache to disable resume cache."]))
1194 except CollectionUpdateError as error:
1195 logger.error("\n".join([
1196 "arv-put: %s" % str(error)]))
1198 except ArvPutUploadIsPending:
1199 # Dry run check successful, return proper exit code.
1201 except ArvPutUploadNotPending:
1202 # No files pending for upload
1204 except PathDoesNotExistError as error:
1205 logger.error("\n".join([
1206 "arv-put: %s" % str(error)]))
# A non-zero bytes_written right after construction means the job picked up
# state from an earlier run, so the user is told how to start over.
1209 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1210 logger.warning("\n".join([
1211 "arv-put: Resuming previous upload from last checkpoint.",
1212 " Use the --no-resume option to start over."]))
1214 if not args.dry_run:
1215 writer.report_progress()
# save_collection is False when --stream or --raw was given, i.e. when only
# manifest text or data locators are wanted as output.
1218 writer.start(save_collection=not(args.stream or args.raw))
1219 except arvados.errors.ApiError as error:
1220 logger.error("\n".join([
1221 "arv-put: %s" % str(error)]))
1224 if args.progress: # Print newline to split stderr from stdout for humans.
# `output` is one of: manifest text (optionally normalized), a comma-joined
# list of data locators, the portable data hash, or the manifest locator.
# The conditions selecting between them are partly on lines not shown.
1229 output = writer.manifest_text(normalize=True)
1231 output = writer.manifest_text()
1233 output = ','.join(writer.data_locators())
1236 if args.update_collection:
1237 logger.info("Collection updated: '{}'".format(writer.collection_name()))
1239 logger.info("Collection saved as '{}'".format(writer.collection_name()))
1240 if args.portable_data_hash:
1241 output = writer.portable_data_hash()
1243 output = writer.manifest_locator()
1244 except apiclient_errors.Error as error:
1246 "arv-put: Error creating Collection on project: {}.".format(
1250 # Print the locator (uuid) of the new collection.
# `status or 1` keeps an already non-zero status and otherwise sets it to 1.
# The output is written with a guaranteed trailing newline unless args.silent.
1252 status = status or 1
1253 elif not args.silent:
1254 stdout.write(output)
1255 if not output.endswith('\n'):
# Undo the signal handlers installed near the top of this function.
1258 if install_sig_handlers:
1259 arv_cmd.restore_signal_handlers()
1268 if __name__ == '__main__':