1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
11 import arvados.collection
32 from apiclient import errors as apiclient_errors
33 from arvados._version import __version__
34 from arvados.util import keep_locator_pattern
36 import arvados.commands._util as arv_cmd
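# The command-line interface is assembled from three argparse pieces:
# `upload_opts` (what to upload and how to store it), `run_opts` (runtime
# behavior: project, naming, progress, resume/cache handling) and the shared
# retry option from arvados.commands._util. They are combined into
# `arg_parser` below via argparse's `parents` mechanism.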
40 upload_opts = argparse.ArgumentParser(add_help=False)
42 upload_opts.add_argument('--version', action='version',
43 version="%s %s" % (sys.argv[0], __version__),
44 help='Print version and exit.')
45 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
47 Local file or directory. If path is a directory reference with a trailing
48 slash, then just upload the directory's contents; otherwise upload the
49 directory itself. Default: read from standard input.
52 _group = upload_opts.add_mutually_exclusive_group()
54 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
55 default=-1, help=argparse.SUPPRESS)
57 _group.add_argument('--normalize', action='store_true',
59 Normalize the manifest by re-ordering files and streams after writing
63 _group.add_argument('--dry-run', action='store_true', default=False,
65 Don't actually upload files, but only check if any file should be
66 uploaded. Exit with code=2 when files are pending for upload.
69 _group = upload_opts.add_mutually_exclusive_group()
71 _group.add_argument('--as-stream', action='store_true', dest='stream',
76 _group.add_argument('--stream', action='store_true',
78 Store the file content and display the resulting manifest on
79 stdout. Do not write the manifest to Keep or save a Collection object
83 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
85 Synonym for --manifest.
88 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
90 Synonym for --manifest.
93 _group.add_argument('--manifest', action='store_true',
95 Store the file data and resulting manifest in Keep, save a Collection
96 object in Arvados, and display the manifest locator (Collection uuid)
97 on stdout. This is the default behavior.
100 _group.add_argument('--as-raw', action='store_true', dest='raw',
105 _group.add_argument('--raw', action='store_true',
107 Store the file content and display the data block locators on stdout,
108 separated by commas, with a trailing newline. Do not store a
112 upload_opts.add_argument('--update-collection', type=str, default=None,
113 dest='update_collection', metavar="UUID", help="""
114 Update an existing collection identified by the given Arvados collection
115 UUID. All new local files will be uploaded.
118 upload_opts.add_argument('--use-filename', type=str, default=None,
119 dest='filename', help="""
120 Synonym for --filename.
123 upload_opts.add_argument('--filename', type=str, default=None,
125 Use the given filename in the manifest, instead of the name of the
126 local file. This is useful when "-" or "/dev/stdin" is given as an
127 input file. It can be used only if there is exactly one path given and
128 it is not a directory. Implies --manifest.
131 upload_opts.add_argument('--portable-data-hash', action='store_true',
133 Print the portable data hash instead of the Arvados UUID for the collection
134 created by the upload.
137 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
139 Set the replication level for the new collection: how many different
140 physical storage devices (e.g., disks) should have a copy of each data
141 block. Default is to use the server-provided default (if any) or 2.
144 upload_opts.add_argument('--storage-classes', help="""
145 Specify a comma-separated list of storage classes to be used when saving data to Keep.
148 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
150 Set the number of upload threads to be used. Take into account that
151 using many threads will increase the RAM requirements. Default is
153 On high-latency installations, using a greater number will improve
157 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
158 action='append', help="""
159 Exclude files and directories whose names match the given glob pattern. When
160 using a path-like pattern like 'subdir/*.txt', all text files inside the 'subdir'
161 directory (relative to the provided input dirs) will be excluded.
162 When using a filename pattern like '*.txt', any text file will be excluded
163 no matter where it is placed.
164 For the special case of needing to exclude only files or dirs directly below
165 the given input directory, you can use a pattern like './exclude_this.gif'.
166 You can specify multiple patterns by using this argument more than once.
169 _group = upload_opts.add_mutually_exclusive_group()
170 _group.add_argument('--follow-links', action='store_true', default=True,
171 dest='follow_links', help="""
172 Follow file and directory symlinks (default).
174 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
176 Do not follow file and directory symlinks.
180 run_opts = argparse.ArgumentParser(add_help=False)
182 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
183 Store the collection in the specified project, instead of your Home
187 run_opts.add_argument('--name', help="""
188 Save the collection with the specified name.
191 _group = run_opts.add_mutually_exclusive_group()
192 _group.add_argument('--progress', action='store_true',
194 Display human-readable progress on stderr (bytes and, if possible,
195 percentage of total data size). This is the default behavior when
199 _group.add_argument('--no-progress', action='store_true',
201 Do not display human-readable progress on stderr, even if stderr is a
205 _group.add_argument('--batch-progress', action='store_true',
207 Display machine-readable progress on stderr (bytes and, if known,
211 run_opts.add_argument('--silent', action='store_true',
213 Do not print any debug messages to console. (Any error messages will
217 _group = run_opts.add_mutually_exclusive_group()
218 _group.add_argument('--resume', action='store_true', default=True,
220 Continue interrupted uploads from cached state (default).
222 _group.add_argument('--no-resume', action='store_false', dest='resume',
224 Do not continue interrupted uploads from cached state.
227 _group = run_opts.add_mutually_exclusive_group()
228 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
230 Save upload state in a cache file for resuming (default).
232 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
234 Do not save upload state in a cache file for resuming.
237 arg_parser = argparse.ArgumentParser(
238 description='Copy data from the local filesystem to Keep.',
239 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
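# A minimal usage sketch (illustrative only; the project UUID and paths are
# placeholders, not values from a real cluster):
#
#   arv-put --project-uuid zzzzz-j7d0g-0123456789abcde \
#           --name 'My dataset' --exclude '*.tmp' data_dir/
#
# parse_arguments() below validates the parsed namespace (stdin handling,
# --filename restrictions, default progress reporting) before any upload
# work starts.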
241 def parse_arguments(arguments):
242 args = arg_parser.parse_args(arguments)
244 if len(args.paths) == 0:
247 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
249 if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
252 --filename argument cannot be used when storing a directory or
256 # Turn on --progress by default if stderr is a tty.
257 if (not (args.batch_progress or args.no_progress or args.silent)
258 and os.isatty(sys.stderr.fileno())):
261 # Turn off --resume (default) if --no-cache is used.
262 if not args.use_cache:
265 if args.paths == ['-']:
266 if args.update_collection:
268 --update-collection cannot be used when reading from stdin.
271 args.use_cache = False
272 if not args.filename:
273 args.filename = 'stdin'
275 # Remove possible duplicated patterns
276 if len(args.exclude) > 0:
277 args.exclude = list(set(args.exclude))
282 class PathDoesNotExistError(Exception):
286 class CollectionUpdateError(Exception):
290 class ResumeCacheConflict(Exception):
294 class ResumeCacheInvalidError(Exception):
297 class ArvPutArgumentConflict(Exception):
301 class ArvPutUploadIsPending(Exception):
305 class ArvPutUploadNotPending(Exception):
309 class FileUploadList(list):
310 def __init__(self, dry_run=False):
312 self.dry_run = dry_run
314 def append(self, other):
316 raise ArvPutUploadIsPending()
317 super(FileUploadList, self).append(other)
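# Note: with dry_run=True, appending the first pending upload raises
# ArvPutUploadIsPending, which lets --dry-run stop early and report that
# files are pending (exit code 2, per the --dry-run help above).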
320 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
321 class ArvPutLogFormatter(logging.Formatter):
322 std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
324 request_id_informed = False
326 def __init__(self, request_id):
327 self.err_fmtr = logging.Formatter(
328 arvados.log_format+' (X-Request-Id: {})'.format(request_id),
329 arvados.log_date_format)
331 def format(self, record):
332 if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
333 self.request_id_informed = True
334 return self.err_fmtr.format(record)
335 return self.std_fmtr.format(record)
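# The X-Request-Id suffix is only added to the first DEBUG or ERROR record;
# after that request_id_informed stays True and the standard format is used,
# so the request id is logged once per run instead of on every message.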
338 class ResumeCache(object):
339 CACHE_DIR = '.cache/arvados/arv-put'
341 def __init__(self, file_spec):
342 self.cache_file = open(file_spec, 'a+')
343 self._lock_file(self.cache_file)
344 self.filename = self.cache_file.name
347 def make_path(cls, args):
349 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
350 realpaths = sorted(os.path.realpath(path) for path in args.paths)
351 md5.update(b'\0'.join([p.encode() for p in realpaths]))
352 if any(os.path.isdir(path) for path in realpaths):
355 md5.update(args.filename.encode())
357 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
360 def _lock_file(self, fileobj):
362 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
364 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
367 self.cache_file.seek(0)
368 return json.load(self.cache_file)
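# check_cache() below samples one block locator from the cached state and
# issues a HEAD request for it in Keep, so a stale or unreadable cache can be
# detected before an upload tries to resume from it.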
370 def check_cache(self, api_client=None, num_retries=0):
375 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
376 locator = state["_finished_streams"][0][1][0]
377 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
378 locator = state["_current_stream_locators"][0]
379 if locator is not None:
380 kc = arvados.keep.KeepClient(api_client=api_client)
381 kc.head(locator, num_retries=num_retries)
382 except Exception as e:
387 def save(self, data):
389 new_cache_fd, new_cache_name = tempfile.mkstemp(
390 dir=os.path.dirname(self.filename))
391 self._lock_file(new_cache_fd)
392 new_cache = os.fdopen(new_cache_fd, 'r+')
393 json.dump(data, new_cache)
394 os.rename(new_cache_name, self.filename)
395 except (IOError, OSError, ResumeCacheConflict):
397 os.unlink(new_cache_name)
398 except NameError: # mkstemp failed.
401 self.cache_file.close()
402 self.cache_file = new_cache
405 self.cache_file.close()
409 os.unlink(self.filename)
410 except OSError as error:
411 if error.errno != errno.ENOENT: # That's what we wanted anyway.
417 self.__init__(self.filename)
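# ArvPutUploadJob drives the whole upload: it scans the requested paths,
# keeps a local Collection object mirroring what has been written so far,
# checkpoints that collection's manifest to a cache file from a background
# thread, and finally saves a new collection (or updates an existing one when
# --update-collection was given).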
420 class ArvPutUploadJob(object):
421 CACHE_DIR = '.cache/arvados/arv-put'
423 'manifest' : None, # Last saved manifest checkpoint
424 'files' : {} # Previous run file list: {path : {size, mtime}}
427 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
428 name=None, owner_uuid=None, api_client=None,
429 ensure_unique_name=False, num_retries=None,
430 put_threads=None, replication_desired=None, filename=None,
431 update_time=60.0, update_collection=None, storage_classes=None,
432 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
433 follow_links=True, exclude_paths=[], exclude_names=None):
436 self.use_cache = use_cache
438 self.reporter = reporter
439 # This will be set to 0 before we start counting, if no special files are going
441 self.bytes_expected = None
442 self.bytes_written = 0
443 self.bytes_skipped = 0
445 self.owner_uuid = owner_uuid
446 self.ensure_unique_name = ensure_unique_name
447 self.num_retries = num_retries
448 self.replication_desired = replication_desired
449 self.put_threads = put_threads
450 self.filename = filename
451 self.storage_classes = storage_classes
452 self._api_client = api_client
453 self._state_lock = threading.Lock()
454 self._state = None # Previous run state (file list & manifest)
455 self._current_files = [] # Current run file list
456 self._cache_file = None
457 self._collection_lock = threading.Lock()
458 self._remote_collection = None # Collection being updated (if asked)
459 self._local_collection = None # Collection from previous run manifest
460 self._file_paths = set() # Files to be updated in remote collection
461 self._stop_checkpointer = threading.Event()
462 self._checkpointer = threading.Thread(target=self._update_task)
463 self._checkpointer.daemon = True
464 self._update_task_time = update_time # How many seconds to wait between update runs
465 self._files_to_upload = FileUploadList(dry_run=dry_run)
466 self._upload_started = False
468 self.dry_run = dry_run
469 self._checkpoint_before_quit = True
470 self.follow_links = follow_links
471 self.exclude_paths = exclude_paths
472 self.exclude_names = exclude_names
474 if not self.use_cache and self.resume:
475 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
477 # Check for obvious dry-run responses
478 if self.dry_run and (not self.use_cache or not self.resume):
479 raise ArvPutUploadIsPending()
481 # Load cached data if any and if needed
482 self._setup_state(update_collection)
484 # Build the upload file list, excluding requested files and counting the
485 # bytes expected to be uploaded.
486 self._build_upload_list()
488 def _build_upload_list(self):
490 Scan the requested paths to count file sizes, excluding the files and dirs
491 that match the exclusion patterns, and build the upload file list.
493 # If there are no special files to be read, reset the total bytes count to zero
495 if not any([p for p in self.paths
496 if not (os.path.isfile(p) or os.path.isdir(p))]):
497 self.bytes_expected = 0
499 for path in self.paths:
500 # Test for stdin first, in case some file named '-' exists
503 raise ArvPutUploadIsPending()
504 self._write_stdin(self.filename or 'stdin')
505 elif not os.path.exists(path):
506 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
507 elif os.path.isdir(path):
508 # Use absolute paths on cache index so CWD doesn't interfere
509 # with the caching logic.
511 path = os.path.abspath(path)
512 if orig_path[-1:] == os.sep:
513 # When passing a directory reference with a trailing slash,
514 # its contents should be uploaded directly to the
518 # When passing a directory reference with no trailing slash,
519 # upload the directory to the collection's root.
520 prefixdir = os.path.dirname(path)
522 for root, dirs, files in os.walk(path,
523 followlinks=self.follow_links):
524 root_relpath = os.path.relpath(root, path)
525 if root_relpath == '.':
527 # Exclude files/dirs by full path matching pattern
528 if self.exclude_paths:
529 dirs[:] = [d for d in dirs
530 if not any(pathname_match(
531 os.path.join(root_relpath, d), pat)
532 for pat in self.exclude_paths)]
533 files = [f for f in files
534 if not any(pathname_match(
535 os.path.join(root_relpath, f), pat)
536 for pat in self.exclude_paths)]
537 # Exclude files/dirs by name matching pattern
538 if self.exclude_names is not None:
539 dirs[:] = [d for d in dirs
540 if not self.exclude_names.match(d)]
541 files = [f for f in files
542 if not self.exclude_names.match(f)]
543 # Make os.walk()'s dir traversal order deterministic
547 filepath = os.path.join(root, f)
548 # Add its size to the total bytes count (if applicable)
549 if self.follow_links or (not os.path.islink(filepath)):
550 if self.bytes_expected is not None:
551 self.bytes_expected += os.path.getsize(filepath)
552 self._check_file(filepath,
553 os.path.join(root[len(prefixdir):], f))
555 filepath = os.path.abspath(path)
556 # Add its size to the total bytes count (if applicable)
557 if self.follow_links or (not os.path.islink(filepath)):
558 if self.bytes_expected is not None:
559 self.bytes_expected += os.path.getsize(filepath)
560 self._check_file(filepath,
561 self.filename or os.path.basename(path))
562 # If dry-run mode is on and we got up to this point, then we should notify that
563 # there aren't any files to upload.
565 raise ArvPutUploadNotPending()
566 # Remove local_collection's files that don't exist locally anymore, so the
567 # bytes_written count is correct.
568 for f in self.collection_file_paths(self._local_collection,
570 if f != 'stdin' and f != self.filename and not f in self._file_paths:
571 self._local_collection.remove(f)
573 def start(self, save_collection):
575 Start the supporting checkpointer thread and upload the files
577 self._checkpointer.start()
579 # Update bytes_written from current local collection and
580 # report initial progress.
583 self._upload_started = True # Used by the update thread to start checkpointing
585 except (SystemExit, Exception) as e:
586 self._checkpoint_before_quit = False
587 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
588 # Note: We're expecting SystemExit instead of
589 # KeyboardInterrupt because we have a custom signal
590 # handler in place that raises SystemExit with the caught
592 if isinstance(e, PathDoesNotExistError):
593 # We aren't interested in the traceback for this case
595 elif not isinstance(e, SystemExit) or e.code != -2:
596 self.logger.warning("Abnormal termination:\n{}".format(
597 traceback.format_exc()))
601 # Stop the thread before doing anything else
602 self._stop_checkpointer.set()
603 self._checkpointer.join()
604 if self._checkpoint_before_quit:
605 # Commit all pending blocks & one last _update()
606 self._local_collection.manifest_text()
607 self._update(final=True)
609 self.save_collection()
611 self._cache_file.close()
613 def save_collection(self):
615 # Check if files should be updated on the remote collection.
616 for fp in self._file_paths:
617 remote_file = self._remote_collection.find(fp)
619 # File doesn't exist on the remote collection, copy it.
620 self._remote_collection.copy(fp, fp, self._local_collection)
621 elif remote_file != self._local_collection.find(fp):
622 # A different file exists on the remote collection, overwrite it.
623 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
625 # The file already exists on the remote collection, skip it.
627 self._remote_collection.save(storage_classes=self.storage_classes,
628 num_retries=self.num_retries)
630 if self.storage_classes is None:
631 self.storage_classes = ['default']
632 self._local_collection.save_new(
633 name=self.name, owner_uuid=self.owner_uuid,
634 storage_classes=self.storage_classes,
635 ensure_unique_name=self.ensure_unique_name,
636 num_retries=self.num_retries)
638 def destroy_cache(self):
641 os.unlink(self._cache_filename)
642 except OSError as error:
643 # That's what we wanted anyway.
644 if error.errno != errno.ENOENT:
646 self._cache_file.close()
648 def _collection_size(self, collection):
650 Recursively get the total size of the collection
653 for item in listvalues(collection):
654 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
655 size += self._collection_size(item)
660 def _update_task(self):
662 Periodically called support task. File uploading is
663 asynchronous so we poll status from the collection.
665 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
668 def _update(self, final=False):
670 Update cached manifest text and report progress.
672 if self._upload_started:
673 with self._collection_lock:
674 self.bytes_written = self._collection_size(self._local_collection)
677 manifest = self._local_collection.manifest_text()
679 # Get the manifest text without committing pending blocks
680 manifest = self._local_collection.manifest_text(strip=False,
684 with self._state_lock:
685 self._state['manifest'] = manifest
689 except Exception as e:
690 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
692 self.bytes_written = self.bytes_skipped
693 # Call the reporter, if any
694 self.report_progress()
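# Checkpointing design: once the upload has started, the background
# _update_task() thread calls _update() roughly every `update_time` seconds
# (60 by default), so the cache always holds a recent manifest of the blocks
# committed so far and an interrupted upload can resume from that point
# instead of starting over.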
696 def report_progress(self):
697 if self.reporter is not None:
698 self.reporter(self.bytes_written, self.bytes_expected)
700 def _write_stdin(self, filename):
701 output = self._local_collection.open(filename, 'wb')
702 self._write(sys.stdin, output)
705 def _check_file(self, source, filename):
707 Check if this file needs to be uploaded
709 # Ignore symlinks when requested
710 if (not self.follow_links) and os.path.islink(source):
713 should_upload = False
714 new_file_in_cache = False
715 # Record file path for updating the remote collection before exiting
716 self._file_paths.add(filename)
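# The checks below decide, in order: resuming disabled -> upload in full;
# file not in the cache -> new file, upload in full; size and mtime unchanged
# -> skip it, resume a partial upload, or re-upload if the cached copy looks
# inconsistent or its block signatures expired; anything else -> the local
# file changed, re-upload it from scratch.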
718 with self._state_lock:
719 # If no previous cached data on this file, store it for an eventual
721 if source not in self._state['files']:
722 self._state['files'][source] = {
723 'mtime': os.path.getmtime(source),
724 'size' : os.path.getsize(source)
726 new_file_in_cache = True
727 cached_file_data = self._state['files'][source]
729 # Check if file was already uploaded (at least partially)
730 file_in_local_collection = self._local_collection.find(filename)
732 # If not resuming, upload the full file.
735 # New file detected from last run, upload it.
736 elif new_file_in_cache:
738 # Local file didn't change from last run.
739 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
740 if not file_in_local_collection:
741 # File not uploaded yet, upload it completely
743 elif file_in_local_collection.permission_expired():
744 # Permission token expired, re-upload file. This will change whenever
745 # we have an API for refreshing tokens.
746 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
748 self._local_collection.remove(filename)
749 elif cached_file_data['size'] == file_in_local_collection.size():
750 # File already there, skip it.
751 self.bytes_skipped += cached_file_data['size']
752 elif cached_file_data['size'] > file_in_local_collection.size():
753 # File partially uploaded, resume!
754 resume_offset = file_in_local_collection.size()
755 self.bytes_skipped += resume_offset
758 # Inconsistent cache, re-upload the file
760 self._local_collection.remove(filename)
761 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
762 # Local file differs from cached data, re-upload it.
764 if file_in_local_collection:
765 self._local_collection.remove(filename)
770 self._files_to_upload.append((source, resume_offset, filename))
771 except ArvPutUploadIsPending:
772 # This could happen when running in dry-run mode; close the cache file to
773 # avoid locking issues.
774 self._cache_file.close()
777 def _upload_files(self):
778 for source, resume_offset, filename in self._files_to_upload:
779 with open(source, 'rb') as source_fd:
780 with self._state_lock:
781 self._state['files'][source]['mtime'] = os.path.getmtime(source)
782 self._state['files'][source]['size'] = os.path.getsize(source)
783 if resume_offset > 0:
784 # Start upload where we left off
785 output = self._local_collection.open(filename, 'ab')
786 source_fd.seek(resume_offset)
789 output = self._local_collection.open(filename, 'wb')
790 self._write(source_fd, output)
791 output.close(flush=False)
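# For resumed files the collection file is opened in append mode ('ab') and
# the local source is seek()ed to resume_offset, so only the bytes that were
# not committed during the previous run get transferred again.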
793 def _write(self, source_fd, output):
795 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
800 def _my_collection(self):
801 return self._remote_collection if self.update else self._local_collection
803 def _get_cache_filepath(self):
804 # Set up cache file name from input paths.
806 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
807 realpaths = sorted(os.path.realpath(path) for path in self.paths)
808 md5.update(b'\0'.join([p.encode() for p in realpaths]))
810 md5.update(self.filename.encode())
811 cache_filename = md5.hexdigest()
812 cache_filepath = os.path.join(
813 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
815 return cache_filepath
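# The cache file name is the hex MD5 digest of the API host plus the real
# paths of all inputs (and --filename, when given), stored under CACHE_DIR,
# typically something like (digest below is illustrative only):
#   ~/.cache/arvados/arv-put/0123456789abcdef0123456789abcdef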
817 def _setup_state(self, update_collection):
819 Create a new cache file or load a previously existing one.
821 # Load an already existing collection for update
822 if update_collection and re.match(arvados.util.collection_uuid_pattern,
825 self._remote_collection = arvados.collection.Collection(
826 update_collection, api_client=self._api_client)
827 except arvados.errors.ApiError as error:
828 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
831 elif update_collection:
832 # Collection locator provided, but unknown format
833 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
836 cache_filepath = self._get_cache_filepath()
837 if self.resume and os.path.exists(cache_filepath):
838 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
839 self._cache_file = open(cache_filepath, 'a+')
841 # --no-resume means start with an empty cache file.
842 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
843 self._cache_file = open(cache_filepath, 'w+')
844 self._cache_filename = self._cache_file.name
845 self._lock_file(self._cache_file)
846 self._cache_file.seek(0)
848 with self._state_lock:
851 self._state = json.load(self._cache_file)
852 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
853 # Cache at least partially incomplete, set up new cache
854 self._state = copy.deepcopy(self.EMPTY_STATE)
856 # Cache file empty, set up new cache
857 self._state = copy.deepcopy(self.EMPTY_STATE)
859 self.logger.info("No cache usage requested for this run.")
860 # No cache file, set empty state
861 self._state = copy.deepcopy(self.EMPTY_STATE)
862 if not self._cached_manifest_valid():
863 raise ResumeCacheInvalidError()
864 # Load the previous manifest so we can check if files were modified remotely.
865 self._local_collection = arvados.collection.Collection(
866 self._state['manifest'],
867 replication_desired=self.replication_desired,
868 put_threads=self.put_threads,
869 api_client=self._api_client)
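# _cached_manifest_valid() below checks whether the cached manifest can still
# be used: it finds the oldest block signature that has not expired yet and
# HEADs that block in Keep. If the request fails (for example because the
# signature belongs to another user or cluster), _setup_state() raises
# ResumeCacheInvalidError above. Signed locators look roughly like
# "<md5sum>+<size>+A<signature>@<hex-timestamp>"; the expiry is the hex
# timestamp after the '@'.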
871 def _cached_manifest_valid(self):
873 Validate the oldest non-expired block signature to check if cached manifest
874 is usable: checking if the cached manifest was not created with a different
877 if self._state.get('manifest', None) is None:
878 # No cached manifest yet, all good.
880 now = datetime.datetime.utcnow()
884 for m in keep_locator_pattern.finditer(self._state['manifest']):
887 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
889 # Locator without signature
892 if exp > now and (oldest_exp is None or exp < oldest_exp):
896 # No block signatures found => no invalid block signatures.
898 if oldest_loc is None:
899 # Locator signatures found, but all have expired.
900 # Reset the cache and move on.
901 self.logger.info('Cache expired, starting from scratch.')
902 self._state['manifest'] = ''
904 kc = arvados.KeepClient(api_client=self._api_client,
905 num_retries=self.num_retries)
908 except arvados.errors.KeepRequestError:
909 # Something is wrong, cached manifest is not valid.
913 def collection_file_paths(self, col, path_prefix='.'):
914 """Return a list of file paths by recursively going through the entire collection `col`"""
916 for name, item in listitems(col):
917 if isinstance(item, arvados.arvfile.ArvadosFile):
918 file_paths.append(os.path.join(path_prefix, name))
919 elif isinstance(item, arvados.collection.Subcollection):
920 new_prefix = os.path.join(path_prefix, name)
921 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
924 def _lock_file(self, fileobj):
926 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
928 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
930 def _save_state(self):
932 Atomically save current state into cache.
934 with self._state_lock:
935 # We're not using copy.deepcopy() here because it's a lot slower
936 # than json.dumps(), and we're already needing JSON format to be
938 state = json.dumps(self._state)
940 new_cache = tempfile.NamedTemporaryFile(
942 dir=os.path.dirname(self._cache_filename), delete=False)
943 self._lock_file(new_cache)
944 new_cache.write(state)
947 os.rename(new_cache.name, self._cache_filename)
948 except (IOError, OSError, ResumeCacheConflict) as error:
949 self.logger.error("There was a problem while saving the cache file: {}".format(error))
951 os.unlink(new_cache.name)
952 except NameError: # NamedTemporaryFile() failed.
955 self._cache_file.close()
956 self._cache_file = new_cache
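# The state is written to a NamedTemporaryFile in the same directory and then
# rename()d over the previous cache file, so a crash in the middle of a write
# never leaves a truncated cache behind.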
958 def collection_name(self):
959 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
961 def manifest_locator(self):
962 return self._my_collection().manifest_locator()
964 def portable_data_hash(self):
965 pdh = self._my_collection().portable_data_hash()
966 m = self._my_collection().stripped_manifest().encode()
967 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
969 self.logger.warning("\n".join([
970 "arv-put: API server provided PDH differs from local manifest.",
971 " This should not happen; showing API server version."]))
974 def manifest_text(self, stream_name=".", strip=False, normalize=False):
975 return self._my_collection().manifest_text(stream_name, strip, normalize)
977 def _datablocks_on_item(self, item):
979 Return a list of datablock locators, recursively navigating
980 through subcollections
982 if isinstance(item, arvados.arvfile.ArvadosFile):
985 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
988 for segment in item.segments():
989 loc = segment.locator
992 elif isinstance(item, arvados.collection.Collection):
993 l = [self._datablocks_on_item(x) for x in listvalues(item)]
994 # Fast list flattener method taken from:
995 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
996 return [loc for sublist in l for loc in sublist]
1000 def data_locators(self):
1001 with self._collection_lock:
1002 # Make sure all datablocks are flushed before getting the locators
1003 self._my_collection().manifest_text()
1004 datablocks = self._datablocks_on_item(self._my_collection())
1007 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1010 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1011 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1012 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1013 # so instead we're using it on every path component.
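# For example, with the function below:
#   pathname_match('tests/run_test.py', 'tests/*.py')         -> True
#   pathname_match('tests/subdir/run_test.py', 'tests/*.py')  -> False
# whereas a bare fnmatch.fnmatch() would also accept the second path, because
# its '*' can match across '/' separators.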
1014 def pathname_match(pathname, pattern):
1015 name = pathname.split(os.sep)
1016 # Fix patterns like 'some/subdir/' or 'some//subdir'
1017 pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1018 if len(name) != len(pat):
1020 for i in range(len(name)):
1021 if not fnmatch.fnmatch(name[i], pat[i]):
1025 def machine_progress(bytes_written, bytes_expected):
1026 return _machine_format.format(
1027 bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1029 def human_progress(bytes_written, bytes_expected):
1031 return "\r{}M / {}M {:.1%} ".format(
1032 bytes_written >> 20, bytes_expected >> 20,
1033 float(bytes_written) / bytes_expected)
1035 return "\r{} ".format(bytes_written)
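# Example outputs (values are illustrative):
#   machine_progress(1048576, 10485760) -> a line ending in
#       "1048576 written 10485760 total" (-1 is used when the total is unknown)
#   human_progress(1048576, 10485760)   -> "\r1M / 10M 10.0% "
#   human_progress(1048576, None)       -> "\r1048576 "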
1037 def progress_writer(progress_func, outfile=sys.stderr):
1038 def write_progress(bytes_written, bytes_expected):
1039 outfile.write(progress_func(bytes_written, bytes_expected))
1040 return write_progress
1042 def desired_project_uuid(api_client, project_uuid, num_retries):
1043 if not project_uuid:
1044 query = api_client.users().current()
1045 elif arvados.util.user_uuid_pattern.match(project_uuid):
1046 query = api_client.users().get(uuid=project_uuid)
1047 elif arvados.util.group_uuid_pattern.match(project_uuid):
1048 query = api_client.groups().get(uuid=project_uuid)
1050 raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1051 return query.execute(num_retries=num_retries)['uuid']
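# The destination may be given as a user UUID (the collection is stored in
# that user's home project) or as a group/project UUID; anything else is
# rejected. Placeholder examples:
#   zzzzz-tpzed-0123456789abcde  (user)    -> users().get()
#   zzzzz-j7d0g-0123456789abcde  (project) -> groups().get()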
1053 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1054 install_sig_handlers=True):
1057 args = parse_arguments(arguments)
1058 logger = logging.getLogger('arvados.arv_put')
1060 logger.setLevel(logging.WARNING)
1062 logger.setLevel(logging.INFO)
1065 request_id = arvados.util.new_request_id()
1067 formatter = ArvPutLogFormatter(request_id)
1068 logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1070 if api_client is None:
1071 api_client = arvados.api('v1', request_id=request_id)
1073 if install_sig_handlers:
1074 arv_cmd.install_signal_handlers()
1076 # Determine the name to use
1078 if args.stream or args.raw:
1079 logger.error("Cannot use --name with --stream or --raw")
1081 elif args.update_collection:
1082 logger.error("Cannot use --name with --update-collection")
1084 collection_name = args.name
1086 collection_name = "Saved at {} by {}@{}".format(
1087 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1088 pwd.getpwuid(os.getuid()).pw_name,
1089 socket.gethostname())
1091 if args.project_uuid and (args.stream or args.raw):
1092 logger.error("Cannot use --project-uuid with --stream or --raw")
1095 # Determine the parent project
1097 project_uuid = desired_project_uuid(api_client, args.project_uuid,
1099 except (apiclient_errors.Error, ValueError) as error:
1104 reporter = progress_writer(human_progress)
1105 elif args.batch_progress:
1106 reporter = progress_writer(machine_progress)
1110 # Split storage-classes argument
1111 storage_classes = None
1112 if args.storage_classes:
1113 storage_classes = args.storage_classes.strip().split(',')
1114 if len(storage_classes) > 1:
1115 logger.error("Multiple storage classes are not supported currently.")
1119 # Set up the exclude regex from all the --exclude arguments provided
1122 exclude_names = None
1123 if len(args.exclude) > 0:
1124 # We're supporting 2 kinds of exclusion patterns:
1125 # 1) --exclude '*.jpg' (file/dir name patterns, will only match
1126 # the name, wherever the file is on the tree)
1127 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the
1128 # entire path, and should be relative to
1129 # any input dir argument)
1130 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs
1131 # placed directly underneath the input dir)
1132 for p in args.exclude:
1133 # Only relative path patterns are allowed
1134 if p.startswith(os.sep):
1135 logger.error("Cannot use absolute paths with --exclude")
1137 if os.path.dirname(p):
1138 # We don't support path patterns with '..'
1139 p_parts = p.split(os.sep)
1142 "Cannot use path patterns that include '..'")
1144 # Path search pattern
1145 exclude_paths.append(p)
1147 # Name-only search pattern
1148 name_patterns.append(p)
1149 # For name only matching, we can combine all patterns into a single
1150 # regexp, for better performance.
1151 exclude_names = re.compile('|'.join(
1152 [fnmatch.translate(p) for p in name_patterns]
1153 )) if len(name_patterns) > 0 else None
1154 # Show the user the patterns to be used, just in case they weren't
1155 # specified inside quotes and got changed by the shell expansion.
1156 logger.info("Exclude patterns: {}".format(args.exclude))
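# For instance (hypothetical invocation), --exclude '*.tmp' --exclude 'logs/old'
# yields a compiled name regex that drops any '*.tmp' entry anywhere in the
# tree, plus the path pattern 'logs/old', which is checked with
# pathname_match() relative to each input directory.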
1158 # If this is used by a human, and there's at least one directory to be
1159 # uploaded, the expected bytes calculation can take a moment.
1160 if args.progress and any([os.path.isdir(f) for f in args.paths]):
1161 logger.info("Calculating upload size, this could take some time...")
1163 writer = ArvPutUploadJob(paths = args.paths,
1164 resume = args.resume,
1165 use_cache = args.use_cache,
1166 filename = args.filename,
1167 reporter = reporter,
1168 api_client = api_client,
1169 num_retries = args.retries,
1170 replication_desired = args.replication,
1171 put_threads = args.threads,
1172 name = collection_name,
1173 owner_uuid = project_uuid,
1174 ensure_unique_name = True,
1175 update_collection = args.update_collection,
1176 storage_classes=storage_classes,
1178 dry_run=args.dry_run,
1179 follow_links=args.follow_links,
1180 exclude_paths=exclude_paths,
1181 exclude_names=exclude_names)
1182 except ResumeCacheConflict:
1183 logger.error("\n".join([
1184 "arv-put: Another process is already uploading this data.",
1185 " Use --no-cache if this is really what you want."]))
1187 except ResumeCacheInvalidError:
1188 logger.error("\n".join([
1189 "arv-put: Resume cache contains an invalid signature: it may have expired",
1190 " or been created with another Arvados user's credentials.",
1191 " Switch user or use one of the following options to restart upload:",
1192 " --no-resume to start a new resume cache.",
1193 " --no-cache to disable resume cache."]))
1195 except CollectionUpdateError as error:
1196 logger.error("\n".join([
1197 "arv-put: %s" % str(error)]))
1199 except ArvPutUploadIsPending:
1200 # Dry run check successful, return proper exit code.
1202 except ArvPutUploadNotPending:
1203 # No files pending for upload
1205 except PathDoesNotExistError as error:
1206 logger.error("\n".join([
1207 "arv-put: %s" % str(error)]))
1210 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1211 logger.warning("\n".join([
1212 "arv-put: Resuming previous upload from last checkpoint.",
1213 " Use the --no-resume option to start over."]))
1215 if not args.dry_run:
1216 writer.report_progress()
1219 writer.start(save_collection=not(args.stream or args.raw))
1220 except arvados.errors.ApiError as error:
1221 logger.error("\n".join([
1222 "arv-put: %s" % str(error)]))
1225 if args.progress: # Print newline to split stderr from stdout for humans.
1230 output = writer.manifest_text(normalize=True)
1232 output = writer.manifest_text()
1234 output = ','.join(writer.data_locators())
1237 if args.update_collection:
1238 logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
1240 logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
1241 if args.portable_data_hash:
1242 output = writer.portable_data_hash()
1244 output = writer.manifest_locator()
1245 except apiclient_errors.Error as error:
1247 "arv-put: Error creating Collection on project: {}.".format(
1251 # Print the locator (uuid) of the new collection.
1253 status = status or 1
1254 elif not args.silent:
1255 stdout.write(output)
1256 if not output.endswith('\n'):
1259 if install_sig_handlers:
1260 arv_cmd.restore_signal_handlers()
1269 if __name__ == '__main__':