1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
7 import arvados.collection
29 from pathlib import Path
32 import arvados.commands._util as arv_cmd
34 from apiclient import errors as apiclient_errors
35 from arvados._internal import basedirs
36 from arvados._version import __version__
40 upload_opts = argparse.ArgumentParser(add_help=False)
42 upload_opts.add_argument('--version', action='version',
43 version="%s %s" % (sys.argv[0], __version__),
44 help='Print version and exit.')
45 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
47 Local file or directory. If path is a directory reference with a trailing
48 slash, then just upload the directory's contents; otherwise upload the
49 directory itself. Default: read from standard input.
52 _group = upload_opts.add_mutually_exclusive_group()
54 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
55 default=-1, help=argparse.SUPPRESS)
57 _group.add_argument('--normalize', action='store_true',
59 Normalize the manifest by re-ordering files and streams after writing
63 _group.add_argument('--dry-run', action='store_true', default=False,
Don't actually upload files; only check whether any files would need to be
uploaded. Exit with code=2 when files are pending for upload.
69 _group = upload_opts.add_mutually_exclusive_group()
71 _group.add_argument('--as-stream', action='store_true', dest='stream',
76 _group.add_argument('--stream', action='store_true',
78 Store the file content and display the resulting manifest on
79 stdout. Do not save a Collection object in Arvados.
82 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
84 Synonym for --manifest.
87 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
89 Synonym for --manifest.
92 _group.add_argument('--manifest', action='store_true',
94 Store the file data and resulting manifest in Keep, save a Collection
95 object in Arvados, and display the manifest locator (Collection uuid)
96 on stdout. This is the default behavior.
99 _group.add_argument('--as-raw', action='store_true', dest='raw',
104 _group.add_argument('--raw', action='store_true',
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
111 upload_opts.add_argument('--update-collection', type=str, default=None,
112 dest='update_collection', metavar="UUID", help="""
113 Update an existing collection identified by the given Arvados collection
114 UUID. All new local files will be uploaded.
117 upload_opts.add_argument('--use-filename', type=str, default=None,
118 dest='filename', help="""
119 Synonym for --filename.
122 upload_opts.add_argument('--filename', type=str, default=None,
124 Use the given filename in the manifest, instead of the name of the
125 local file. This is useful when "-" or "/dev/stdin" is given as an
126 input file. It can be used only if there is exactly one path given and
127 it is not a directory. Implies --manifest.
130 upload_opts.add_argument('--portable-data-hash', action='store_true',
132 Print the portable data hash instead of the Arvados UUID for the collection
133 created by the upload.
136 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
138 Set the replication level for the new collection: how many different
139 physical storage devices (e.g., disks) should have a copy of each data
140 block. Default is to use the server-provided default (if any) or 2.
143 upload_opts.add_argument('--storage-classes', help="""
Specify a comma-separated list of storage classes to be used when saving data to Keep.
147 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
149 Set the number of upload threads to be used. Take into account that
150 using lots of threads will increase the RAM requirements. Default is
152 On high latency installations, using a greater number will improve
156 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
157 action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory (relative to the provided input directories) will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is located.
163 For the special case of needing to exclude only files or dirs directly below
164 the given input directory, you can use a pattern like './exclude_this.gif'.
165 You can specify multiple patterns by using this argument more than once.
168 _group = upload_opts.add_mutually_exclusive_group()
169 _group.add_argument('--follow-links', action='store_true', default=True,
170 dest='follow_links', help="""
171 Follow file and directory symlinks (default).
173 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
175 Ignore file and directory symlinks. Even paths given explicitly on the
176 command line will be skipped if they are symlinks.
180 run_opts = argparse.ArgumentParser(add_help=False)
182 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
183 Store the collection in the specified project, instead of your Home
187 run_opts.add_argument('--name', help="""
188 Save the collection with the specified name.
191 _group = run_opts.add_mutually_exclusive_group()
192 _group.add_argument('--progress', action='store_true',
194 Display human-readable progress on stderr (bytes and, if possible,
195 percentage of total data size). This is the default behavior when
199 _group.add_argument('--no-progress', action='store_true',
201 Do not display human-readable progress on stderr, even if stderr is a
205 _group.add_argument('--batch-progress', action='store_true',
207 Display machine-readable progress on stderr (bytes and, if known,
211 run_opts.add_argument('--silent', action='store_true',
213 Do not print any debug messages to console. (Any error messages will
217 run_opts.add_argument('--batch', action='store_true', default=False,
Retry with '--no-resume --no-cache' if the cached state contains invalid/expired
223 _group = run_opts.add_mutually_exclusive_group()
224 _group.add_argument('--resume', action='store_true', default=True,
226 Continue interrupted uploads from cached state (default).
228 _group.add_argument('--no-resume', action='store_false', dest='resume',
230 Do not continue interrupted uploads from cached state.
233 _group = run_opts.add_mutually_exclusive_group()
234 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
236 Save upload state in a cache file for resuming (default).
238 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
240 Do not save upload state in a cache file for resuming.
243 _group = upload_opts.add_mutually_exclusive_group()
244 _group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
246 Set the trash date of the resulting collection to an absolute date in the future.
247 The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
Timezone information can be added. If not, the provided date/time is assumed to be in the local system's time zone.
250 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
Set the trash date of the resulting collection to the given number of days after
the date/time that the upload process finishes.
256 arg_parser = argparse.ArgumentParser(
257 description='Copy data from the local filesystem to Keep.',
258 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
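# A few illustrative invocations (assuming the options defined above; not an
# exhaustive list):
#   arv-put file1.txt subdir/                      upload into a new collection
#   arv-put --update-collection <uuid> subdir/     add or refresh files in an existing collection
#   cat data | arv-put --filename data.bin         upload from stdin under a given name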
260 def parse_arguments(arguments):
261 args = arg_parser.parse_args(arguments)
263 if len(args.paths) == 0:
266 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
268 if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
270 --filename argument cannot be used when storing a directory or
274 # Turn on --progress by default if stderr is a tty.
275 if (not (args.batch_progress or args.no_progress or args.silent)
276 and os.isatty(sys.stderr.fileno())):
279 # Turn off --resume (default) if --no-cache is used.
280 if not args.use_cache:
283 if args.paths == ['-']:
284 if args.update_collection:
286 --update-collection cannot be used when reading from stdin.
289 args.use_cache = False
290 if not args.filename:
291 args.filename = 'stdin'
    # Remove possible duplicate patterns
294 if len(args.exclude) > 0:
295 args.exclude = list(set(args.exclude))
300 class PathDoesNotExistError(Exception):
304 class CollectionUpdateError(Exception):
308 class ResumeCacheConflict(Exception):
312 class ResumeCacheInvalidError(Exception):
315 class ArvPutArgumentConflict(Exception):
319 class ArvPutUploadIsPending(Exception):
323 class ArvPutUploadNotPending(Exception):
327 class FileUploadList(list):
328 def __init__(self, dry_run=False):
330 self.dry_run = dry_run
332 def append(self, other):
334 raise ArvPutUploadIsPending()
335 super(FileUploadList, self).append(other)
338 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
339 class ArvPutLogFormatter(logging.Formatter):
340 std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
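    # The X-Request-Id suffix is only appended once: the first time a DEBUG or
    # ERROR record is formatted (see format() below).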
342 request_id_informed = False
344 def __init__(self, request_id):
345 self.err_fmtr = logging.Formatter(
346 arvados.log_format+' (X-Request-Id: {})'.format(request_id),
347 arvados.log_date_format)
349 def format(self, record):
350 if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
351 self.request_id_informed = True
352 return self.err_fmtr.format(record)
353 return self.std_fmtr.format(record)
356 class ResumeCache(object):
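    """On-disk cache of upload state for resuming interrupted uploads.

    The cache file lives under the arv-put cache directory and is protected by
    an exclusive flock so two arv-put processes cannot share the same state.
    """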
357 CACHE_DIR = 'arv-put'
359 def __init__(self, file_spec):
360 self.cache_file = open(file_spec, 'a+')
361 self._lock_file(self.cache_file)
362 self.filename = self.cache_file.name
365 def make_path(cls, args):
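        # Derive a per-upload cache file name from the API host and the resolved
        # input paths (and the --filename override, when given).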
367 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
368 realpaths = sorted(os.path.realpath(path) for path in args.paths)
369 md5.update(b'\0'.join([p.encode() for p in realpaths]))
370 if any(os.path.isdir(path) for path in realpaths):
373 md5.update(args.filename.encode())
374 cache_path = Path(cls.CACHE_DIR)
375 if len(cache_path.parts) == 1:
376 cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path)
378 # Note this is a noop if cache_path is absolute, which is what we want.
379 cache_path = Path.home() / cache_path
380 cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
381 return str(cache_path / md5.hexdigest())
383 def _lock_file(self, fileobj):
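        # Take a non-blocking exclusive lock; if another arv-put process already
        # holds it, fail with ResumeCacheConflict instead of sharing the cache file.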
385 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
387 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
390 self.cache_file.seek(0)
391 return json.load(self.cache_file)
393 def check_cache(self, api_client=None, num_retries=0):
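        # Sanity-check the cached state: pick one block locator recorded in it
        # and HEAD it in Keep to make sure the cached data is still readable.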
398 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
399 locator = state["_finished_streams"][0][1][0]
400 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
401 locator = state["_current_stream_locators"][0]
402 if locator is not None:
403 kc = arvados.keep.KeepClient(api_client=api_client)
404 kc.head(locator, num_retries=num_retries)
405 except Exception as e:
410 def save(self, data):
412 new_cache_fd, new_cache_name = tempfile.mkstemp(
413 dir=os.path.dirname(self.filename))
414 self._lock_file(new_cache_fd)
415 new_cache = os.fdopen(new_cache_fd, 'r+')
416 json.dump(data, new_cache)
417 os.rename(new_cache_name, self.filename)
418 except (IOError, OSError, ResumeCacheConflict):
420 os.unlink(new_cache_name)
421 except NameError: # mkstemp failed.
424 self.cache_file.close()
425 self.cache_file = new_cache
428 self.cache_file.close()
432 os.unlink(self.filename)
433 except OSError as error:
434 if error.errno != errno.ENOENT: # That's what we wanted anyway.
440 self.__init__(self.filename)
443 class ArvPutUploadJob(object):
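    """Drive a whole arv-put upload.

    Builds the list of files to upload (honoring exclude patterns and symlink
    options), writes their data to Keep with optional resume-from-cache
    support, and finally saves a new collection or updates an existing one.
    """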
444 CACHE_DIR = 'arv-put'
446 'manifest' : None, # Last saved manifest checkpoint
447 'files' : {} # Previous run file list: {path : {size, mtime}}
450 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
451 name=None, owner_uuid=None, api_client=None, batch_mode=False,
452 ensure_unique_name=False, num_retries=None,
453 put_threads=None, replication_desired=None, filename=None,
454 update_time=60.0, update_collection=None, storage_classes=None,
455 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
456 follow_links=True, exclude_paths=[], exclude_names=None,
460 self.use_cache = use_cache
461 self.batch_mode = batch_mode
463 self.reporter = reporter
        # This will be set to 0 before counting starts, if no special files are going
466 self.bytes_expected = None
467 self.bytes_written = 0
468 self.bytes_skipped = 0
470 self.owner_uuid = owner_uuid
471 self.ensure_unique_name = ensure_unique_name
472 self.num_retries = num_retries
473 self.replication_desired = replication_desired
474 self.put_threads = put_threads
475 self.filename = filename
476 self.storage_classes = storage_classes
477 self._api_client = api_client
478 self._state_lock = threading.Lock()
479 self._state = None # Previous run state (file list & manifest)
480 self._current_files = [] # Current run file list
481 self._cache_file = None
482 self._collection_lock = threading.Lock()
483 self._remote_collection = None # Collection being updated (if asked)
484 self._local_collection = None # Collection from previous run manifest
485 self._file_paths = set() # Files to be updated in remote collection
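        # Background thread that periodically checkpoints upload state to the
        # resume cache while files are being written (see _update_task).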
486 self._stop_checkpointer = threading.Event()
487 self._checkpointer = threading.Thread(target=self._update_task)
488 self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
490 self._files_to_upload = FileUploadList(dry_run=dry_run)
491 self._upload_started = False
493 self.dry_run = dry_run
494 self._checkpoint_before_quit = True
495 self.follow_links = follow_links
496 self.exclude_paths = exclude_paths
497 self.exclude_names = exclude_names
498 self._trash_at = trash_at
500 if self._trash_at is not None:
501 if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
502 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
503 if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
504 raise TypeError('provided trash_at datetime should be timezone-naive')
506 if not self.use_cache and self.resume:
507 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
509 # Check for obvious dry-run responses
510 if self.dry_run and (not self.use_cache or not self.resume):
511 raise ArvPutUploadIsPending()
513 # Load cached data if any and if needed
514 self._setup_state(update_collection)
516 # Build the upload file list, excluding requested files and counting the
517 # bytes expected to be uploaded.
518 self._build_upload_list()
520 def _build_upload_list(self):
522 Scan the requested paths to count file sizes, excluding requested files
523 and dirs and building the upload file list.
525 # If there aren't special files to be read, reset total bytes count to zero
527 if not any([p for p in self.paths
528 if not (os.path.isfile(p) or os.path.isdir(p))]):
529 self.bytes_expected = 0
531 for path in self.paths:
            # Test for stdin first, in case some file named '-' exists
535 raise ArvPutUploadIsPending()
536 self._write_stdin(self.filename or 'stdin')
537 elif not os.path.exists(path):
538 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
539 elif (not self.follow_links) and os.path.islink(path):
540 self.logger.warning("Skipping symlink '{}'".format(path))
542 elif os.path.isdir(path):
543 # Use absolute paths on cache index so CWD doesn't interfere
544 # with the caching logic.
546 path = os.path.abspath(path)
547 if orig_path[-1:] == os.sep:
548 # When passing a directory reference with a trailing slash,
549 # its contents should be uploaded directly to the
553 # When passing a directory reference with no trailing slash,
554 # upload the directory to the collection's root.
555 prefixdir = os.path.dirname(path)
557 for root, dirs, files in os.walk(path,
558 followlinks=self.follow_links):
559 root_relpath = os.path.relpath(root, path)
560 if root_relpath == '.':
562 # Exclude files/dirs by full path matching pattern
563 if self.exclude_paths:
564 dirs[:] = [d for d in dirs
565 if not any(pathname_match(
566 os.path.join(root_relpath, d), pat)
567 for pat in self.exclude_paths)]
568 files = [f for f in files
569 if not any(pathname_match(
570 os.path.join(root_relpath, f), pat)
571 for pat in self.exclude_paths)]
572 # Exclude files/dirs by name matching pattern
573 if self.exclude_names is not None:
574 dirs[:] = [d for d in dirs
575 if not self.exclude_names.match(d)]
576 files = [f for f in files
577 if not self.exclude_names.match(f)]
578 # Make os.walk()'s dir traversing order deterministic
582 filepath = os.path.join(root, f)
583 if not os.path.isfile(filepath):
584 self.logger.warning("Skipping non-regular file '{}'".format(filepath))
586 # Add its size to the total bytes count (if applicable)
587 if self.follow_links or (not os.path.islink(filepath)):
588 if self.bytes_expected is not None:
589 self.bytes_expected += os.path.getsize(filepath)
590 self._check_file(filepath,
591 os.path.join(root[len(prefixdir):], f))
593 filepath = os.path.abspath(path)
594 # Add its size to the total bytes count (if applicable)
595 if self.follow_links or (not os.path.islink(filepath)):
596 if self.bytes_expected is not None:
597 self.bytes_expected += os.path.getsize(filepath)
598 self._check_file(filepath,
599 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got to this point, notify that
        # there aren't any files to upload.
603 raise ArvPutUploadNotPending()
604 # Remove local_collection's files that don't exist locally anymore, so the
605 # bytes_written count is correct.
606 for f in self.collection_file_paths(self._local_collection,
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
609 self._local_collection.remove(f)
611 def start(self, save_collection):
        Start the supporting thread and file uploading
615 self._checkpointer.start()
617 # Update bytes_written from current local collection and
618 # report initial progress.
621 self._upload_started = True # Used by the update thread to start checkpointing
623 except (SystemExit, Exception) as e:
624 self._checkpoint_before_quit = False
625 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
626 # Note: We're expecting SystemExit instead of
627 # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
630 if isinstance(e, PathDoesNotExistError):
631 # We aren't interested in the traceback for this case
633 elif not isinstance(e, SystemExit) or e.code != -2:
634 self.logger.warning("Abnormal termination:\n{}".format(
635 traceback.format_exc()))
639 # Stop the thread before doing anything else
640 self._stop_checkpointer.set()
641 self._checkpointer.join()
642 if self._checkpoint_before_quit:
643 # Commit all pending blocks & one last _update()
644 self._local_collection.manifest_text()
645 self._update(final=True)
647 self.save_collection()
649 self._cache_file.close()
651 def _collection_trash_at(self):
653 Returns the trash date that the collection should use at save time.
654 Takes into account absolute/relative trash_at values requested
657 if type(self._trash_at) == datetime.timedelta:
658 # Get an absolute datetime for trash_at
659 return datetime.datetime.utcnow() + self._trash_at
660 return self._trash_at
662 def save_collection(self):
664 # Check if files should be updated on the remote collection.
665 for fp in self._file_paths:
666 remote_file = self._remote_collection.find(fp)
                    # File doesn't exist in the remote collection, copy it.
669 self._remote_collection.copy(fp, fp, self._local_collection)
670 elif remote_file != self._local_collection.find(fp):
                    # A different file exists in the remote collection, overwrite it.
672 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                    # The file already exists in the remote collection, skip it.
676 self._remote_collection.save(num_retries=self.num_retries,
677 trash_at=self._collection_trash_at())
679 if len(self._local_collection) == 0:
680 self.logger.warning("No files were uploaded, skipping collection creation.")
682 self._local_collection.save_new(
683 name=self.name, owner_uuid=self.owner_uuid,
684 ensure_unique_name=self.ensure_unique_name,
685 num_retries=self.num_retries,
686 trash_at=self._collection_trash_at())
688 def destroy_cache(self):
691 os.unlink(self._cache_filename)
692 except OSError as error:
693 # That's what we wanted anyway.
694 if error.errno != errno.ENOENT:
696 self._cache_file.close()
698 def _collection_size(self, collection):
700 Recursively get the total size of the collection
703 for item in collection.values():
704 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
705 size += self._collection_size(item)
710 def _update_task(self):
712 Periodically called support task. File uploading is
713 asynchronous so we poll status from the collection.
715 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
718 def _update(self, final=False):
720 Update cached manifest text and report progress.
722 if self._upload_started:
723 with self._collection_lock:
724 self.bytes_written = self._collection_size(self._local_collection)
727 manifest = self._local_collection.manifest_text()
                        # Get the manifest text without committing pending blocks
730 manifest = self._local_collection.manifest_text(strip=False,
734 with self._state_lock:
735 self._state['manifest'] = manifest
739 except Exception as e:
740 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
741 # Keep remote collection's trash_at attribute synced when using relative expire dates
742 if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
744 self._api_client.collections().update(
745 uuid=self._remote_collection.manifest_locator(),
746 body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
747 ).execute(num_retries=self.num_retries)
748 except Exception as e:
749 self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
751 self.bytes_written = self.bytes_skipped
752 # Call the reporter, if any
753 self.report_progress()
755 def report_progress(self):
756 if self.reporter is not None:
757 self.reporter(self.bytes_written, self.bytes_expected)
759 def _write_stdin(self, filename):
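        # Stream standard input into the local collection under the given
        # filename (used when '-' is given as the input path).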
760 output = self._local_collection.open(filename, 'wb')
761 self._write(sys.stdin.buffer, output)
764 def _check_file(self, source, filename):
766 Check if this file needs to be uploaded
768 # Ignore symlinks when requested
769 if (not self.follow_links) and os.path.islink(source):
772 should_upload = False
773 new_file_in_cache = False
774 # Record file path for updating the remote collection before exiting
775 self._file_paths.add(filename)
777 with self._state_lock:
778 # If no previous cached data on this file, store it for an eventual
780 if source not in self._state['files']:
781 self._state['files'][source] = {
782 'mtime': os.path.getmtime(source),
783 'size' : os.path.getsize(source)
785 new_file_in_cache = True
786 cached_file_data = self._state['files'][source]
788 # Check if file was already uploaded (at least partially)
789 file_in_local_collection = self._local_collection.find(filename)
791 # If not resuming, upload the full file.
        # New file detected since last run, upload it.
795 elif new_file_in_cache:
797 # Local file didn't change from last run.
798 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
799 if not file_in_local_collection:
800 # File not uploaded yet, upload it completely
802 elif file_in_local_collection.permission_expired():
803 # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
805 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
807 self._local_collection.remove(filename)
808 elif cached_file_data['size'] == file_in_local_collection.size():
809 # File already there, skip it.
810 self.bytes_skipped += cached_file_data['size']
811 elif cached_file_data['size'] > file_in_local_collection.size():
812 # File partially uploaded, resume!
813 resume_offset = file_in_local_collection.size()
814 self.bytes_skipped += resume_offset
817 # Inconsistent cache, re-upload the file
819 self._local_collection.remove(filename)
820 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
821 # Local file differs from cached data, re-upload it.
823 if file_in_local_collection:
824 self._local_collection.remove(filename)
829 self._files_to_upload.append((source, resume_offset, filename))
830 except ArvPutUploadIsPending:
            # This could happen when running in dry-run mode; close the cache
            # file to avoid locking issues.
833 self._cache_file.close()
836 def _upload_files(self):
837 for source, resume_offset, filename in self._files_to_upload:
838 with open(source, 'rb') as source_fd:
839 with self._state_lock:
840 self._state['files'][source]['mtime'] = os.path.getmtime(source)
841 self._state['files'][source]['size'] = os.path.getsize(source)
842 if resume_offset > 0:
843 # Start upload where we left off
844 output = self._local_collection.open(filename, 'ab')
845 source_fd.seek(resume_offset)
848 output = self._local_collection.open(filename, 'wb')
849 self._write(source_fd, output)
850 output.close(flush=False)
852 def _write(self, source_fd, output):
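        # Copy data from source_fd into the collection file, reading it in
        # KEEP_BLOCK_SIZE chunks.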
854 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
859 def _my_collection(self):
860 return self._remote_collection if self.update else self._local_collection
862 def _get_cache_filepath(self):
863 # Set up cache file name from input paths.
865 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
866 realpaths = sorted(os.path.realpath(path) for path in self.paths)
867 md5.update(b'\0'.join([p.encode() for p in realpaths]))
869 md5.update(self.filename.encode())
870 cache_path = Path(self.CACHE_DIR)
871 if len(cache_path.parts) == 1:
872 cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path)
874 # Note this is a noop if cache_path is absolute, which is what we want.
875 cache_path = Path.home() / cache_path
876 cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
877 return str(cache_path / md5.hexdigest())
879 def _setup_state(self, update_collection):
881 Create a new cache file or load a previously existing one.
883 # Load an already existing collection for update
884 if update_collection and re.match(arvados.util.collection_uuid_pattern,
887 self._remote_collection = arvados.collection.Collection(
889 api_client=self._api_client,
890 storage_classes_desired=self.storage_classes,
891 num_retries=self.num_retries)
892 except arvados.errors.ApiError as error:
893 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
896 elif update_collection:
897 # Collection locator provided, but unknown format
898 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
901 cache_filepath = self._get_cache_filepath()
902 if self.resume and os.path.exists(cache_filepath):
903 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
904 self._cache_file = open(cache_filepath, 'a+')
                # --no-resume means start with an empty cache file.
907 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
908 self._cache_file = open(cache_filepath, 'w+')
909 self._cache_filename = self._cache_file.name
910 self._lock_file(self._cache_file)
911 self._cache_file.seek(0)
913 with self._state_lock:
916 self._state = json.load(self._cache_file)
917 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache is at least partially incomplete, set up a new cache
919 self._state = copy.deepcopy(self.EMPTY_STATE)
921 # Cache file empty, set up new cache
922 self._state = copy.deepcopy(self.EMPTY_STATE)
924 self.logger.info("No cache usage requested for this run.")
925 # No cache file, set empty state
926 self._state = copy.deepcopy(self.EMPTY_STATE)
927 if not self._cached_manifest_valid():
928 if not self.batch_mode:
929 raise ResumeCacheInvalidError()
931 self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
932 self.use_cache = False # Don't overwrite preexisting cache file.
933 self._state = copy.deepcopy(self.EMPTY_STATE)
934 # Load the previous manifest so we can check if files were modified remotely.
935 self._local_collection = arvados.collection.Collection(
936 self._state['manifest'],
937 replication_desired=self.replication_desired,
938 storage_classes_desired=self.storage_classes,
939 put_threads=self.put_threads,
940 api_client=self._api_client,
941 num_retries=self.num_retries)
943 def _cached_manifest_valid(self):
        Validate the oldest non-expired block signature to check whether the cached
        manifest is usable: that is, that it was not created with a different
949 if self._state.get('manifest', None) is None:
950 # No cached manifest yet, all good.
952 now = datetime.datetime.utcnow()
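        # Find the cached block locator whose signature is the oldest one that
        # has not expired yet.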
956 for m in arvados.util.keep_locator_pattern.finditer(self._state['manifest']):
959 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
961 # Locator without signature
964 if exp > now and (oldest_exp is None or exp < oldest_exp):
968 # No block signatures found => no invalid block signatures.
970 if oldest_loc is None:
971 # Locator signatures found, but all have expired.
972 # Reset the cache and move on.
973 self.logger.info('Cache expired, starting from scratch.')
974 self._state['manifest'] = ''
976 kc = arvados.KeepClient(api_client=self._api_client,
977 num_retries=self.num_retries)
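        # HEAD the block with the oldest still-valid signature; if Keep rejects
        # it, the cached signatures were probably issued with different credentials.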
980 except arvados.errors.KeepRequestError:
981 # Something is wrong, cached manifest is not valid.
985 def collection_file_paths(self, col, path_prefix='.'):
986 """Return a list of file paths by recursively go through the entire collection `col`"""
988 for name, item in col.items():
989 if isinstance(item, arvados.arvfile.ArvadosFile):
990 file_paths.append(os.path.join(path_prefix, name))
991 elif isinstance(item, arvados.collection.Subcollection):
992 new_prefix = os.path.join(path_prefix, name)
993 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
996 def _lock_file(self, fileobj):
998 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
1000 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
1002 def _save_state(self):
1004 Atomically save current state into cache.
1006 with self._state_lock:
1007 # We're not using copy.deepcopy() here because it's a lot slower
1008 # than json.dumps(), and we're already needing JSON format to be
1010 state = json.dumps(self._state)
1012 new_cache = tempfile.NamedTemporaryFile(
1014 dir=os.path.dirname(self._cache_filename), delete=False)
1015 self._lock_file(new_cache)
1016 new_cache.write(state)
1019 os.rename(new_cache.name, self._cache_filename)
1020 except (IOError, OSError, ResumeCacheConflict) as error:
1021 self.logger.error("There was a problem while saving the cache file: {}".format(error))
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile creation failed.
1027 self._cache_file.close()
1028 self._cache_file = new_cache
1030 def collection_name(self):
1031 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1033 def collection_trash_at(self):
1034 return self._my_collection().get_trash_at()
1036 def manifest_locator(self):
1037 return self._my_collection().manifest_locator()
1039 def portable_data_hash(self):
1040 pdh = self._my_collection().portable_data_hash()
1041 m = self._my_collection().stripped_manifest().encode()
1042 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1043 if pdh != local_pdh:
1044 self.logger.warning("\n".join([
1045 "arv-put: API server provided PDH differs from local manifest.",
1046 " This should not happen; showing API server version."]))
1049 def manifest_text(self, stream_name=".", strip=False, normalize=False):
1050 return self._my_collection().manifest_text(stream_name, strip, normalize)
1052 def _datablocks_on_item(self, item):
1054 Return a list of datablock locators, recursively navigating
1055 through subcollections
1057 if isinstance(item, arvados.arvfile.ArvadosFile):
1058 if item.size() == 0:
1059 # Empty file locator
1060 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1063 for segment in item.segments():
1064 loc = segment.locator
1065 locators.append(loc)
1067 elif isinstance(item, arvados.collection.Collection):
1068 l = [self._datablocks_on_item(x) for x in item.values()]
1069 # Fast list flattener method taken from:
1070 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1071 return [loc for sublist in l for loc in sublist]
1075 def data_locators(self):
1076 with self._collection_lock:
1077 # Make sure all datablocks are flushed before getting the locators
1078 self._my_collection().manifest_text()
1079 datablocks = self._datablocks_on_item(self._my_collection())
1082 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1085 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1086 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1087 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1088 # so instead we're using it on every path component.
1089 def pathname_match(pathname, pattern):
1090 name = pathname.split(os.sep)
1091 # Fix patterns like 'some/subdir/' or 'some//subdir'
1092 pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1093 if len(name) != len(pat):
1095 for i in range(len(name)):
1096 if not fnmatch.fnmatch(name[i], pat[i]):
1100 def machine_progress(bytes_written, bytes_expected):
1101 return _machine_format.format(
1102 bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1104 def human_progress(bytes_written, bytes_expected):
1106 return "\r{}M / {}M {:.1%} ".format(
1107 bytes_written >> 20, bytes_expected >> 20,
1108 float(bytes_written) / bytes_expected)
1110 return "\r{} ".format(bytes_written)
1112 def progress_writer(progress_func, outfile=sys.stderr):
1113 def write_progress(bytes_written, bytes_expected):
1114 outfile.write(progress_func(bytes_written, bytes_expected))
1115 return write_progress
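# Resolve where the new collection should be stored: a user or group UUID if
# given, otherwise the current user's home project.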
1117 def desired_project_uuid(api_client, project_uuid, num_retries):
1118 if not project_uuid:
1119 query = api_client.users().current()
1120 elif arvados.util.user_uuid_pattern.match(project_uuid):
1121 query = api_client.users().get(uuid=project_uuid)
1122 elif arvados.util.group_uuid_pattern.match(project_uuid):
1123 query = api_client.groups().get(uuid=project_uuid)
1125 raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1126 return query.execute(num_retries=num_retries)['uuid']
1128 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1129 install_sig_handlers=True):
1132 args = parse_arguments(arguments)
1133 logger = logging.getLogger('arvados.arv_put')
1135 logger.setLevel(logging.WARNING)
1137 logger.setLevel(logging.INFO)
1140 request_id = arvados.util.new_request_id()
1142 formatter = ArvPutLogFormatter(request_id)
1143 logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1145 if api_client is None:
1146 api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
1148 if install_sig_handlers:
1149 arv_cmd.install_signal_handlers()
    # Validate the trash arguments
1153 if args.trash_at is not None:
1154 # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1155 # make sure the user provides a complete YYYY-MM-DD date.
1156 if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1157 logger.error("--trash-at argument format invalid, use --help to see examples.")
1159 # Check if no time information was provided. In that case, assume end-of-day.
1160 if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1161 args.trash_at += 'T23:59:59'
1163 trash_at = ciso8601.parse_datetime(args.trash_at)
1165 logger.error("--trash-at argument format invalid, use --help to see examples.")
1168 if trash_at.tzinfo is not None:
1169 # Timezone aware datetime provided.
1170 utcoffset = -trash_at.utcoffset()
            # Timezone-naive datetime provided. Assume it is local.
1174 utcoffset = datetime.timedelta(seconds=time.altzone)
1176 utcoffset = datetime.timedelta(seconds=time.timezone)
1177 # Convert to UTC timezone naive datetime.
1178 trash_at = trash_at.replace(tzinfo=None) + utcoffset
1180 if trash_at <= datetime.datetime.utcnow():
1181 logger.error("--trash-at argument must be set in the future")
1183 if args.trash_after is not None:
1184 if args.trash_after < 1:
1185 logger.error("--trash-after argument must be >= 1")
1187 trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1189 # Determine the name to use
1191 if args.stream or args.raw:
1192 logger.error("Cannot use --name with --stream or --raw")
1194 elif args.update_collection:
1195 logger.error("Cannot use --name with --update-collection")
1197 collection_name = args.name
1199 collection_name = "Saved at {} by {}@{}".format(
1200 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1201 pwd.getpwuid(os.getuid()).pw_name,
1202 socket.gethostname())
1204 if args.project_uuid and (args.stream or args.raw):
1205 logger.error("Cannot use --project-uuid with --stream or --raw")
1208 # Determine the parent project
1210 project_uuid = desired_project_uuid(api_client, args.project_uuid,
1212 except (apiclient_errors.Error, ValueError) as error:
1217 reporter = progress_writer(human_progress)
1218 elif args.batch_progress:
1219 reporter = progress_writer(machine_progress)
1223 # Split storage-classes argument
1224 storage_classes = None
1225 if args.storage_classes:
1226 storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
    # Set up the exclude regexes from all the --exclude arguments provided
1231 exclude_names = None
1232 if len(args.exclude) > 0:
1233 # We're supporting 2 kinds of exclusion patterns:
1234 # 1) --exclude '*.jpg' (file/dir name patterns, will only match
1235 # the name, wherever the file is on the tree)
1236 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the
1237 # entire path, and should be relative to
1238 # any input dir argument)
1239 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs
1240 # placed directly underneath the input dir)
1241 for p in args.exclude:
            # Only relative path patterns are allowed
1243 if p.startswith(os.sep):
1244 logger.error("Cannot use absolute paths with --exclude")
1246 if os.path.dirname(p):
                # We don't support path patterns with '..'
1248 p_parts = p.split(os.sep)
1251 "Cannot use path patterns that include or '..'")
1253 # Path search pattern
1254 exclude_paths.append(p)
1256 # Name-only search pattern
1257 name_patterns.append(p)
1258 # For name only matching, we can combine all patterns into a single
1259 # regexp, for better performance.
1260 exclude_names = re.compile('|'.join(
1261 [fnmatch.translate(p) for p in name_patterns]
1262 )) if len(name_patterns) > 0 else None
1263 # Show the user the patterns to be used, just in case they weren't
1264 # specified inside quotes and got changed by the shell expansion.
1265 logger.info("Exclude patterns: {}".format(args.exclude))
1267 # If this is used by a human, and there's at least one directory to be
1268 # uploaded, the expected bytes calculation can take a moment.
1269 if args.progress and any([os.path.isdir(f) for f in args.paths]):
1270 logger.info("Calculating upload size, this could take some time...")
1272 writer = ArvPutUploadJob(paths = args.paths,
1273 resume = args.resume,
1274 use_cache = args.use_cache,
1275 batch_mode= args.batch,
1276 filename = args.filename,
1277 reporter = reporter,
1278 api_client = api_client,
1279 num_retries = args.retries,
1280 replication_desired = args.replication,
1281 put_threads = args.threads,
1282 name = collection_name,
1283 owner_uuid = project_uuid,
1284 ensure_unique_name = True,
1285 update_collection = args.update_collection,
1286 storage_classes=storage_classes,
1288 dry_run=args.dry_run,
1289 follow_links=args.follow_links,
1290 exclude_paths=exclude_paths,
1291 exclude_names=exclude_names,
1293 except ResumeCacheConflict:
1294 logger.error("\n".join([
1295 "arv-put: Another process is already uploading this data.",
1296 " Use --no-cache if this is really what you want."]))
1298 except ResumeCacheInvalidError:
1299 logger.error("\n".join([
1300 "arv-put: Resume cache contains invalid signature: it may have expired",
1301 " or been created with another Arvados user's credentials.",
1302 " Switch user or use one of the following options to restart upload:",
1303 " --no-resume to start a new resume cache.",
1304 " --no-cache to disable resume cache.",
1305 " --batch to ignore the resume cache if invalid."]))
1307 except (CollectionUpdateError, PathDoesNotExistError) as error:
1308 logger.error("\n".join([
1309 "arv-put: %s" % str(error)]))
1311 except ArvPutUploadIsPending:
1312 # Dry run check successful, return proper exit code.
1314 except ArvPutUploadNotPending:
1315 # No files pending for upload
1318 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1319 logger.warning("\n".join([
1320 "arv-put: Resuming previous upload from last checkpoint.",
1321 " Use the --no-resume option to start over."]))
1323 if not args.dry_run:
1324 writer.report_progress()
1327 writer.start(save_collection=not(args.stream or args.raw))
1328 except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
1329 logger.error("\n".join([
1330 "arv-put: %s" % str(error)]))
1333 if args.progress: # Print newline to split stderr from stdout for humans.
1338 output = writer.manifest_text(normalize=True)
1340 output = writer.manifest_text()
1342 output = ','.join(writer.data_locators())
1343 elif writer.manifest_locator() is not None:
1345 expiration_notice = ""
1346 if writer.collection_trash_at() is not None:
1347 # Get the local timezone-naive version, and log it with timezone information.
1349 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1351 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1352 expiration_notice = ". It will expire on {} {}.".format(
1353 local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1354 if args.update_collection:
1355 logger.info(u"Collection updated: '{}'{}".format(
1356 writer.collection_name(), expiration_notice))
1358 logger.info(u"Collection saved as '{}'{}".format(
1359 writer.collection_name(), expiration_notice))
1360 if args.portable_data_hash:
1361 output = writer.portable_data_hash()
1363 output = writer.manifest_locator()
1364 except apiclient_errors.Error as error:
1366 "arv-put: Error creating Collection on project: {}.".format(
1372 # Print the locator (uuid) of the new collection.
1374 status = status or 1
1375 elif not args.silent:
1376 stdout.write(output)
1377 if not output.endswith('\n'):
1380 if install_sig_handlers:
1381 arv_cmd.restore_signal_handlers()
1390 if __name__ == '__main__':