1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
11 import arvados.collection
33 from apiclient import errors as apiclient_errors
34 from arvados._version import __version__
35 from arvados.util import keep_locator_pattern
37 import arvados.commands._util as arv_cmd
41 upload_opts = argparse.ArgumentParser(add_help=False)
43 upload_opts.add_argument('--version', action='version',
44 version="%s %s" % (sys.argv[0], __version__),
45 help='Print version and exit.')
46 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
48 Local file or directory. If path is a directory reference with a trailing
49 slash, then just upload the directory's contents; otherwise upload the
50 directory itself. Default: read from standard input.
53 _group = upload_opts.add_mutually_exclusive_group()
55 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
56 default=-1, help=argparse.SUPPRESS)
58 _group.add_argument('--normalize', action='store_true',
60 Normalize the manifest by re-ordering files and streams after writing
64 _group.add_argument('--dry-run', action='store_true', default=False,
66 Don't actually upload files, but only check if any file should be
67 uploaded. Exit with code=2 when files are pending for upload.
70 _group = upload_opts.add_mutually_exclusive_group()
72 _group.add_argument('--as-stream', action='store_true', dest='stream',
77 _group.add_argument('--stream', action='store_true',
79 Store the file content and display the resulting manifest on
80 stdout. Do not write the manifest to Keep or save a Collection object
84 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
86 Synonym for --manifest.
89 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
91 Synonym for --manifest.
94 _group.add_argument('--manifest', action='store_true',
96 Store the file data and resulting manifest in Keep, save a Collection
97 object in Arvados, and display the manifest locator (Collection uuid)
98 on stdout. This is the default behavior.
101 _group.add_argument('--as-raw', action='store_true', dest='raw',
106 _group.add_argument('--raw', action='store_true',
108 Store the file content and display the data block locators on stdout,
109 separated by commas, with a trailing newline. Do not store a
113 upload_opts.add_argument('--update-collection', type=str, default=None,
114 dest='update_collection', metavar="UUID", help="""
115 Update an existing collection identified by the given Arvados collection
116 UUID. All new local files will be uploaded.
119 upload_opts.add_argument('--use-filename', type=str, default=None,
120 dest='filename', help="""
121 Synonym for --filename.
124 upload_opts.add_argument('--filename', type=str, default=None,
126 Use the given filename in the manifest, instead of the name of the
127 local file. This is useful when "-" or "/dev/stdin" is given as an
128 input file. It can be used only if there is exactly one path given and
129 it is not a directory. Implies --manifest.
132 upload_opts.add_argument('--portable-data-hash', action='store_true',
134 Print the portable data hash instead of the Arvados UUID for the collection
135 created by the upload.
138 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
140 Set the replication level for the new collection: how many different
141 physical storage devices (e.g., disks) should have a copy of each data
142 block. Default is to use the server-provided default (if any) or 2.
145 upload_opts.add_argument('--storage-classes', help="""
146 Specify a comma-separated list of storage classes to be used when saving data to Keep.
149 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
151 Set the number of upload threads to be used. Keep in mind that using
152 many threads will increase the RAM requirements. Default is
154 On high latency installations, using a greater number will improve
158 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
159 action='append', help="""
160 Exclude files and directories whose names match the given glob pattern. When
161 using a path-like pattern like 'subdir/*.txt', all text files inside the 'subdir'
162 directory (relative to the provided input dirs) will be excluded.
163 When using a filename pattern like '*.txt', any text file will be excluded
164 no matter where it is placed.
165 For the special case of needing to exclude only files or dirs directly below
166 the given input directory, you can use a pattern like './exclude_this.gif'.
167 You can specify multiple patterns by using this argument more than once.
170 _group = upload_opts.add_mutually_exclusive_group()
171 _group.add_argument('--follow-links', action='store_true', default=True,
172 dest='follow_links', help="""
173 Follow file and directory symlinks (default).
175 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
177 Do not follow file and directory symlinks.
181 run_opts = argparse.ArgumentParser(add_help=False)
183 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
184 Store the collection in the specified project, instead of your Home
188 run_opts.add_argument('--name', help="""
189 Save the collection with the specified name.
192 _group = run_opts.add_mutually_exclusive_group()
193 _group.add_argument('--progress', action='store_true',
195 Display human-readable progress on stderr (bytes and, if possible,
196 percentage of total data size). This is the default behavior when
200 _group.add_argument('--no-progress', action='store_true',
202 Do not display human-readable progress on stderr, even if stderr is a
206 _group.add_argument('--batch-progress', action='store_true',
208 Display machine-readable progress on stderr (bytes and, if known,
212 run_opts.add_argument('--silent', action='store_true',
214 Do not print any debug messages to console. (Any error messages will
218 _group = run_opts.add_mutually_exclusive_group()
219 _group.add_argument('--resume', action='store_true', default=True,
221 Continue interrupted uploads from cached state (default).
223 _group.add_argument('--no-resume', action='store_false', dest='resume',
225 Do not continue interrupted uploads from cached state.
228 _group = run_opts.add_mutually_exclusive_group()
229 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
231 Save upload state in a cache file for resuming (default).
233 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
235 Do not save upload state in a cache file for resuming.
238 _group = upload_opts.add_mutually_exclusive_group()
239 _group.add_argument('--trash-at', metavar='YYYY-MM-DD HH:MM', default=None,
241 Set the trash date of the resulting collection to an absolute date in the future.
242 The accepted format is defined by the ISO 8601 standard.
244 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
246 Set the trash date of the resulting collection to a number of days from the
247 date/time that the upload process finishes.
250 arg_parser = argparse.ArgumentParser(
251 description='Copy data from the local filesystem to Keep.',
252 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
254 def parse_arguments(arguments):
255 args = arg_parser.parse_args(arguments)
257 if len(args.paths) == 0:
260 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
262 if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
265 --filename argument cannot be used when storing a directory or
269 # Turn on --progress by default if stderr is a tty.
270 if (not (args.batch_progress or args.no_progress or args.silent)
271 and os.isatty(sys.stderr.fileno())):
274 # Turn off --resume (default) if --no-cache is used.
275 if not args.use_cache:
278 if args.paths == ['-']:
279 if args.update_collection:
281 --update-collection cannot be used when reading from stdin.
284 args.use_cache = False
285 if not args.filename:
286 args.filename = 'stdin'
288 # Remove possible duplicated patterns
289 if len(args.exclude) > 0:
290 args.exclude = list(set(args.exclude))
295 class PathDoesNotExistError(Exception):
299 class CollectionUpdateError(Exception):
303 class ResumeCacheConflict(Exception):
307 class ResumeCacheInvalidError(Exception):
310 class ArvPutArgumentConflict(Exception):
314 class ArvPutUploadIsPending(Exception):
318 class ArvPutUploadNotPending(Exception):
322 class FileUploadList(list):
323 def __init__(self, dry_run=False):
325 self.dry_run = dry_run
327 def append(self, other):
329 raise ArvPutUploadIsPending()
330 super(FileUploadList, self).append(other)
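# Note: in dry-run mode, appending the first pending upload raises
# ArvPutUploadIsPending; _check_file() then closes the cache file and the
# exception is reported by main() as "files pending for upload" (exit code 2,
# per the --dry-run help above).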
333 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
334 class ArvPutLogFormatter(logging.Formatter):
335 std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
337 request_id_informed = False
339 def __init__(self, request_id):
340 self.err_fmtr = logging.Formatter(
341 arvados.log_format+' (X-Request-Id: {})'.format(request_id),
342 arvados.log_date_format)
344 def format(self, record):
345 if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
346 self.request_id_informed = True
347 return self.err_fmtr.format(record)
348 return self.std_fmtr.format(record)
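# A minimal usage sketch (mirroring how main() wires this up below):
#   formatter = ArvPutLogFormatter(arvados.util.new_request_id())
#   logging.getLogger('arvados').handlers[0].setFormatter(formatter)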
351 class ResumeCache(object):
352 CACHE_DIR = '.cache/arvados/arv-put'
354 def __init__(self, file_spec):
355 self.cache_file = open(file_spec, 'a+')
356 self._lock_file(self.cache_file)
357 self.filename = self.cache_file.name
360 def make_path(cls, args):
362 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
363 realpaths = sorted(os.path.realpath(path) for path in args.paths)
364 md5.update(b'\0'.join([p.encode() for p in realpaths]))
365 if any(os.path.isdir(path) for path in realpaths):
368 md5.update(args.filename.encode())
370 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
373 def _lock_file(self, fileobj):
375 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
377 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
380 self.cache_file.seek(0)
381 return json.load(self.cache_file)
383 def check_cache(self, api_client=None, num_retries=0):
388 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
389 locator = state["_finished_streams"][0][1][0]
390 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
391 locator = state["_current_stream_locators"][0]
392 if locator is not None:
393 kc = arvados.keep.KeepClient(api_client=api_client)
394 kc.head(locator, num_retries=num_retries)
395 except Exception as e:
400 def save(self, data):
402 new_cache_fd, new_cache_name = tempfile.mkstemp(
403 dir=os.path.dirname(self.filename))
404 self._lock_file(new_cache_fd)
405 new_cache = os.fdopen(new_cache_fd, 'r+')
406 json.dump(data, new_cache)
407 os.rename(new_cache_name, self.filename)
408 except (IOError, OSError, ResumeCacheConflict):
410 os.unlink(new_cache_name)
411 except NameError: # mkstemp failed.
414 self.cache_file.close()
415 self.cache_file = new_cache
418 self.cache_file.close()
422 os.unlink(self.filename)
423 except OSError as error:
424 if error.errno != errno.ENOENT: # That's what we wanted anyway.
430 self.__init__(self.filename)
433 class ArvPutUploadJob(object):
434 CACHE_DIR = '.cache/arvados/arv-put'
436 'manifest' : None, # Last saved manifest checkpoint
437 'files' : {} # Previous run file list: {path : {size, mtime}}
440 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
441 name=None, owner_uuid=None, api_client=None,
442 ensure_unique_name=False, num_retries=None,
443 put_threads=None, replication_desired=None, filename=None,
444 update_time=60.0, update_collection=None, storage_classes=None,
445 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
446 follow_links=True, exclude_paths=[], exclude_names=None,
450 self.use_cache = use_cache
452 self.reporter = reporter
453 # This will be set to 0 before we start counting, if no special files are going
455 self.bytes_expected = None
456 self.bytes_written = 0
457 self.bytes_skipped = 0
459 self.owner_uuid = owner_uuid
460 self.ensure_unique_name = ensure_unique_name
461 self.num_retries = num_retries
462 self.replication_desired = replication_desired
463 self.put_threads = put_threads
464 self.filename = filename
465 self.storage_classes = storage_classes
466 self._api_client = api_client
467 self._state_lock = threading.Lock()
468 self._state = None # Previous run state (file list & manifest)
469 self._current_files = [] # Current run file list
470 self._cache_file = None
471 self._collection_lock = threading.Lock()
472 self._remote_collection = None # Collection being updated (if asked)
473 self._local_collection = None # Collection from previous run manifest
474 self._file_paths = set() # Files to be updated in remote collection
475 self._stop_checkpointer = threading.Event()
476 self._checkpointer = threading.Thread(target=self._update_task)
477 self._checkpointer.daemon = True
478 self._update_task_time = update_time # How many seconds to wait between update runs
479 self._files_to_upload = FileUploadList(dry_run=dry_run)
480 self._upload_started = False
482 self.dry_run = dry_run
483 self._checkpoint_before_quit = True
484 self.follow_links = follow_links
485 self.exclude_paths = exclude_paths
486 self.exclude_names = exclude_names
487 self._trash_at = trash_at
489 if self._trash_at is not None and type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
490 raise TypeError('trash_at should be None, datetime or timedelta')
492 if not self.use_cache and self.resume:
493 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
495 # Check for obvious dry-run responses
496 if self.dry_run and (not self.use_cache or not self.resume):
497 raise ArvPutUploadIsPending()
499 # Load cached data if any and if needed
500 self._setup_state(update_collection)
502 # Build the upload file list, excluding requested files and counting the
503 # bytes expected to be uploaded.
504 self._build_upload_list()
506 def _build_upload_list(self):
508 Scan the requested paths to count file sizes, build the upload file list, and
509 exclude the requested files and dirs.
511 # If there are no special files to be read, reset the total bytes count to zero
513 if not any([p for p in self.paths
514 if not (os.path.isfile(p) or os.path.isdir(p))]):
515 self.bytes_expected = 0
517 for path in self.paths:
518 # Test for stdin first, in case some file named '-' exists
521 raise ArvPutUploadIsPending()
522 self._write_stdin(self.filename or 'stdin')
523 elif not os.path.exists(path):
524 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
525 elif os.path.isdir(path):
526 # Use absolute paths on cache index so CWD doesn't interfere
527 # with the caching logic.
529 path = os.path.abspath(path)
530 if orig_path[-1:] == os.sep:
531 # When passing a directory reference with a trailing slash,
532 # its contents should be uploaded directly to the
536 # When passing a directory reference with no trailing slash,
537 # upload the directory to the collection's root.
538 prefixdir = os.path.dirname(path)
540 for root, dirs, files in os.walk(path,
541 followlinks=self.follow_links):
542 root_relpath = os.path.relpath(root, path)
543 if root_relpath == '.':
545 # Exclude files/dirs by full path matching pattern
546 if self.exclude_paths:
547 dirs[:] = [d for d in dirs
548 if not any(pathname_match(
549 os.path.join(root_relpath, d), pat)
550 for pat in self.exclude_paths)]
551 files = [f for f in files
552 if not any(pathname_match(
553 os.path.join(root_relpath, f), pat)
554 for pat in self.exclude_paths)]
555 # Exclude files/dirs by name matching pattern
556 if self.exclude_names is not None:
557 dirs[:] = [d for d in dirs
558 if not self.exclude_names.match(d)]
559 files = [f for f in files
560 if not self.exclude_names.match(f)]
561 # Make os.walk()'s directory traversal order deterministic
565 filepath = os.path.join(root, f)
566 # Add its size to the total bytes count (if applicable)
567 if self.follow_links or (not os.path.islink(filepath)):
568 if self.bytes_expected is not None:
569 self.bytes_expected += os.path.getsize(filepath)
570 self._check_file(filepath,
571 os.path.join(root[len(prefixdir):], f))
573 filepath = os.path.abspath(path)
574 # Add its size to the total bytes count (if applicable)
575 if self.follow_links or (not os.path.islink(filepath)):
576 if self.bytes_expected is not None:
577 self.bytes_expected += os.path.getsize(filepath)
578 self._check_file(filepath,
579 self.filename or os.path.basename(path))
580 # If dry-run mode is on and we got to this point, we should notify that
581 # there aren't any files to upload.
583 raise ArvPutUploadNotPending()
584 # Remove local_collection's files that don't exist locally anymore, so the
585 # bytes_written count is correct.
586 for f in self.collection_file_paths(self._local_collection,
588 if f != 'stdin' and f != self.filename and not f in self._file_paths:
589 self._local_collection.remove(f)
591 def start(self, save_collection):
593 Start supporting thread & file uploading
595 self._checkpointer.start()
597 # Update bytes_written from current local collection and
598 # report initial progress.
601 self._upload_started = True # Used by the update thread to start checkpointing
603 except (SystemExit, Exception) as e:
604 self._checkpoint_before_quit = False
605 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
606 # Note: We're expecting SystemExit instead of
607 # KeyboardInterrupt because we have a custom signal
608 # handler in place that raises SystemExit with the caught
610 if isinstance(e, PathDoesNotExistError):
611 # We aren't interested in the traceback for this case
613 elif not isinstance(e, SystemExit) or e.code != -2:
614 self.logger.warning("Abnormal termination:\n{}".format(
615 traceback.format_exc()))
619 # Stop the thread before doing anything else
620 self._stop_checkpointer.set()
621 self._checkpointer.join()
622 if self._checkpoint_before_quit:
623 # Commit all pending blocks & one last _update()
624 self._local_collection.manifest_text()
625 self._update(final=True)
627 self.save_collection()
629 self._cache_file.close()
631 def _collection_trash_at(self):
633 Returns the trash date that the collection should use at save time.
634 Takes into account absolute/relative trash_at values requested
637 if type(self._trash_at) == datetime.timedelta:
638 # Get an absolute datetime for trash_at
639 return datetime.datetime.utcnow() + self._trash_at
640 return self._trash_at
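# For example (illustrative): with --trash-after 7, _trash_at is a
# datetime.timedelta of 7 days, and the saved collection's trash date becomes
# utcnow() + 7 days at the time save_collection() runs.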
642 def save_collection(self):
644 # Check if files should be updated on the remote collection.
645 for fp in self._file_paths:
646 remote_file = self._remote_collection.find(fp)
648 # File doesn't exist in the remote collection, copy it.
649 self._remote_collection.copy(fp, fp, self._local_collection)
650 elif remote_file != self._local_collection.find(fp):
651 # A different file exists in the remote collection, overwrite it.
652 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
654 # The file already exists in the remote collection, skip it.
656 self._remote_collection.save(storage_classes=self.storage_classes,
657 num_retries=self.num_retries,
658 trash_at=self._collection_trash_at())
660 if self.storage_classes is None:
661 self.storage_classes = ['default']
662 self._local_collection.save_new(
663 name=self.name, owner_uuid=self.owner_uuid,
664 storage_classes=self.storage_classes,
665 ensure_unique_name=self.ensure_unique_name,
666 num_retries=self.num_retries,
667 trash_at=self._collection_trash_at())
669 def destroy_cache(self):
672 os.unlink(self._cache_filename)
673 except OSError as error:
674 # That's what we wanted anyway.
675 if error.errno != errno.ENOENT:
677 self._cache_file.close()
679 def _collection_size(self, collection):
681 Recursively get the total size of the collection
684 for item in listvalues(collection):
685 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
686 size += self._collection_size(item)
691 def _update_task(self):
693 Periodically called support task. File uploading is
694 asynchronous so we poll status from the collection.
696 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
699 def _update(self, final=False):
701 Update cached manifest text and report progress.
703 if self._upload_started:
704 with self._collection_lock:
705 self.bytes_written = self._collection_size(self._local_collection)
708 manifest = self._local_collection.manifest_text()
710 # Get the manifest text without committing pending blocks
711 manifest = self._local_collection.manifest_text(strip=False,
715 with self._state_lock:
716 self._state['manifest'] = manifest
720 except Exception as e:
721 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
723 self.bytes_written = self.bytes_skipped
724 # Call the reporter, if any
725 self.report_progress()
727 def report_progress(self):
728 if self.reporter is not None:
729 self.reporter(self.bytes_written, self.bytes_expected)
731 def _write_stdin(self, filename):
732 output = self._local_collection.open(filename, 'wb')
733 self._write(sys.stdin, output)
736 def _check_file(self, source, filename):
738 Check if this file needs to be uploaded
740 # Ignore symlinks when requested
741 if (not self.follow_links) and os.path.islink(source):
744 should_upload = False
745 new_file_in_cache = False
746 # Record file path for updating the remote collection before exiting
747 self._file_paths.add(filename)
749 with self._state_lock:
750 # If no previous cached data on this file, store it for an eventual
752 if source not in self._state['files']:
753 self._state['files'][source] = {
754 'mtime': os.path.getmtime(source),
755 'size' : os.path.getsize(source)
757 new_file_in_cache = True
758 cached_file_data = self._state['files'][source]
760 # Check if file was already uploaded (at least partially)
761 file_in_local_collection = self._local_collection.find(filename)
763 # If not resuming, upload the full file.
766 # New file detected since the last run, upload it.
767 elif new_file_in_cache:
769 # Local file didn't change from last run.
770 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
771 if not file_in_local_collection:
772 # File not uploaded yet, upload it completely
774 elif file_in_local_collection.permission_expired():
775 # Permission token expired, re-upload file. This will change whenever
776 # we have an API for refreshing tokens.
777 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
779 self._local_collection.remove(filename)
780 elif cached_file_data['size'] == file_in_local_collection.size():
781 # File already there, skip it.
782 self.bytes_skipped += cached_file_data['size']
783 elif cached_file_data['size'] > file_in_local_collection.size():
784 # File partially uploaded, resume!
785 resume_offset = file_in_local_collection.size()
786 self.bytes_skipped += resume_offset
789 # Inconsistent cache, re-upload the file
791 self._local_collection.remove(filename)
792 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
793 # Local file differs from cached data, re-upload it.
795 if file_in_local_collection:
796 self._local_collection.remove(filename)
801 self._files_to_upload.append((source, resume_offset, filename))
802 except ArvPutUploadIsPending:
803 # This could happen when running in dry-run mode; close the cache file to
804 # avoid locking issues.
805 self._cache_file.close()
808 def _upload_files(self):
809 for source, resume_offset, filename in self._files_to_upload:
810 with open(source, 'rb') as source_fd:
811 with self._state_lock:
812 self._state['files'][source]['mtime'] = os.path.getmtime(source)
813 self._state['files'][source]['size'] = os.path.getsize(source)
814 if resume_offset > 0:
815 # Start upload where we left off
816 output = self._local_collection.open(filename, 'ab')
817 source_fd.seek(resume_offset)
820 output = self._local_collection.open(filename, 'wb')
821 self._write(source_fd, output)
822 output.close(flush=False)
824 def _write(self, source_fd, output):
826 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
831 def _my_collection(self):
832 return self._remote_collection if self.update else self._local_collection
834 def _get_cache_filepath(self):
835 # Set up cache file name from input paths.
837 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
838 realpaths = sorted(os.path.realpath(path) for path in self.paths)
839 md5.update(b'\0'.join([p.encode() for p in realpaths]))
841 md5.update(self.filename.encode())
842 cache_filename = md5.hexdigest()
843 cache_filepath = os.path.join(
844 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
846 return cache_filepath
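# The resulting cache path looks roughly like this (assumed example):
#   ~/.cache/arvados/arv-put/<md5 of API host, input paths and --filename>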
848 def _setup_state(self, update_collection):
850 Create a new cache file or load a previously existing one.
852 # Load an already existing collection for update
853 if update_collection and re.match(arvados.util.collection_uuid_pattern,
856 self._remote_collection = arvados.collection.Collection(
857 update_collection, api_client=self._api_client)
858 except arvados.errors.ApiError as error:
859 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
862 elif update_collection:
863 # Collection locator provided, but unknown format
864 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
867 cache_filepath = self._get_cache_filepath()
868 if self.resume and os.path.exists(cache_filepath):
869 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
870 self._cache_file = open(cache_filepath, 'a+')
872 # --no-resume means start with an empty cache file.
873 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
874 self._cache_file = open(cache_filepath, 'w+')
875 self._cache_filename = self._cache_file.name
876 self._lock_file(self._cache_file)
877 self._cache_file.seek(0)
879 with self._state_lock:
882 self._state = json.load(self._cache_file)
883 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
884 # Cache is at least partially incomplete, set up a new cache
885 self._state = copy.deepcopy(self.EMPTY_STATE)
887 # Cache file empty, set up new cache
888 self._state = copy.deepcopy(self.EMPTY_STATE)
890 self.logger.info("No cache usage requested for this run.")
891 # No cache file, set empty state
892 self._state = copy.deepcopy(self.EMPTY_STATE)
893 if not self._cached_manifest_valid():
894 raise ResumeCacheInvalidError()
895 # Load the previous manifest so we can check if files were modified remotely.
896 self._local_collection = arvados.collection.Collection(
897 self._state['manifest'],
898 replication_desired=self.replication_desired,
899 put_threads=self.put_threads,
900 api_client=self._api_client)
902 def _cached_manifest_valid(self):
904 Validate the oldest non-expired block signature to check whether the cached manifest
905 is usable, that is, that it was not created with a different
908 if self._state.get('manifest', None) is None:
909 # No cached manifest yet, all good.
911 now = datetime.datetime.utcnow()
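# A signed Keep locator looks roughly like this (assumed example):
#   <md5>+<size>+A<signature>@<hex expiry timestamp>
# so the expiry parsed below comes from the part after '@'.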
915 for m in keep_locator_pattern.finditer(self._state['manifest']):
918 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
920 # Locator without signature
923 if exp > now and (oldest_exp is None or exp < oldest_exp):
927 # No block signatures found => no invalid block signatures.
929 if oldest_loc is None:
930 # Locator signatures found, but all have expired.
931 # Reset the cache and move on.
932 self.logger.info('Cache expired, starting from scratch.')
933 self._state['manifest'] = ''
935 kc = arvados.KeepClient(api_client=self._api_client,
936 num_retries=self.num_retries)
939 except arvados.errors.KeepRequestError:
940 # Something is wrong, cached manifest is not valid.
944 def collection_file_paths(self, col, path_prefix='.'):
945 """Return a list of file paths by recursively go through the entire collection `col`"""
947 for name, item in listitems(col):
948 if isinstance(item, arvados.arvfile.ArvadosFile):
949 file_paths.append(os.path.join(path_prefix, name))
950 elif isinstance(item, arvados.collection.Subcollection):
951 new_prefix = os.path.join(path_prefix, name)
952 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
955 def _lock_file(self, fileobj):
957 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
959 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
961 def _save_state(self):
963 Atomically save current state into cache.
965 with self._state_lock:
966 # We're not using copy.deepcopy() here because it's a lot slower
967 # than json.dumps(), and we already need the JSON format to be
969 state = json.dumps(self._state)
971 new_cache = tempfile.NamedTemporaryFile(
973 dir=os.path.dirname(self._cache_filename), delete=False)
974 self._lock_file(new_cache)
975 new_cache.write(state)
978 os.rename(new_cache.name, self._cache_filename)
979 except (IOError, OSError, ResumeCacheConflict) as error:
980 self.logger.error("There was a problem while saving the cache file: {}".format(error))
982 os.unlink(new_cache.name)
983 except NameError: # NamedTemporaryFile creation failed.
986 self._cache_file.close()
987 self._cache_file = new_cache
989 def collection_name(self):
990 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
992 def manifest_locator(self):
993 return self._my_collection().manifest_locator()
995 def portable_data_hash(self):
996 pdh = self._my_collection().portable_data_hash()
997 m = self._my_collection().stripped_manifest().encode()
998 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1000 self.logger.warning("\n".join([
1001 "arv-put: API server provided PDH differs from local manifest.",
1002 " This should not happen; showing API server version."]))
1005 def manifest_text(self, stream_name=".", strip=False, normalize=False):
1006 return self._my_collection().manifest_text(stream_name, strip, normalize)
1008 def _datablocks_on_item(self, item):
1010 Return a list of datablock locators, recursively navigating
1011 through subcollections
1013 if isinstance(item, arvados.arvfile.ArvadosFile):
1014 if item.size() == 0:
1015 # Empty file locator
1016 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1019 for segment in item.segments():
1020 loc = segment.locator
1021 locators.append(loc)
1023 elif isinstance(item, arvados.collection.Collection):
1024 l = [self._datablocks_on_item(x) for x in listvalues(item)]
1025 # Fast list flattener method taken from:
1026 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1027 return [loc for sublist in l for loc in sublist]
1031 def data_locators(self):
1032 with self._collection_lock:
1033 # Make sure all datablocks are flushed before getting the locators
1034 self._my_collection().manifest_text()
1035 datablocks = self._datablocks_on_item(self._my_collection())
1038 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1041 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1042 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1043 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1044 # so instead we're using it on every path component.
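# A few illustrative examples of the intended behavior (not from the source):
#   pathname_match('tests/run_test.py', 'tests/*.py')        -> True
#   pathname_match('tests/subdir/run_test.py', 'tests/*.py') -> False  (extra path level)
#   pathname_match('subdir/file.txt', './subdir/*.txt')      -> True   ('.' and empty components are dropped)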
1045 def pathname_match(pathname, pattern):
1046 name = pathname.split(os.sep)
1047 # Fix patterns like 'some/subdir/' or 'some//subdir'
1048 pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1049 if len(name) != len(pat):
1051 for i in range(len(name)):
1052 if not fnmatch.fnmatch(name[i], pat[i]):
1056 def machine_progress(bytes_written, bytes_expected):
1057 return _machine_format.format(
1058 bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1060 def human_progress(bytes_written, bytes_expected):
1062 return "\r{}M / {}M {:.1%} ".format(
1063 bytes_written >> 20, bytes_expected >> 20,
1064 float(bytes_written) / bytes_expected)
1066 return "\r{} ".format(bytes_written)
1068 def progress_writer(progress_func, outfile=sys.stderr):
1069 def write_progress(bytes_written, bytes_expected):
1070 outfile.write(progress_func(bytes_written, bytes_expected))
1071 return write_progress
1073 def desired_project_uuid(api_client, project_uuid, num_retries):
1074 if not project_uuid:
1075 query = api_client.users().current()
1076 elif arvados.util.user_uuid_pattern.match(project_uuid):
1077 query = api_client.users().get(uuid=project_uuid)
1078 elif arvados.util.group_uuid_pattern.match(project_uuid):
1079 query = api_client.groups().get(uuid=project_uuid)
1081 raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1082 return query.execute(num_retries=num_retries)['uuid']
1084 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1085 install_sig_handlers=True):
1088 args = parse_arguments(arguments)
1089 logger = logging.getLogger('arvados.arv_put')
1091 logger.setLevel(logging.WARNING)
1093 logger.setLevel(logging.INFO)
1096 request_id = arvados.util.new_request_id()
1098 formatter = ArvPutLogFormatter(request_id)
1099 logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1101 if api_client is None:
1102 api_client = arvados.api('v1', request_id=request_id)
1104 if install_sig_handlers:
1105 arv_cmd.install_signal_handlers()
1107 # Trash arguments validation
1109 if args.trash_at is not None:
1111 trash_at = ciso8601.parse_datetime(args.trash_at)
1113 logger.error("--trash-at argument format invalid, should be YYYY-MM-DDTHH:MM.")
1116 if trash_at.tzinfo is not None:
1117 # Timezone-aware datetime provided, convert to non-aware UTC
1118 delta = trash_at.tzinfo.utcoffset(None)
1119 trash_at = trash_at.replace(tzinfo=None) - delta
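# e.g. (illustrative): "2024-01-01 12:00+02:00" becomes the naive UTC
# datetime 2024-01-01 10:00 before the comparison against utcnow() below.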
1120 if trash_at <= datetime.datetime.utcnow():
1121 logger.error("--trash-at argument should be set in the future")
1123 if args.trash_after is not None:
1124 if args.trash_after < 1:
1125 logger.error("--trash-after argument should be >= 1")
1127 trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1129 # Determine the name to use
1131 if args.stream or args.raw:
1132 logger.error("Cannot use --name with --stream or --raw")
1134 elif args.update_collection:
1135 logger.error("Cannot use --name with --update-collection")
1137 collection_name = args.name
1139 collection_name = "Saved at {} by {}@{}".format(
1140 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1141 pwd.getpwuid(os.getuid()).pw_name,
1142 socket.gethostname())
1144 if args.project_uuid and (args.stream or args.raw):
1145 logger.error("Cannot use --project-uuid with --stream or --raw")
1148 # Determine the parent project
1150 project_uuid = desired_project_uuid(api_client, args.project_uuid,
1152 except (apiclient_errors.Error, ValueError) as error:
1157 reporter = progress_writer(human_progress)
1158 elif args.batch_progress:
1159 reporter = progress_writer(machine_progress)
1163 # Split storage-classes argument
1164 storage_classes = None
1165 if args.storage_classes:
1166 storage_classes = args.storage_classes.strip().split(',')
1167 if len(storage_classes) > 1:
1168 logger.error("Multiple storage classes are not supported currently.")
1172 # Set up the exclude regex from all the --exclude arguments provided
1175 exclude_names = None
1176 if len(args.exclude) > 0:
1177 # We're supporting 2 kinds of exclusion patterns:
1178 # 1) --exclude '*.jpg' (file/dir name patterns, will only match
1179 # the name, wherever the file is on the tree)
1180 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the
1181 # entire path, and should be relative to
1182 # any input dir argument)
1183 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs
1184 # placed directly underneath the input dir)
1185 for p in args.exclude:
1186 # Only relative path patterns are allowed
1187 if p.startswith(os.sep):
1188 logger.error("Cannot use absolute paths with --exclude")
1190 if os.path.dirname(p):
1191 # We don't support path patterns with '..'
1192 p_parts = p.split(os.sep)
1195 "Cannot use path patterns that include or '..'")
1197 # Path search pattern
1198 exclude_paths.append(p)
1200 # Name-only search pattern
1201 name_patterns.append(p)
1202 # For name only matching, we can combine all patterns into a single
1203 # regexp, for better performance.
1204 exclude_names = re.compile('|'.join(
1205 [fnmatch.translate(p) for p in name_patterns]
1206 )) if len(name_patterns) > 0 else None
1207 # Show the user the patterns to be used, just in case they weren't
1208 # specified inside quotes and got changed by the shell expansion.
1209 logger.info("Exclude patterns: {}".format(args.exclude))
1211 # If this is used by a human, and there's at least one directory to be
1212 # uploaded, the expected bytes calculation can take a moment.
1213 if args.progress and any([os.path.isdir(f) for f in args.paths]):
1214 logger.info("Calculating upload size, this could take some time...")
1216 writer = ArvPutUploadJob(paths = args.paths,
1217 resume = args.resume,
1218 use_cache = args.use_cache,
1219 filename = args.filename,
1220 reporter = reporter,
1221 api_client = api_client,
1222 num_retries = args.retries,
1223 replication_desired = args.replication,
1224 put_threads = args.threads,
1225 name = collection_name,
1226 owner_uuid = project_uuid,
1227 ensure_unique_name = True,
1228 update_collection = args.update_collection,
1229 storage_classes=storage_classes,
1231 dry_run=args.dry_run,
1232 follow_links=args.follow_links,
1233 exclude_paths=exclude_paths,
1234 exclude_names=exclude_names,
1236 except ResumeCacheConflict:
1237 logger.error("\n".join([
1238 "arv-put: Another process is already uploading this data.",
1239 " Use --no-cache if this is really what you want."]))
1241 except ResumeCacheInvalidError:
1242 logger.error("\n".join([
1243 "arv-put: Resume cache contains invalid signature: it may have expired",
1244 " or been created with another Arvados user's credentials.",
1245 " Switch user or use one of the following options to restart upload:",
1246 " --no-resume to start a new resume cache.",
1247 " --no-cache to disable resume cache."]))
1249 except (CollectionUpdateError, PathDoesNotExistError) as error:
1250 logger.error("\n".join([
1251 "arv-put: %s" % str(error)]))
1253 except ArvPutUploadIsPending:
1254 # Dry run check successful, return proper exit code.
1256 except ArvPutUploadNotPending:
1257 # No files pending for upload
1260 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1261 logger.warning("\n".join([
1262 "arv-put: Resuming previous upload from last checkpoint.",
1263 " Use the --no-resume option to start over."]))
1265 if not args.dry_run:
1266 writer.report_progress()
1269 writer.start(save_collection=not(args.stream or args.raw))
1270 except arvados.errors.ApiError as error:
1271 logger.error("\n".join([
1272 "arv-put: %s" % str(error)]))
1275 if args.progress: # Print newline to split stderr from stdout for humans.
1280 output = writer.manifest_text(normalize=True)
1282 output = writer.manifest_text()
1284 output = ','.join(writer.data_locators())
1287 if args.update_collection:
1288 logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
1290 logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
1291 if args.portable_data_hash:
1292 output = writer.portable_data_hash()
1294 output = writer.manifest_locator()
1295 except apiclient_errors.Error as error:
1297 "arv-put: Error creating Collection on project: {}.".format(
1301 # Print the locator (uuid) of the new collection.
1303 status = status or 1
1304 elif not args.silent:
1305 stdout.write(output)
1306 if not output.endswith('\n'):
1309 if install_sig_handlers:
1310 arv_cmd.restore_signal_handlers()
1319 if __name__ == '__main__':