# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import argparse
import arvados
import arvados.collection
import ciso8601
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__
from arvados.util import keep_locator_pattern

import arvados.commands._util as arv_cmd

api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not save a Collection object in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--storage-classes', help="""
Specify a comma-separated list of storage classes to be used when saving data to Keep.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                         action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory, relative to the provided input dirs, will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Ignore file and directory symlinks. Even paths given explicitly on the
command line will be skipped if they are symlinks.
""")


run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known, total
data size).
""")

run_opts.add_argument('--silent', action='store_true',
                      help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

run_opts.add_argument('--batch', action='store_true', default=False,
                      help="""
Retries with '--no-resume --no-cache' if cached state contains invalid/expired
block signatures.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
                    help="""
Set the trash date of the resulting collection to an absolute date in the future.
The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
Timezone information can be added. If not, the provided date/time is assumed as being in the local system's timezone.
""")
_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
                    help="""
Set the trash date of the resulting collection to an amount of days from the
date/time that the upload process finishes.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args
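
# Illustrative note (not part of the original module): given
# parse_arguments(['--filename', 'foo', '-']), the normalization above leaves
# args.paths == ['-'], disables resume/cache, and names the manifest entry
# 'foo' ('stdin' would be used if --filename were omitted).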

class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ResumeCacheInvalidError(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass

class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
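
# A minimal sketch of FileUploadList's dry-run behavior (hypothetical usage):
#
#     uploads = FileUploadList(dry_run=True)
#     uploads.append(('/tmp/foo', 0, 'foo'))  # raises ArvPutUploadIsPending
#
# The exception signals that at least one file would be uploaded; the
# --dry-run code path in main() turns it into exit code 2.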

# Appends the X-Request-Id to the log message when the log level is ERROR or DEBUG
class ArvPutLogFormatter(logging.Formatter):
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        self.err_fmtr = logging.Formatter(
            arvados.log_format + ' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)
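
# A minimal wiring sketch (assumes a handler is already attached to the
# 'arvados' logger, as main() does below):
#
#     handler = logging.getLogger('arvados').handlers[0]
#     handler.setFormatter(ArvPutLogFormatter(arvados.util.new_request_id()))
#
# The first DEBUG or ERROR record logged gets the X-Request-Id suffix;
# later records fall back to the standard format.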

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)

class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
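
    # For reference, a populated state might look like this (hypothetical
    # values; the manifest line is a valid example of the Keep manifest
    # format):
    # {
    #     'manifest': '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:file.txt\n',
    #     'files': {'/home/user/file.txt': {'mtime': 1596814563.0, 'size': 10}}
    # }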

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None, filename=None,
                 update_time=60.0, update_collection=None, storage_classes=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None,
                 trash_at=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.batch_mode = batch_mode
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before we start counting, if no special files
        # are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self.storage_classes = storage_classes
        self._api_client = api_client
        self._state_lock = threading.Lock()
        self._state = None  # Previous run state (file list & manifest)
        self._current_files = []  # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None  # Collection being updated (if asked)
        self._local_collection = None  # Collection from previous run manifest
        self._file_paths = set()  # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names
        self._trash_at = trash_at

        if self._trash_at is not None:
            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
                raise TypeError('trash_at should be None, a timezone-naive datetime, or a timedelta')
            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
                raise TypeError('provided trash_at datetime should be timezone-naive')

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding requested files
        and dirs and building the upload file list.
        """
        # If there aren't special files to be read, reset the total bytes count
        # to zero to start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists.
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
            elif (not self.follow_links) and os.path.islink(path):
                self.logger.warning("Skipping symlink '{}'".format(path))
                continue
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                       os.path.join(root_relpath, d), pat)
                                       for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                     os.path.join(root_relpath, f), pat)
                                     for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        if not os.path.isfile(filepath):
                            self.logger.warning("Skipping non-regular file '{}'".format(filepath))
                            continue
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got to this point, we should notify that
        # there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start the supporting thread & file uploading.
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from the current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True  # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log the stack trace only when Ctrl-C isn't pressed (SIGINT).
            # Note: we're expecting SystemExit instead of KeyboardInterrupt
            # because we have a custom signal handler in place that raises
            # SystemExit with the caught signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & do one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def _collection_trash_at(self):
        """
        Returns the trash date that the collection should use at save time.
        Takes into account absolute/relative trash_at values requested
        by the user.
        """
        if type(self._trash_at) == datetime.timedelta:
            # Get an absolute datetime for trash_at
            return datetime.datetime.utcnow() + self._trash_at
        return self._trash_at
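
    # For example: with trash_at=datetime.timedelta(days=7) the collection gets
    # an absolute trash date of utcnow() + 7 days computed at save time, while
    # trash_at=datetime.datetime(2030, 1, 3, 18, 15) is used verbatim.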

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries,
                                         trash_at=self._collection_trash_at())
        else:
            if len(self._local_collection) == 0:
                self.logger.warning("No files were uploaded, skipping collection creation.")
                return
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries,
                trash_at=self._collection_trash_at())

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection.
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
            # Keep the remote collection's trash_at attribute synced when using relative expire dates
            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
                try:
                    self._api_client.collections().update(
                        uuid=self._remote_collection.manifest_locator(),
                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
                    ).execute(num_retries=self.num_retries)
                except Exception as e:
                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin.buffer, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # resume.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode: close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start the upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _get_cache_filepath(self):
        # Set up cache file name from input paths.
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in self.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if self.filename:
            md5.update(self.filename.encode())
        cache_filename = md5.hexdigest()
        cache_filepath = os.path.join(
            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
            cache_filename)
        return cache_filepath

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(
                    update_collection,
                    api_client=self._api_client,
                    storage_classes_desired=self.storage_classes,
                    num_retries=self.num_retries)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            cache_filepath = self._get_cache_filepath()
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            if not self._cached_manifest_valid():
                if not self.batch_mode:
                    raise ResumeCacheInvalidError()
                else:
                    self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
                    self.use_cache = False  # Don't overwrite the preexisting cache file.
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(
                self._state['manifest'],
                replication_desired=self.replication_desired,
                storage_classes_desired=self.storage_classes,
                put_threads=self.put_threads,
                api_client=self._api_client,
                num_retries=self.num_retries)

    def _cached_manifest_valid(self):
        """
        Validate the oldest non-expired block signature to check if the cached
        manifest is usable: that is, that it was not created with a different
        Arvados account.
        """
        if self._state.get('manifest', None) is None:
            # No cached manifest yet, all good.
            return True
        now = datetime.datetime.utcnow()
        oldest_exp = None
        oldest_loc = None
        block_found = False
        for m in keep_locator_pattern.finditer(self._state['manifest']):
            loc = m.group(0)
            try:
                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
            except IndexError:
                # Locator without signature
                continue
            block_found = True
            if exp > now and (oldest_exp is None or exp < oldest_exp):
                oldest_exp = exp
                oldest_loc = loc
        if not block_found:
            # No block signatures found => no invalid block signatures.
            return True
        if oldest_loc is None:
            # Locator signatures found, but all have expired.
            # Reset the cache and move on.
            self.logger.info('Cache expired, starting from scratch.')
            self._state['manifest'] = ''
            return True
        kc = arvados.KeepClient(api_client=self._api_client,
                                num_retries=self.num_retries)
        try:
            kc.head(oldest_loc)
        except arvados.errors.KeepRequestError:
            # Something is wrong, cached manifest is not valid.
            return False
        return True

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
        file_paths = []
        for name, item in col.items():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the state to be in JSON
            # format to be saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile creation failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def collection_trash_at(self):
        return self._my_collection().get_trash_at()

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections.
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

# Simulate glob.glob() matching behavior without the need to scan the
# filesystem. Note: fnmatch() doesn't work correctly when used with pathnames.
# For example the pattern 'tests/*.py' will match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we use it on every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
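
# Illustrative doctest-style examples (not part of the original module):
#
#     >>> pathname_match('tests/run_test.py', 'tests/*.py')
#     True
#     >>> pathname_match('tests/subdir/run_test.py', 'tests/*.py')
#     False
#     >>> pathname_match('some/subdir/file', 'some//subdir/./file')
#     True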

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)


def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress
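
# A minimal usage sketch with hypothetical numbers: report 1 MiB written out
# of 4 MiB expected.
#
#     report = progress_writer(human_progress)
#     report(1 << 20, 4 << 20)   # writes "\r1M / 4M 25.0% " to stderr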

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         install_sig_handlers=True):
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()

    formatter = ArvPutLogFormatter(request_id)
    logging.getLogger('arvados').handlers[0].setFormatter(formatter)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Trash arguments validation
    trash_at = None
    if args.trash_at is not None:
        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
        # make sure the user provides a complete YYYY-MM-DD date.
        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
            logger.error("--trash-at argument format invalid, use --help to see examples.")
            sys.exit(1)
        # Check if no time information was provided. In that case, assume end-of-day.
        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
            args.trash_at += 'T23:59:59'
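        # For example: '20090103' and '2009-01-03' pass the check above and,
        # lacking a time part, become '20090103T23:59:59' and
        # '2009-01-03T23:59:59' respectively; a partial date like '2009-01'
        # fails the first check and aborts.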
        try:
            trash_at = ciso8601.parse_datetime(args.trash_at)
        except ValueError:
            logger.error("--trash-at argument format invalid, use --help to see examples.")
            sys.exit(1)
        else:
            if trash_at.tzinfo is not None:
                # Timezone-aware datetime provided.
                utcoffset = -trash_at.utcoffset()
            else:
                # Timezone-naive datetime provided. Assume it's local.
                if time.daylight:
                    utcoffset = datetime.timedelta(seconds=time.altzone)
                else:
                    utcoffset = datetime.timedelta(seconds=time.timezone)
            # Convert to a UTC timezone-naive datetime.
            trash_at = trash_at.replace(tzinfo=None) + utcoffset

        if trash_at <= datetime.datetime.utcnow():
            logger.error("--trash-at argument must be set in the future")
            sys.exit(1)
    if args.trash_after is not None:
        if args.trash_after < 1:
            logger.error("--trash-after argument must be >= 1")
            sys.exit(1)
        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Split storage-classes argument
    storage_classes = None
    if args.storage_classes:
        storage_classes = args.storage_classes.strip().replace(' ', '').split(',')

    # Set up the exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")

    try:
        writer = ArvPutUploadJob(paths=args.paths,
                                 resume=args.resume,
                                 use_cache=args.use_cache,
                                 batch_mode=args.batch,
                                 filename=args.filename,
                                 reporter=reporter,
                                 api_client=api_client,
                                 num_retries=args.retries,
                                 replication_desired=args.replication,
                                 put_threads=args.threads,
                                 name=collection_name,
                                 owner_uuid=project_uuid,
                                 ensure_unique_name=True,
                                 update_collection=args.update_collection,
                                 storage_classes=storage_classes,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names,
                                 trash_at=trash_at)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except ResumeCacheInvalidError:
        logger.error("\n".join([
            "arv-put: Resume cache contains invalid signature: it may have expired",
            "         or been created with another Arvados user's credentials.",
            "         Switch user or use one of the following options to restart upload:",
            "         --no-resume to start a new resume cache.",
            "         --no-cache to disable resume cache.",
            "         --batch to ignore the resume cache if invalid."]))
        sys.exit(1)
    except (CollectionUpdateError, PathDoesNotExistError) as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    elif writer.manifest_locator() is not None:
        try:
            expiration_notice = ""
            if writer.collection_trash_at() is not None:
                # Get the local timezone-naive version, and log it with timezone information.
                if time.daylight:
                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
                else:
                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
                expiration_notice = ". It will expire on {} {}.".format(
                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
            if args.update_collection:
                logger.info(u"Collection updated: '{}'{}".format(
                    writer.collection_name(), expiration_notice))
            else:
                logger.info(u"Collection saved as '{}'{}".format(
                    writer.collection_name(), expiration_notice))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    if install_sig_handlers:
        arv_cmd.restore_signal_handlers()

    if status != 0:
        sys.exit(status)

    return output


if __name__ == '__main__':
    main()