1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
7 import arvados.collection
29 from pathlib import Path
31 from apiclient import errors as apiclient_errors
32 from arvados._version import __version__
35 import arvados.commands._util as arv_cmd
39 upload_opts = argparse.ArgumentParser(add_help=False)
41 upload_opts.add_argument('--version', action='version',
42 version="%s %s" % (sys.argv[0], __version__),
43 help='Print version and exit.')
44 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
46 Local file or directory. If path is a directory reference with a trailing
47 slash, then just upload the directory's contents; otherwise upload the
48 directory itself. Default: read from standard input.
51 _group = upload_opts.add_mutually_exclusive_group()
53 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
54 default=-1, help=argparse.SUPPRESS)
56 _group.add_argument('--normalize', action='store_true',
58 Normalize the manifest by re-ordering files and streams after writing
62 _group.add_argument('--dry-run', action='store_true', default=False,
64 Don't actually upload files, but only check if any file should be
65 uploaded. Exit with code=2 when files are pending for upload.
68 _group = upload_opts.add_mutually_exclusive_group()
70 _group.add_argument('--as-stream', action='store_true', dest='stream',
75 _group.add_argument('--stream', action='store_true',
77 Store the file content and display the resulting manifest on
78 stdout. Do not save a Collection object in Arvados.
81 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
83 Synonym for --manifest.
86 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
88 Synonym for --manifest.
91 _group.add_argument('--manifest', action='store_true',
93 Store the file data and resulting manifest in Keep, save a Collection
94 object in Arvados, and display the manifest locator (Collection uuid)
95 on stdout. This is the default behavior.
98 _group.add_argument('--as-raw', action='store_true', dest='raw',
103 _group.add_argument('--raw', action='store_true',
105 Store the file content and display the data block locators on stdout,
106 separated by commas, with a trailing newline. Do not store a
110 upload_opts.add_argument('--update-collection', type=str, default=None,
111 dest='update_collection', metavar="UUID", help="""
112 Update an existing collection identified by the given Arvados collection
113 UUID. All new local files will be uploaded.
116 upload_opts.add_argument('--use-filename', type=str, default=None,
117 dest='filename', help="""
118 Synonym for --filename.
121 upload_opts.add_argument('--filename', type=str, default=None,
123 Use the given filename in the manifest, instead of the name of the
124 local file. This is useful when "-" or "/dev/stdin" is given as an
125 input file. It can be used only if there is exactly one path given and
126 it is not a directory. Implies --manifest.
129 upload_opts.add_argument('--portable-data-hash', action='store_true',
131 Print the portable data hash instead of the Arvados UUID for the collection
132 created by the upload.
135 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
137 Set the replication level for the new collection: how many different
138 physical storage devices (e.g., disks) should have a copy of each data
139 block. Default is to use the server-provided default (if any) or 2.
142 upload_opts.add_argument('--storage-classes', help="""
143 Specify a comma-separated list of storage classes to be used when saving data to Keep.
146 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
148 Set the number of upload threads to be used. Note that using many
149 threads will increase the RAM requirements. Default is
151 On high-latency installations, using a greater number will improve
155 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
156 action='append', help="""
157 Exclude files and directories whose names match the given glob pattern. When
158 using a path-like pattern like 'subdir/*.txt', all text files inside the 'subdir'
159 directory (relative to the provided input directories) will be excluded.
160 When using a filename pattern like '*.txt', any text file will be excluded
161 no matter where it is located.
162 For the special case of needing to exclude only files or dirs directly below
163 the given input directory, you can use a pattern like './exclude_this.gif'.
164 You can specify multiple patterns by using this argument more than once.
167 _group = upload_opts.add_mutually_exclusive_group()
168 _group.add_argument('--follow-links', action='store_true', default=True,
169 dest='follow_links', help="""
170 Follow file and directory symlinks (default).
172 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
174 Ignore file and directory symlinks. Even paths given explicitly on the
175 command line will be skipped if they are symlinks.
179 run_opts = argparse.ArgumentParser(add_help=False)
181 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
182 Store the collection in the specified project, instead of your Home
186 run_opts.add_argument('--name', help="""
187 Save the collection with the specified name.
190 _group = run_opts.add_mutually_exclusive_group()
191 _group.add_argument('--progress', action='store_true',
193 Display human-readable progress on stderr (bytes and, if possible,
194 percentage of total data size). This is the default behavior when
198 _group.add_argument('--no-progress', action='store_true',
200 Do not display human-readable progress on stderr, even if stderr is a
204 _group.add_argument('--batch-progress', action='store_true',
206 Display machine-readable progress on stderr (bytes and, if known,
210 run_opts.add_argument('--silent', action='store_true',
212 Do not print any debug messages to console. (Any error messages will
216 run_opts.add_argument('--batch', action='store_true', default=False,
218 Retries with '--no-resume --no-cache' if cached state contains invalid/expired
222 _group = run_opts.add_mutually_exclusive_group()
223 _group.add_argument('--resume', action='store_true', default=True,
225 Continue interrupted uploads from cached state (default).
227 _group.add_argument('--no-resume', action='store_false', dest='resume',
229 Do not continue interrupted uploads from cached state.
232 _group = run_opts.add_mutually_exclusive_group()
233 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
235 Save upload state in a cache file for resuming (default).
237 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
239 Do not save upload state in a cache file for resuming.
242 _group = upload_opts.add_mutually_exclusive_group()
243 _group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
245 Set the trash date of the resulting collection to an absolute date in the future.
246 The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
247 Timezone information can be added. If not provided, the date/time is assumed to be in the local system's timezone.
249 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
251 Set the trash date of the resulting collection to a number of days after the
252 date/time that the upload process finishes.
255 arg_parser = argparse.ArgumentParser(
256 description='Copy data from the local filesystem to Keep.',
257 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
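# Hedged usage sketch: arg_parser combines upload_opts, run_opts and the shared
# retry option, so a typical invocation parses like the example below. The
# project UUID, paths and values are made up for illustration only.
#
#   args = arg_parser.parse_args(
#       ['--project-uuid', 'zzzzz-j7d0g-0123456789abcde',
#        '--exclude', '*.tmp', '--replication', '2', 'data_dir/'])
#   args.replication  # -> 2
#   args.exclude      # -> ['*.tmp']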
259 def parse_arguments(arguments):
260 args = arg_parser.parse_args(arguments)
262 if len(args.paths) == 0:
265 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
267 if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
269 --filename argument cannot be used when storing a directory or
273 # Turn on --progress by default if stderr is a tty.
274 if (not (args.batch_progress or args.no_progress or args.silent)
275 and os.isatty(sys.stderr.fileno())):
278 # Turn off --resume (default) if --no-cache is used.
279 if not args.use_cache:
282 if args.paths == ['-']:
283 if args.update_collection:
285 --update-collection cannot be used when reading from stdin.
288 args.use_cache = False
289 if not args.filename:
290 args.filename = 'stdin'
292 # Remove possible duplicate patterns
293 if len(args.exclude) > 0:
294 args.exclude = list(set(args.exclude))
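# Illustrative sketch of the normalization performed above (values are
# hypothetical and only reflect the branches visible in this function):
#
#   args = parse_arguments(['--exclude', '*.log', '--exclude', '*.log', '-'])
#   args.paths      # -> ['-']       ('/dev/stdin' is also mapped to '-')
#   args.use_cache  # -> False       (caching is disabled when reading stdin)
#   args.filename   # -> 'stdin'     (default name for data read from stdin)
#   args.exclude    # -> ['*.log']   (duplicate patterns removed)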
299 class PathDoesNotExistError(Exception):
303 class CollectionUpdateError(Exception):
307 class ResumeCacheConflict(Exception):
311 class ResumeCacheInvalidError(Exception):
314 class ArvPutArgumentConflict(Exception):
318 class ArvPutUploadIsPending(Exception):
322 class ArvPutUploadNotPending(Exception):
326 class FileUploadList(list):
327 def __init__(self, dry_run=False):
329 self.dry_run = dry_run
331 def append(self, other):
333 raise ArvPutUploadIsPending()
334 super(FileUploadList, self).append(other)
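# Hedged sketch of FileUploadList in dry-run mode: append() raises
# ArvPutUploadIsPending as soon as a pending upload is queued, which lets
# --dry-run exit early. The tuple layout matches _files_to_upload below;
# the file name is made up.
#
#   pending = FileUploadList(dry_run=True)
#   pending.append(('data.txt', 0, 'data.txt'))   # raises ArvPutUploadIsPending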
337 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
338 class ArvPutLogFormatter(logging.Formatter):
339 std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
341 request_id_informed = False
343 def __init__(self, request_id):
344 self.err_fmtr = logging.Formatter(
345 arvados.log_format+' (X-Request-Id: {})'.format(request_id),
346 arvados.log_date_format)
348 def format(self, record):
349 if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
350 self.request_id_informed = True
351 return self.err_fmtr.format(record)
352 return self.std_fmtr.format(record)
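# Minimal wiring sketch for ArvPutLogFormatter (main() below does the same
# through the 'arvados' logger; the request id here is made up):
#
#   handler = logging.StreamHandler()
#   handler.setFormatter(ArvPutLogFormatter('req-0123456789abcdefghijklmn'))
#   logging.getLogger('arvados').addHandler(handler)
#
# The first DEBUG or ERROR record emitted afterwards carries the
# ' (X-Request-Id: ...)' suffix; later records use the standard format again.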
355 class ResumeCache(object):
356 CACHE_DIR = 'arv-put'
358 def __init__(self, file_spec):
359 self.cache_file = open(file_spec, 'a+')
360 self._lock_file(self.cache_file)
361 self.filename = self.cache_file.name
364 def make_path(cls, args):
366 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
367 realpaths = sorted(os.path.realpath(path) for path in args.paths)
368 md5.update(b'\0'.join([p.encode() for p in realpaths]))
369 if any(os.path.isdir(path) for path in realpaths):
372 md5.update(args.filename.encode())
373 cache_path = Path(cls.CACHE_DIR)
374 if len(cache_path.parts) == 1:
375 cache_path = arvados.util._BaseDirectories('CACHE').storage_path(cache_path)
377 # Note this is a noop if cache_path is absolute, which is what we want.
378 cache_path = Path.home() / cache_path
379 cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
380 return str(cache_path / md5.hexdigest())
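    # Sketch of the cache key derived above: the MD5 digest mixes the API host,
    # the sorted real paths and, when no directory is given, any --filename
    # override, so re-running the same command maps to the same cache file,
    # e.g. <cache dir>/arv-put/3f0b9... where the cache directory is resolved
    # by arvados.util._BaseDirectories('CACHE') (typically under the user's
    # XDG cache directory). The hash shown is illustrative only.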
382 def _lock_file(self, fileobj):
384 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
386 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
389 self.cache_file.seek(0)
390 return json.load(self.cache_file)
392 def check_cache(self, api_client=None, num_retries=0):
397 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
398 locator = state["_finished_streams"][0][1][0]
399 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
400 locator = state["_current_stream_locators"][0]
401 if locator is not None:
402 kc = arvados.keep.KeepClient(api_client=api_client)
403 kc.head(locator, num_retries=num_retries)
404 except Exception as e:
409 def save(self, data):
411 new_cache_fd, new_cache_name = tempfile.mkstemp(
412 dir=os.path.dirname(self.filename))
413 self._lock_file(new_cache_fd)
414 new_cache = os.fdopen(new_cache_fd, 'r+')
415 json.dump(data, new_cache)
416 os.rename(new_cache_name, self.filename)
417 except (IOError, OSError, ResumeCacheConflict):
419 os.unlink(new_cache_name)
420 except NameError: # mkstemp failed.
423 self.cache_file.close()
424 self.cache_file = new_cache
427 self.cache_file.close()
431 os.unlink(self.filename)
432 except OSError as error:
433 if error.errno != errno.ENOENT: # That's what we wanted anyway.
439 self.__init__(self.filename)
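# Hedged usage sketch for ResumeCache (ArvPutUploadJob below manages its own
# cache file directly and does not go through this class). 'args' is a parsed
# argument namespace and the saved dict is whatever JSON-serializable
# checkpoint the caller wants to persist:
#
#   try:
#       cache = ResumeCache(ResumeCache.make_path(args))  # takes an exclusive flock
#   except ResumeCacheConflict:
#       ...   # another arv-put process already holds the lock for these paths
#   cache.save({'key': 'value'})   # atomically replaces the cache file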
442 class ArvPutUploadJob(object):
443 CACHE_DIR = 'arv-put'
445 'manifest' : None, # Last saved manifest checkpoint
446 'files' : {} # Previous run file list: {path : {size, mtime}}
449 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
450 name=None, owner_uuid=None, api_client=None, batch_mode=False,
451 ensure_unique_name=False, num_retries=None,
452 put_threads=None, replication_desired=None, filename=None,
453 update_time=60.0, update_collection=None, storage_classes=None,
454 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
455 follow_links=True, exclude_paths=[], exclude_names=None,
459 self.use_cache = use_cache
460 self.batch_mode = batch_mode
462 self.reporter = reporter
463 # This will be set to 0 before counting starts, if no special files are going
465 self.bytes_expected = None
466 self.bytes_written = 0
467 self.bytes_skipped = 0
469 self.owner_uuid = owner_uuid
470 self.ensure_unique_name = ensure_unique_name
471 self.num_retries = num_retries
472 self.replication_desired = replication_desired
473 self.put_threads = put_threads
474 self.filename = filename
475 self.storage_classes = storage_classes
476 self._api_client = api_client
477 self._state_lock = threading.Lock()
478 self._state = None # Previous run state (file list & manifest)
479 self._current_files = [] # Current run file list
480 self._cache_file = None
481 self._collection_lock = threading.Lock()
482 self._remote_collection = None # Collection being updated (if asked)
483 self._local_collection = None # Collection from previous run manifest
484 self._file_paths = set() # Files to be updated in remote collection
485 self._stop_checkpointer = threading.Event()
486 self._checkpointer = threading.Thread(target=self._update_task)
487 self._checkpointer.daemon = True
488 self._update_task_time = update_time # How many seconds to wait between update runs
489 self._files_to_upload = FileUploadList(dry_run=dry_run)
490 self._upload_started = False
492 self.dry_run = dry_run
493 self._checkpoint_before_quit = True
494 self.follow_links = follow_links
495 self.exclude_paths = exclude_paths
496 self.exclude_names = exclude_names
497 self._trash_at = trash_at
499 if self._trash_at is not None:
500 if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
501 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
502 if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
503 raise TypeError('provided trash_at datetime should be timezone-naive')
505 if not self.use_cache and self.resume:
506 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
508 # Check for obvious dry-run responses
509 if self.dry_run and (not self.use_cache or not self.resume):
510 raise ArvPutUploadIsPending()
512 # Load cached data if any and if needed
513 self._setup_state(update_collection)
515 # Build the upload file list, excluding requested files and counting the
516 # bytes expected to be uploaded.
517 self._build_upload_list()
519 def _build_upload_list(self):
521 Scan the requested paths to count file sizes, excluding requested files
522 and dirs and building the upload file list.
524 # If there are no special files to be read, reset the total bytes count to zero
526 if not any([p for p in self.paths
527 if not (os.path.isfile(p) or os.path.isdir(p))]):
528 self.bytes_expected = 0
530 for path in self.paths:
531 # Test for stdin first, in case some file named '-' exists
534 raise ArvPutUploadIsPending()
535 self._write_stdin(self.filename or 'stdin')
536 elif not os.path.exists(path):
537 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
538 elif (not self.follow_links) and os.path.islink(path):
539 self.logger.warning("Skipping symlink '{}'".format(path))
541 elif os.path.isdir(path):
542 # Use absolute paths on cache index so CWD doesn't interfere
543 # with the caching logic.
545 path = os.path.abspath(path)
546 if orig_path[-1:] == os.sep:
547 # When passing a directory reference with a trailing slash,
548 # its contents should be uploaded directly to the
552 # When passing a directory reference with no trailing slash,
553 # upload the directory to the collection's root.
554 prefixdir = os.path.dirname(path)
556 for root, dirs, files in os.walk(path,
557 followlinks=self.follow_links):
558 root_relpath = os.path.relpath(root, path)
559 if root_relpath == '.':
561 # Exclude files/dirs by full path matching pattern
562 if self.exclude_paths:
563 dirs[:] = [d for d in dirs
564 if not any(pathname_match(
565 os.path.join(root_relpath, d), pat)
566 for pat in self.exclude_paths)]
567 files = [f for f in files
568 if not any(pathname_match(
569 os.path.join(root_relpath, f), pat)
570 for pat in self.exclude_paths)]
571 # Exclude files/dirs by name matching pattern
572 if self.exclude_names is not None:
573 dirs[:] = [d for d in dirs
574 if not self.exclude_names.match(d)]
575 files = [f for f in files
576 if not self.exclude_names.match(f)]
577 # Make os.walk()'s directory traversal order deterministic
581 filepath = os.path.join(root, f)
582 if not os.path.isfile(filepath):
583 self.logger.warning("Skipping non-regular file '{}'".format(filepath))
585 # Add its size to the total bytes count (if applicable)
586 if self.follow_links or (not os.path.islink(filepath)):
587 if self.bytes_expected is not None:
588 self.bytes_expected += os.path.getsize(filepath)
589 self._check_file(filepath,
590 os.path.join(root[len(prefixdir):], f))
592 filepath = os.path.abspath(path)
593 # Add its size to the total bytes count (if applicable)
594 if self.follow_links or (not os.path.islink(filepath)):
595 if self.bytes_expected is not None:
596 self.bytes_expected += os.path.getsize(filepath)
597 self._check_file(filepath,
598 self.filename or os.path.basename(path))
599 # If dry-run mode is on and we got to this point, we should notify that
600 # there aren't any files to upload.
602 raise ArvPutUploadNotPending()
603 # Remove local_collection's files that don't exist locally anymore, so the
604 # bytes_written count is correct.
605 for f in self.collection_file_paths(self._local_collection,
607 if f != 'stdin' and f != self.filename and f not in self._file_paths:
608 self._local_collection.remove(f)
610 def start(self, save_collection):
612 Start supporting thread & file uploading
614 self._checkpointer.start()
616 # Update bytes_written from current local collection and
617 # report initial progress.
620 self._upload_started = True # Used by the update thread to start checkpointing
622 except (SystemExit, Exception) as e:
623 self._checkpoint_before_quit = False
624 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
625 # Note: We're expecting SystemExit instead of
626 # KeyboardInterrupt because we have a custom signal
627 # handler in place that raises SystemExit with the caught
629 if isinstance(e, PathDoesNotExistError):
630 # We aren't interested in the traceback for this case
632 elif not isinstance(e, SystemExit) or e.code != -2:
633 self.logger.warning("Abnormal termination:\n{}".format(
634 traceback.format_exc()))
638 # Stop the thread before doing anything else
639 self._stop_checkpointer.set()
640 self._checkpointer.join()
641 if self._checkpoint_before_quit:
642 # Commit all pending blocks & one last _update()
643 self._local_collection.manifest_text()
644 self._update(final=True)
646 self.save_collection()
648 self._cache_file.close()
650 def _collection_trash_at(self):
652 Returns the trash date that the collection should use at save time.
653 Takes into account absolute/relative trash_at values requested
656 if type(self._trash_at) == datetime.timedelta:
657 # Get an absolute datetime for trash_at
658 return datetime.datetime.utcnow() + self._trash_at
659 return self._trash_at
661 def save_collection(self):
663 # Check if files should be updated on the remote collection.
664 for fp in self._file_paths:
665 remote_file = self._remote_collection.find(fp)
667 # File doesn't exist in the remote collection, copy it.
668 self._remote_collection.copy(fp, fp, self._local_collection)
669 elif remote_file != self._local_collection.find(fp):
670 # A different file exists in the remote collection, overwrite it.
671 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
673 # The file already exists in the remote collection, skip it.
675 self._remote_collection.save(num_retries=self.num_retries,
676 trash_at=self._collection_trash_at())
678 if len(self._local_collection) == 0:
679 self.logger.warning("No files were uploaded, skipping collection creation.")
681 self._local_collection.save_new(
682 name=self.name, owner_uuid=self.owner_uuid,
683 ensure_unique_name=self.ensure_unique_name,
684 num_retries=self.num_retries,
685 trash_at=self._collection_trash_at())
687 def destroy_cache(self):
690 os.unlink(self._cache_filename)
691 except OSError as error:
692 # That's what we wanted anyway.
693 if error.errno != errno.ENOENT:
695 self._cache_file.close()
697 def _collection_size(self, collection):
699 Recursively get the total size of the collection
702 for item in collection.values():
703 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
704 size += self._collection_size(item)
709 def _update_task(self):
711 Periodically called support task. File uploading is
712 asynchronous so we poll status from the collection.
714 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
717 def _update(self, final=False):
719 Update cached manifest text and report progress.
721 if self._upload_started:
722 with self._collection_lock:
723 self.bytes_written = self._collection_size(self._local_collection)
726 manifest = self._local_collection.manifest_text()
728 # Get the manifest text without committing pending blocks
729 manifest = self._local_collection.manifest_text(strip=False,
733 with self._state_lock:
734 self._state['manifest'] = manifest
738 except Exception as e:
739 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
740 # Keep remote collection's trash_at attribute synced when using relative expire dates
741 if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
743 self._api_client.collections().update(
744 uuid=self._remote_collection.manifest_locator(),
745 body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
746 ).execute(num_retries=self.num_retries)
747 except Exception as e:
748 self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
750 self.bytes_written = self.bytes_skipped
751 # Call the reporter, if any
752 self.report_progress()
754 def report_progress(self):
755 if self.reporter is not None:
756 self.reporter(self.bytes_written, self.bytes_expected)
758 def _write_stdin(self, filename):
759 output = self._local_collection.open(filename, 'wb')
760 self._write(sys.stdin.buffer, output)
763 def _check_file(self, source, filename):
765 Check if this file needs to be uploaded
767 # Ignore symlinks when requested
768 if (not self.follow_links) and os.path.islink(source):
771 should_upload = False
772 new_file_in_cache = False
773 # Record file path for updating the remote collection before exiting
774 self._file_paths.add(filename)
776 with self._state_lock:
777 # If no previous cached data on this file, store it for an eventual
779 if source not in self._state['files']:
780 self._state['files'][source] = {
781 'mtime': os.path.getmtime(source),
782 'size' : os.path.getsize(source)
784 new_file_in_cache = True
785 cached_file_data = self._state['files'][source]
787 # Check if file was already uploaded (at least partially)
788 file_in_local_collection = self._local_collection.find(filename)
790 # If not resuming, upload the full file.
793 # File is new since the last run, upload it.
794 elif new_file_in_cache:
796 # Local file didn't change from last run.
797 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
798 if not file_in_local_collection:
799 # File not uploaded yet, upload it completely
801 elif file_in_local_collection.permission_expired():
802 # Permission token expired, re-upload file. This will change whenever
803 # we have an API for refreshing tokens.
804 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
806 self._local_collection.remove(filename)
807 elif cached_file_data['size'] == file_in_local_collection.size():
808 # File already there, skip it.
809 self.bytes_skipped += cached_file_data['size']
810 elif cached_file_data['size'] > file_in_local_collection.size():
811 # File partially uploaded, resume!
812 resume_offset = file_in_local_collection.size()
813 self.bytes_skipped += resume_offset
816 # Inconsistent cache, re-upload the file
818 self._local_collection.remove(filename)
819 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
820 # Local file differs from cached data, re-upload it.
822 if file_in_local_collection:
823 self._local_collection.remove(filename)
828 self._files_to_upload.append((source, resume_offset, filename))
829 except ArvPutUploadIsPending:
830 # This could happen when running in dry-run mode; close the cache file to
831 # avoid locking issues.
832 self._cache_file.close()
835 def _upload_files(self):
836 for source, resume_offset, filename in self._files_to_upload:
837 with open(source, 'rb') as source_fd:
838 with self._state_lock:
839 self._state['files'][source]['mtime'] = os.path.getmtime(source)
840 self._state['files'][source]['size'] = os.path.getsize(source)
841 if resume_offset > 0:
842 # Start upload where we left off
843 output = self._local_collection.open(filename, 'ab')
844 source_fd.seek(resume_offset)
847 output = self._local_collection.open(filename, 'wb')
848 self._write(source_fd, output)
849 output.close(flush=False)
851 def _write(self, source_fd, output):
853 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
858 def _my_collection(self):
859 return self._remote_collection if self.update else self._local_collection
861 def _get_cache_filepath(self):
862 # Set up cache file name from input paths.
864 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
865 realpaths = sorted(os.path.realpath(path) for path in self.paths)
866 md5.update(b'\0'.join([p.encode() for p in realpaths]))
868 md5.update(self.filename.encode())
869 cache_path = Path(self.CACHE_DIR)
870 if len(cache_path.parts) == 1:
871 cache_path = arvados.util._BaseDirectories('CACHE').storage_path(cache_path)
873 # Note this is a noop if cache_path is absolute, which is what we want.
874 cache_path = Path.home() / cache_path
875 cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
876 return str(cache_path / md5.hexdigest())
878 def _setup_state(self, update_collection):
880 Create a new cache file or load a previously existing one.
882 # Load an already existing collection for update
883 if update_collection and re.match(arvados.util.collection_uuid_pattern,
886 self._remote_collection = arvados.collection.Collection(
888 api_client=self._api_client,
889 storage_classes_desired=self.storage_classes,
890 num_retries=self.num_retries)
891 except arvados.errors.ApiError as error:
892 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
895 elif update_collection:
896 # Collection locator provided, but unknown format
897 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
900 cache_filepath = self._get_cache_filepath()
901 if self.resume and os.path.exists(cache_filepath):
902 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
903 self._cache_file = open(cache_filepath, 'a+')
905 # --no-resume means start with an empty cache file.
906 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
907 self._cache_file = open(cache_filepath, 'w+')
908 self._cache_filename = self._cache_file.name
909 self._lock_file(self._cache_file)
910 self._cache_file.seek(0)
912 with self._state_lock:
915 self._state = json.load(self._cache_file)
916 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
917 # Cache at least partially incomplete, set up new cache
918 self._state = copy.deepcopy(self.EMPTY_STATE)
920 # Cache file empty, set up new cache
921 self._state = copy.deepcopy(self.EMPTY_STATE)
923 self.logger.info("No cache usage requested for this run.")
924 # No cache file, set empty state
925 self._state = copy.deepcopy(self.EMPTY_STATE)
926 if not self._cached_manifest_valid():
927 if not self.batch_mode:
928 raise ResumeCacheInvalidError()
930 self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyway.".format(self._cache_file.name))
931 self.use_cache = False # Don't overwrite preexisting cache file.
932 self._state = copy.deepcopy(self.EMPTY_STATE)
933 # Load the previous manifest so we can check if files were modified remotely.
934 self._local_collection = arvados.collection.Collection(
935 self._state['manifest'],
936 replication_desired=self.replication_desired,
937 storage_classes_desired=self.storage_classes,
938 put_threads=self.put_threads,
939 api_client=self._api_client,
940 num_retries=self.num_retries)
942 def _cached_manifest_valid(self):
944 Validate the oldest non-expired block signature to check if cached manifest
945 is usable: checking if the cached manifest was not created with a different
948 if self._state.get('manifest', None) is None:
949 # No cached manifest yet, all good.
951 now = datetime.datetime.utcnow()
955 for m in arvados.util.keep_locator_pattern.finditer(self._state['manifest']):
958 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
960 # Locator without signature
963 if exp > now and (oldest_exp is None or exp < oldest_exp):
967 # No block signatures found => no invalid block signatures.
969 if oldest_loc is None:
970 # Locator signatures found, but all have expired.
971 # Reset the cache and move on.
972 self.logger.info('Cache expired, starting from scratch.')
973 self._state['manifest'] = ''
975 kc = arvados.KeepClient(api_client=self._api_client,
976 num_retries=self.num_retries)
979 except arvados.errors.KeepRequestError:
980 # Something is wrong, cached manifest is not valid.
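    # For reference, a permission-signed block locator as matched above looks
    # like this (hash and signature are made up):
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+A0123456789abcdef0123456789abcdef01234567@65f1c0de
    # The hex string after '@' is the Unix timestamp at which the signature
    # expires; int(loc.split('@')[1], 16) converts it for the comparison with
    # utcnow() performed above. Locators with no '@' hint carry no signature
    # and are skipped.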
984 def collection_file_paths(self, col, path_prefix='.'):
985 """Return a list of file paths by recursively go through the entire collection `col`"""
987 for name, item in col.items():
988 if isinstance(item, arvados.arvfile.ArvadosFile):
989 file_paths.append(os.path.join(path_prefix, name))
990 elif isinstance(item, arvados.collection.Subcollection):
991 new_prefix = os.path.join(path_prefix, name)
992 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
995 def _lock_file(self, fileobj):
997 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
999 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
1001 def _save_state(self):
1003 Atomically save current state into cache.
1005 with self._state_lock:
1006 # We're not using copy.deepcopy() here because it's a lot slower
1007 # than json.dumps(), and we're already needing JSON format to be
1009 state = json.dumps(self._state)
1011 new_cache = tempfile.NamedTemporaryFile(
1013 dir=os.path.dirname(self._cache_filename), delete=False)
1014 self._lock_file(new_cache)
1015 new_cache.write(state)
1018 os.rename(new_cache.name, self._cache_filename)
1019 except (IOError, OSError, ResumeCacheConflict) as error:
1020 self.logger.error("There was a problem while saving the cache file: {}".format(error))
1022 os.unlink(new_cache.name)
1023 except NameError: # NamedTemporaryFile() failed before new_cache was assigned.
1026 self._cache_file.close()
1027 self._cache_file = new_cache
1029 def collection_name(self):
1030 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1032 def collection_trash_at(self):
1033 return self._my_collection().get_trash_at()
1035 def manifest_locator(self):
1036 return self._my_collection().manifest_locator()
1038 def portable_data_hash(self):
1039 pdh = self._my_collection().portable_data_hash()
1040 m = self._my_collection().stripped_manifest().encode()
1041 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1042 if pdh != local_pdh:
1043 self.logger.warning("\n".join([
1044 "arv-put: API server provided PDH differs from local manifest.",
1045 " This should not happen; showing API server version."]))
1048 def manifest_text(self, stream_name=".", strip=False, normalize=False):
1049 return self._my_collection().manifest_text(stream_name, strip, normalize)
1051 def _datablocks_on_item(self, item):
1053 Return a list of datablock locators, recursively navigating
1054 through subcollections
1056 if isinstance(item, arvados.arvfile.ArvadosFile):
1057 if item.size() == 0:
1058 # Empty file locator
1059 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1062 for segment in item.segments():
1063 loc = segment.locator
1064 locators.append(loc)
1066 elif isinstance(item, arvados.collection.Collection):
1067 l = [self._datablocks_on_item(x) for x in item.values()]
1068 # Fast list flattener method taken from:
1069 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1070 return [loc for sublist in l for loc in sublist]
1074 def data_locators(self):
1075 with self._collection_lock:
1076 # Make sure all datablocks are flushed before getting the locators
1077 self._my_collection().manifest_text()
1078 datablocks = self._datablocks_on_item(self._my_collection())
1081 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1084 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1085 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1086 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1087 # so instead we're using it on every path component.
1088 def pathname_match(pathname, pattern):
1089 name = pathname.split(os.sep)
1090 # Fix patterns like 'some/subdir/' or 'some//subdir'
1091 pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1092 if len(name) != len(pat):
1094 for i in range(len(name)):
1095 if not fnmatch.fnmatch(name[i], pat[i]):
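# Illustrative behaviour of pathname_match(), assuming os.sep == '/'
# (the file names are made up):
#
#   pathname_match('subdir/file.txt', 'subdir/*.txt')       # -> True
#   pathname_match('subdir/deep/file.txt', 'subdir/*.txt')  # -> False (extra component)
#   pathname_match('file.txt', '*.txt')                     # -> True
#   pathname_match('subdir/file.txt', '*.txt')              # -> False (component counts differ)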
1099 def machine_progress(bytes_written, bytes_expected):
1100 return _machine_format.format(
1101 bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1103 def human_progress(bytes_written, bytes_expected):
1105 return "\r{}M / {}M {:.1%} ".format(
1106 bytes_written >> 20, bytes_expected >> 20,
1107 float(bytes_written) / bytes_expected)
1109 return "\r{} ".format(bytes_written)
1111 def progress_writer(progress_func, outfile=sys.stderr):
1112 def write_progress(bytes_written, bytes_expected):
1113 outfile.write(progress_func(bytes_written, bytes_expected))
1114 return write_progress
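# Usage sketch (mirrors the reporter setup in main() below); the byte counts
# are arbitrary:
#
#   reporter = progress_writer(human_progress)
#   reporter(512 << 20, 1 << 30)   # writes "\r512M / 1024M 50.0% " to stderr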
1116 def desired_project_uuid(api_client, project_uuid, num_retries):
1117 if not project_uuid:
1118 query = api_client.users().current()
1119 elif arvados.util.user_uuid_pattern.match(project_uuid):
1120 query = api_client.users().get(uuid=project_uuid)
1121 elif arvados.util.group_uuid_pattern.match(project_uuid):
1122 query = api_client.groups().get(uuid=project_uuid)
1124 raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1125 return query.execute(num_retries=num_retries)['uuid']
1127 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1128 install_sig_handlers=True):
1131 args = parse_arguments(arguments)
1132 logger = logging.getLogger('arvados.arv_put')
1134 logger.setLevel(logging.WARNING)
1136 logger.setLevel(logging.INFO)
1139 request_id = arvados.util.new_request_id()
1141 formatter = ArvPutLogFormatter(request_id)
1142 logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1144 if api_client is None:
1145 api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
1147 if install_sig_handlers:
1148 arv_cmd.install_signal_handlers()
1150 # Trash arguments validation
1152 if args.trash_at is not None:
1153 # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1154 # make sure the user provides a complete YYYY-MM-DD date.
1155 if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1156 logger.error("--trash-at argument format invalid, use --help to see examples.")
1158 # Check if no time information was provided. In that case, assume end-of-day.
1159 if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1160 args.trash_at += 'T23:59:59'
1162 trash_at = ciso8601.parse_datetime(args.trash_at)
1164 logger.error("--trash-at argument format invalid, use --help to see examples.")
1167 if trash_at.tzinfo is not None:
1168 # Timezone-aware datetime provided.
1169 utcoffset = -trash_at.utcoffset()
1171 # Timezone-naive datetime provided. Assume it is local.
1173 utcoffset = datetime.timedelta(seconds=time.altzone)
1175 utcoffset = datetime.timedelta(seconds=time.timezone)
1176 # Convert to a UTC timezone-naive datetime.
1177 trash_at = trash_at.replace(tzinfo=None) + utcoffset
1179 if trash_at <= datetime.datetime.utcnow():
1180 logger.error("--trash-at argument must be set in the future")
1182 if args.trash_after is not None:
1183 if args.trash_after < 1:
1184 logger.error("--trash-after argument must be >= 1")
1186 trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1188 # Determine the name to use
1190 if args.stream or args.raw:
1191 logger.error("Cannot use --name with --stream or --raw")
1193 elif args.update_collection:
1194 logger.error("Cannot use --name with --update-collection")
1196 collection_name = args.name
1198 collection_name = "Saved at {} by {}@{}".format(
1199 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1200 pwd.getpwuid(os.getuid()).pw_name,
1201 socket.gethostname())
1203 if args.project_uuid and (args.stream or args.raw):
1204 logger.error("Cannot use --project-uuid with --stream or --raw")
1207 # Determine the parent project
1209 project_uuid = desired_project_uuid(api_client, args.project_uuid,
1211 except (apiclient_errors.Error, ValueError) as error:
1216 reporter = progress_writer(human_progress)
1217 elif args.batch_progress:
1218 reporter = progress_writer(machine_progress)
1222 # Split storage-classes argument
1223 storage_classes = None
1224 if args.storage_classes:
1225 storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
1227 # Set up the exclude regex from all the --exclude arguments provided
1230 exclude_names = None
1231 if len(args.exclude) > 0:
1232 # We're supporting 2 kinds of exclusion patterns:
1233 # 1) --exclude '*.jpg' (file/dir name patterns, will only match
1234 # the name, wherever the file is on the tree)
1235 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the
1236 # entire path, and should be relative to
1237 # any input dir argument)
1238 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs
1239 # placed directly underneath the input dir)
1240 for p in args.exclude:
1241 # Only relative path patterns are allowed
1242 if p.startswith(os.sep):
1243 logger.error("Cannot use absolute paths with --exclude")
1245 if os.path.dirname(p):
1246 # We don't support path patterns with '..'
1247 p_parts = p.split(os.sep)
1250 "Cannot use path patterns that include '..'")
1252 # Path search pattern
1253 exclude_paths.append(p)
1255 # Name-only search pattern
1256 name_patterns.append(p)
1257 # For name only matching, we can combine all patterns into a single
1258 # regexp, for better performance.
1259 exclude_names = re.compile('|'.join(
1260 [fnmatch.translate(p) for p in name_patterns]
1261 )) if len(name_patterns) > 0 else None
1262 # Show the user the patterns to be used, just in case they weren't
1263 # specified inside quotes and got changed by the shell expansion.
1264 logger.info("Exclude patterns: {}".format(args.exclude))
1266 # If this is used by a human, and there's at least one directory to be
1267 # uploaded, the expected bytes calculation can take a moment.
1268 if args.progress and any([os.path.isdir(f) for f in args.paths]):
1269 logger.info("Calculating upload size, this could take some time...")
1271 writer = ArvPutUploadJob(paths = args.paths,
1272 resume = args.resume,
1273 use_cache = args.use_cache,
1274 batch_mode= args.batch,
1275 filename = args.filename,
1276 reporter = reporter,
1277 api_client = api_client,
1278 num_retries = args.retries,
1279 replication_desired = args.replication,
1280 put_threads = args.threads,
1281 name = collection_name,
1282 owner_uuid = project_uuid,
1283 ensure_unique_name = True,
1284 update_collection = args.update_collection,
1285 storage_classes=storage_classes,
1287 dry_run=args.dry_run,
1288 follow_links=args.follow_links,
1289 exclude_paths=exclude_paths,
1290 exclude_names=exclude_names,
1292 except ResumeCacheConflict:
1293 logger.error("\n".join([
1294 "arv-put: Another process is already uploading this data.",
1295 " Use --no-cache if this is really what you want."]))
1297 except ResumeCacheInvalidError:
1298 logger.error("\n".join([
1299 "arv-put: Resume cache contains invalid signature: it may have expired",
1300 " or been created with another Arvados user's credentials.",
1301 " Switch user or use one of the following options to restart upload:",
1302 " --no-resume to start a new resume cache.",
1303 " --no-cache to disable resume cache.",
1304 " --batch to ignore the resume cache if invalid."]))
1306 except (CollectionUpdateError, PathDoesNotExistError) as error:
1307 logger.error("\n".join([
1308 "arv-put: %s" % str(error)]))
1310 except ArvPutUploadIsPending:
1311 # Dry run check successful, return proper exit code.
1313 except ArvPutUploadNotPending:
1314 # No files pending for upload
1317 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1318 logger.warning("\n".join([
1319 "arv-put: Resuming previous upload from last checkpoint.",
1320 " Use the --no-resume option to start over."]))
1322 if not args.dry_run:
1323 writer.report_progress()
1326 writer.start(save_collection=not(args.stream or args.raw))
1327 except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
1328 logger.error("\n".join([
1329 "arv-put: %s" % str(error)]))
1332 if args.progress: # Print newline to split stderr from stdout for humans.
1337 output = writer.manifest_text(normalize=True)
1339 output = writer.manifest_text()
1341 output = ','.join(writer.data_locators())
1342 elif writer.manifest_locator() is not None:
1344 expiration_notice = ""
1345 if writer.collection_trash_at() is not None:
1346 # Get the local timezone-naive version, and log it with timezone information.
1348 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1350 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1351 expiration_notice = ". It will expire on {} {}.".format(
1352 local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1353 if args.update_collection:
1354 logger.info(u"Collection updated: '{}'{}".format(
1355 writer.collection_name(), expiration_notice))
1357 logger.info(u"Collection saved as '{}'{}".format(
1358 writer.collection_name(), expiration_notice))
1359 if args.portable_data_hash:
1360 output = writer.portable_data_hash()
1362 output = writer.manifest_locator()
1363 except apiclient_errors.Error as error:
1365 "arv-put: Error creating Collection on project: {}.".format(
1371 # Print the locator (uuid) of the new collection.
1373 status = status or 1
1374 elif not args.silent:
1375 stdout.write(output)
1376 if not output.endswith('\n'):
1379 if install_sig_handlers:
1380 arv_cmd.restore_signal_handlers()
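# main() can also be invoked programmatically, e.g. from tests. A hedged
# sketch (the directory name is made up; configuration and credentials come
# from the environment, and error paths may call sys.exit()):
#
#   import io
#   out = io.StringIO()
#   main(['--no-progress', '--portable-data-hash', 'data_dir/'],
#        stdout=out, install_sig_handlers=False)
#   pdh = out.getvalue().strip()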
1389 if __name__ == '__main__':