1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
11 import arvados.collection
33 from apiclient import errors as apiclient_errors
34 from arvados._version import __version__
35 from arvados.util import keep_locator_pattern
37 import arvados.commands._util as arv_cmd
# Options controlling what arv-put uploads and how the result is reported.
# NOTE(review): this listing is missing lines (e.g. the `help="""` openers and
# closing `""")` between 46 and 48); restore from upstream before editing code.
41 upload_opts = argparse.ArgumentParser(add_help=False)
43 upload_opts.add_argument('--version', action='version',
44 version="%s %s" % (sys.argv[0], __version__),
45 help='Print version and exit.')
46 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
48 Local file or directory. If path is a directory reference with a trailing
49 slash, then just upload the directory's contents; otherwise upload the
50 directory itself. Default: read from standard input.
# Mutually exclusive: --max-manifest-depth (hidden from --help via SUPPRESS),
# --normalize and --dry-run.
53 _group = upload_opts.add_mutually_exclusive_group()
55 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
56 default=-1, help=argparse.SUPPRESS)
58 _group.add_argument('--normalize', action='store_true',
60 Normalize the manifest by re-ordering files and streams after writing
64 _group.add_argument('--dry-run', action='store_true', default=False,
66 Don't actually upload files, but only check if any file should be
67 uploaded. Exit with code=2 when files are pending for upload.
# Output mode: --stream, --manifest or --raw. The --as-*/--in-* spellings are
# synonyms that write to the same dest.
70 _group = upload_opts.add_mutually_exclusive_group()
72 _group.add_argument('--as-stream', action='store_true', dest='stream',
77 _group.add_argument('--stream', action='store_true',
79 Store the file content and display the resulting manifest on
80 stdout. Do not save a Collection object in Arvados.
83 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
85 Synonym for --manifest.
88 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
90 Synonym for --manifest.
93 _group.add_argument('--manifest', action='store_true',
95 Store the file data and resulting manifest in Keep, save a Collection
96 object in Arvados, and display the manifest locator (Collection uuid)
97 on stdout. This is the default behavior.
100 _group.add_argument('--as-raw', action='store_true', dest='raw',
105 _group.add_argument('--raw', action='store_true',
107 Store the file content and display the data block locators on stdout,
108 separated by commas, with a trailing newline. Do not store a
# Options that shape the target collection. --use-filename shares dest
# 'filename' with --filename.
112 upload_opts.add_argument('--update-collection', type=str, default=None,
113 dest='update_collection', metavar="UUID", help="""
114 Update an existing collection identified by the given Arvados collection
115 UUID. All new local files will be uploaded.
118 upload_opts.add_argument('--use-filename', type=str, default=None,
119 dest='filename', help="""
120 Synonym for --filename.
123 upload_opts.add_argument('--filename', type=str, default=None,
125 Use the given filename in the manifest, instead of the name of the
126 local file. This is useful when "-" or "/dev/stdin" is given as an
127 input file. It can be used only if there is exactly one path given and
128 it is not a directory. Implies --manifest.
131 upload_opts.add_argument('--portable-data-hash', action='store_true',
133 Print the portable data hash instead of the Arvados UUID for the collection
134 created by the upload.
# --replication and --threads default to None, meaning "not specified on the
# command line".
137 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
139 Set the replication level for the new collection: how many different
140 physical storage devices (e.g., disks) should have a copy of each data
141 block. Default is to use the server-provided default (if any) or 2.
144 upload_opts.add_argument('--storage-classes', help="""
145 Specify comma separated list of storage classes to be used when saving data to Keep.
148 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
150 Set the number of upload threads to be used. Take into account that
151 using lots of threads will increase the RAM requirements. Default is
153 On high latency installations, using a greater number will improve
# --exclude may be repeated (action='append'); every pattern ends up in the
# args.exclude list.
157 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
158 action='append', help="""
159 Exclude files and directories whose names match the given glob pattern. When
160 using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
161 directory, relative to the provided input dirs will be excluded.
162 When using a filename pattern like '*.txt', any text file will be excluded
163 no matter where it is placed.
164 For the special case of needing to exclude only files or dirs directly below
165 the given input directory, you can use a pattern like './exclude_this.gif'.
166 You can specify multiple patterns by using this argument more than once.
# --follow-links / --no-follow-links both write args.follow_links (default True).
169 _group = upload_opts.add_mutually_exclusive_group()
170 _group.add_argument('--follow-links', action='store_true', default=True,
171 dest='follow_links', help="""
172 Follow file and directory symlinks (default).
174 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
176 Ignore file and directory symlinks. Even paths given explicitly on the
177 command line will be skipped if they are symlinks.
# Options controlling how the upload runs: destination project/name,
# progress display, resume and cache behaviour.
181 run_opts = argparse.ArgumentParser(add_help=False)
183 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
184 Store the collection in the specified project, instead of your Home
188 run_opts.add_argument('--name', help="""
189 Save the collection with the specified name.
# Progress display mode. When none of these (nor --silent) is given,
# parse_arguments() turns on --progress if stderr is a tty.
192 _group = run_opts.add_mutually_exclusive_group()
193 _group.add_argument('--progress', action='store_true',
195 Display human-readable progress on stderr (bytes and, if possible,
196 percentage of total data size). This is the default behavior when
200 _group.add_argument('--no-progress', action='store_true',
202 Do not display human-readable progress on stderr, even if stderr is a
206 _group.add_argument('--batch-progress', action='store_true',
208 Display machine-readable progress on stderr (bytes and, if known,
212 run_opts.add_argument('--silent', action='store_true',
214 Do not print any debug messages to console. (Any error messages will
# --resume / --no-resume both write args.resume (default True).
218 _group = run_opts.add_mutually_exclusive_group()
219 _group.add_argument('--resume', action='store_true', default=True,
221 Continue interrupted uploads from cached state (default).
223 _group.add_argument('--no-resume', action='store_false', dest='resume',
225 Do not continue interrupted uploads from cached state.
# --cache / --no-cache both write args.use_cache (default True);
# parse_arguments() also turns resume off when the cache is disabled.
228 _group = run_opts.add_mutually_exclusive_group()
229 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
231 Save upload state in a cache file for resuming (default).
233 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
235 Do not save upload state in a cache file for resuming.
# Collection expiry: an absolute --trash-at date or a relative --trash-after
# number of days. Note that this group is attached to upload_opts even though
# it is declared after run_opts.
238 _group = upload_opts.add_mutually_exclusive_group()
239 _group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
241 Set the trash date of the resulting collection to an absolute date in the future.
242 The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
243 Timezone information can be added. If not, the provided date/time is assumed as being in the local system's timezone.
245 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
247 Set the trash date of the resulting collection to an amount of days from the
248 date/time that the upload process finishes.
# Top-level arv-put parser: combines the upload, run and shared retry option sets.
251 arg_parser = argparse.ArgumentParser(
252 description='Copy data from the local filesystem to Keep.',
253 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
# Parse the arv-put command line with arg_parser and normalise derived settings:
# '/dev/stdin' becomes '-', --filename is validated, --progress is enabled on a
# tty, resume is tied to the cache setting, stdin uploads disable the cache, and
# duplicate --exclude patterns are dropped.
# NOTE(review): the final `return args` is not visible in this excerpt — confirm.
255 def parse_arguments(arguments):
256 args = arg_parser.parse_args(arguments)
# No paths given: per the 'paths' help text, default to standard input.
258 if len(args.paths) == 0:
# Treat /dev/stdin as an alias for '-'.
261 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
# --filename only makes sense for exactly one non-directory path.
263 if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
265 --filename argument cannot be used when storing a directory or
269 # Turn on --progress by default if stderr is a tty.
270 if (not (args.batch_progress or args.no_progress or args.silent)
271 and os.isatty(sys.stderr.fileno())):
274 # Turn off --resume (default) if --no-cache is used.
275 if not args.use_cache:
# Reading only from stdin: --update-collection is rejected, the cache is
# disabled, and the data is stored as 'stdin' unless --filename was given.
278 if args.paths == ['-']:
279 if args.update_collection:
281 --update-collection cannot be used when reading from stdin.
284 args.use_cache = False
285 if not args.filename:
286 args.filename = 'stdin'
288 # Remove possible duplicated patterns
# NOTE(review): set() does not preserve the order the patterns were given in.
289 if len(args.exclude) > 0:
290 args.exclude = list(set(args.exclude))
class PathDoesNotExistError(Exception):
    """Raised when a path given to arv-put does not exist on the local filesystem."""
class CollectionUpdateError(Exception):
    """Raised when the collection requested for update cannot be used."""
class ResumeCacheConflict(Exception):
    """Raised when a cache file is already locked by another arv-put process."""
class ResumeCacheInvalidError(Exception):
    """Raised when the cached manifest cannot be reused for resuming an upload."""
class ArvPutArgumentConflict(Exception):
    """Raised when mutually incompatible upload options are combined."""
class ArvPutUploadIsPending(Exception):
    """Dry-run signal: at least one file would have to be uploaded."""
class ArvPutUploadNotPending(Exception):
    """Dry-run signal: nothing would have to be uploaded."""
class FileUploadList(list):
    """A plain list of pending uploads that refuses new entries in dry-run mode.

    In dry-run mode, appending anything means "an upload would be needed", so
    ``append`` raises ``ArvPutUploadIsPending`` instead of storing the item.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        if not self.dry_run:
            super(FileUploadList, self).append(other)
            return
        raise ArvPutUploadIsPending()
# Log formatter that appends the X-Request-Id to the first DEBUG or ERROR record.
class ArvPutLogFormatter(logging.Formatter):
    """Format arv-put log records, mentioning the request id exactly once.

    The first record logged at DEBUG or ERROR level is formatted with the
    ``(X-Request-Id: ...)`` suffix. Every other record, including later
    DEBUG/ERROR ones, uses the standard Arvados log format.
    """
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        with_request_id = arvados.log_format + ' (X-Request-Id: {})'.format(request_id)
        self.err_fmtr = logging.Formatter(with_request_id, arvados.log_date_format)

    def format(self, record):
        level_wants_id = record.levelno in (logging.DEBUG, logging.ERROR)
        if level_wants_id and not self.request_id_informed:
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)
class ResumeCache(object):
    """Exclusive-locked JSON file holding resumable upload state.

    The file is opened in append mode and locked with ``flock`` so that only
    one process can use a given cache at a time. ``save`` replaces it
    atomically by writing a locked temporary file and renaming it over the
    original.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """Open (creating if needed) and lock the cache file at *file_spec*.

        :raises ResumeCacheConflict: if another process holds the lock.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except Exception:
            # Don't leak the descriptor when the cache is locked elsewhere.
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Return the cache file path for the parsed arguments *args*.

        The name is an MD5 of the API host, the sorted real input paths and
        either a directory marker or ``args.filename``.
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """Take a non-blocking exclusive flock on *fileobj* (file object or fd).

        :raises ResumeCacheConflict: if the lock is already held.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Raw descriptors have no .name; fall back to the object itself so
            # the conflict is reported instead of an AttributeError.
            raise ResumeCacheConflict(u"{} locked".format(getattr(fileobj, 'name', fileobj)))

    def load(self):
        """Return the JSON document stored in the cache file."""
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """Best-effort validation of the cached state.

        If the first recorded block locator cannot be checked with a Keep HEAD
        request (or anything else fails while doing so), the cache is reset
        with ``restart()``. A cache file that is not valid JSON is left as is.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        """Atomically replace the cache contents with the JSON encoding of *data*.

        I/O errors and lock conflicts are swallowed (the old cache is kept),
        but the temporary file and its descriptor are always cleaned up.
        """
        new_cache = None
        new_cache_name = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            # Wrap the descriptor right away so any later failure can close it.
            new_cache = os.fdopen(new_cache_fd, 'r+')
            self._lock_file(new_cache)
            json.dump(data, new_cache)
            # Make the new contents visible on disk before publishing them.
            new_cache.flush()
            os.fsync(new_cache.fileno())
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            if new_cache is not None:
                new_cache.close()
            if new_cache_name is not None:
                os.unlink(new_cache_name)
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close (and thereby unlock) the cache file."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file (ignoring if already gone) and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Discard the cache and start over with a fresh, locked, empty file."""
        self.destroy()
        self.__init__(self.filename)
433 class ArvPutUploadJob(object):
434 CACHE_DIR = '.cache/arvados/arv-put'
436 'manifest' : None, # Last saved manifest checkpoint
437 'files' : {} # Previous run file list: {path : {size, mtime}}
440 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
441 name=None, owner_uuid=None, api_client=None,
442 ensure_unique_name=False, num_retries=None,
443 put_threads=None, replication_desired=None, filename=None,
444 update_time=60.0, update_collection=None, storage_classes=None,
445 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
446 follow_links=True, exclude_paths=[], exclude_names=None,
450 self.use_cache = use_cache
452 self.reporter = reporter
453 # This will set to 0 before start counting, if no special files are going
455 self.bytes_expected = None
456 self.bytes_written = 0
457 self.bytes_skipped = 0
459 self.owner_uuid = owner_uuid
460 self.ensure_unique_name = ensure_unique_name
461 self.num_retries = num_retries
462 self.replication_desired = replication_desired
463 self.put_threads = put_threads
464 self.filename = filename
465 self.storage_classes = storage_classes
466 self._api_client = api_client
467 self._state_lock = threading.Lock()
468 self._state = None # Previous run state (file list & manifest)
469 self._current_files = [] # Current run file list
470 self._cache_file = None
471 self._collection_lock = threading.Lock()
472 self._remote_collection = None # Collection being updated (if asked)
473 self._local_collection = None # Collection from previous run manifest
474 self._file_paths = set() # Files to be updated in remote collection
475 self._stop_checkpointer = threading.Event()
476 self._checkpointer = threading.Thread(target=self._update_task)
477 self._checkpointer.daemon = True
478 self._update_task_time = update_time # How many seconds wait between update runs
479 self._files_to_upload = FileUploadList(dry_run=dry_run)
480 self._upload_started = False
482 self.dry_run = dry_run
483 self._checkpoint_before_quit = True
484 self.follow_links = follow_links
485 self.exclude_paths = exclude_paths
486 self.exclude_names = exclude_names
487 self._trash_at = trash_at
489 if self._trash_at is not None:
490 if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
491 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
492 if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
493 raise TypeError('provided trash_at datetime should be timezone-naive')
495 if not self.use_cache and self.resume:
496 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
498 # Check for obvious dry-run responses
499 if self.dry_run and (not self.use_cache or not self.resume):
500 raise ArvPutUploadIsPending()
502 # Load cached data if any and if needed
503 self._setup_state(update_collection)
505 # Build the upload file list, excluding requested files and counting the
506 # bytes expected to be uploaded.
507 self._build_upload_list()
509 def _build_upload_list(self):
511 Scan the requested paths to count file sizes, excluding requested files
512 and dirs and building the upload file list.
514 # If there aren't special files to be read, reset total bytes count to zero
516 if not any([p for p in self.paths
517 if not (os.path.isfile(p) or os.path.isdir(p))]):
518 self.bytes_expected = 0
520 for path in self.paths:
521 # Test for stdin first, in case some file named '-' exist
524 raise ArvPutUploadIsPending()
525 self._write_stdin(self.filename or 'stdin')
526 elif not os.path.exists(path):
527 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
528 elif (not self.follow_links) and os.path.islink(path):
529 self.logger.warning("Skipping symlink '{}'".format(path))
531 elif os.path.isdir(path):
532 # Use absolute paths on cache index so CWD doesn't interfere
533 # with the caching logic.
535 path = os.path.abspath(path)
536 if orig_path[-1:] == os.sep:
537 # When passing a directory reference with a trailing slash,
538 # its contents should be uploaded directly to the
542 # When passing a directory reference with no trailing slash,
543 # upload the directory to the collection's root.
544 prefixdir = os.path.dirname(path)
546 for root, dirs, files in os.walk(path,
547 followlinks=self.follow_links):
548 root_relpath = os.path.relpath(root, path)
549 if root_relpath == '.':
551 # Exclude files/dirs by full path matching pattern
552 if self.exclude_paths:
553 dirs[:] = [d for d in dirs
554 if not any(pathname_match(
555 os.path.join(root_relpath, d), pat)
556 for pat in self.exclude_paths)]
557 files = [f for f in files
558 if not any(pathname_match(
559 os.path.join(root_relpath, f), pat)
560 for pat in self.exclude_paths)]
561 # Exclude files/dirs by name matching pattern
562 if self.exclude_names is not None:
563 dirs[:] = [d for d in dirs
564 if not self.exclude_names.match(d)]
565 files = [f for f in files
566 if not self.exclude_names.match(f)]
567 # Make os.walk()'s dir traversing order deterministic
571 filepath = os.path.join(root, f)
572 # Add its size to the total bytes count (if applicable)
573 if self.follow_links or (not os.path.islink(filepath)):
574 if self.bytes_expected is not None:
575 self.bytes_expected += os.path.getsize(filepath)
576 self._check_file(filepath,
577 os.path.join(root[len(prefixdir):], f))
579 filepath = os.path.abspath(path)
580 # Add its size to the total bytes count (if applicable)
581 if self.follow_links or (not os.path.islink(filepath)):
582 if self.bytes_expected is not None:
583 self.bytes_expected += os.path.getsize(filepath)
584 self._check_file(filepath,
585 self.filename or os.path.basename(path))
586 # If dry-mode is on, and got up to this point, then we should notify that
587 # there aren't any file to upload.
589 raise ArvPutUploadNotPending()
590 # Remove local_collection's files that don't exist locally anymore, so the
591 # bytes_written count is correct.
592 for f in self.collection_file_paths(self._local_collection,
594 if f != 'stdin' and f != self.filename and not f in self._file_paths:
595 self._local_collection.remove(f)
597 def start(self, save_collection):
599 Start supporting thread & file uploading
601 self._checkpointer.start()
603 # Update bytes_written from current local collection and
604 # report initial progress.
607 self._upload_started = True # Used by the update thread to start checkpointing
609 except (SystemExit, Exception) as e:
610 self._checkpoint_before_quit = False
611 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
612 # Note: We're expecting SystemExit instead of
613 # KeyboardInterrupt because we have a custom signal
614 # handler in place that raises SystemExit with the catched
616 if isinstance(e, PathDoesNotExistError):
617 # We aren't interested in the traceback for this case
619 elif not isinstance(e, SystemExit) or e.code != -2:
620 self.logger.warning("Abnormal termination:\n{}".format(
621 traceback.format_exc()))
625 # Stop the thread before doing anything else
626 self._stop_checkpointer.set()
627 self._checkpointer.join()
628 if self._checkpoint_before_quit:
629 # Commit all pending blocks & one last _update()
630 self._local_collection.manifest_text()
631 self._update(final=True)
633 self.save_collection()
635 self._cache_file.close()
637 def _collection_trash_at(self):
639 Returns the trash date that the collection should use at save time.
640 Takes into account absolute/relative trash_at values requested
643 if type(self._trash_at) == datetime.timedelta:
644 # Get an absolute datetime for trash_at
645 return datetime.datetime.utcnow() + self._trash_at
646 return self._trash_at
648 def save_collection(self):
650 # Check if files should be updated on the remote collection.
651 for fp in self._file_paths:
652 remote_file = self._remote_collection.find(fp)
654 # File don't exist on remote collection, copy it.
655 self._remote_collection.copy(fp, fp, self._local_collection)
656 elif remote_file != self._local_collection.find(fp):
657 # A different file exist on remote collection, overwrite it.
658 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
660 # The file already exist on remote collection, skip it.
662 self._remote_collection.save(storage_classes=self.storage_classes,
663 num_retries=self.num_retries,
664 trash_at=self._collection_trash_at())
666 if len(self._local_collection) == 0:
667 self.logger.warning("No files were uploaded, skipping collection creation.")
669 self._local_collection.save_new(
670 name=self.name, owner_uuid=self.owner_uuid,
671 storage_classes=self.storage_classes,
672 ensure_unique_name=self.ensure_unique_name,
673 num_retries=self.num_retries,
674 trash_at=self._collection_trash_at())
676 def destroy_cache(self):
679 os.unlink(self._cache_filename)
680 except OSError as error:
681 # That's what we wanted anyway.
682 if error.errno != errno.ENOENT:
684 self._cache_file.close()
686 def _collection_size(self, collection):
688 Recursively get the total size of the collection
691 for item in listvalues(collection):
692 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
693 size += self._collection_size(item)
698 def _update_task(self):
700 Periodically called support task. File uploading is
701 asynchronous so we poll status from the collection.
703 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
706 def _update(self, final=False):
708 Update cached manifest text and report progress.
710 if self._upload_started:
711 with self._collection_lock:
712 self.bytes_written = self._collection_size(self._local_collection)
715 manifest = self._local_collection.manifest_text()
717 # Get the manifest text without comitting pending blocks
718 manifest = self._local_collection.manifest_text(strip=False,
722 with self._state_lock:
723 self._state['manifest'] = manifest
727 except Exception as e:
728 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
729 # Keep remote collection's trash_at attribute synced when using relative expire dates
730 if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
732 self._api_client.collections().update(
733 uuid=self._remote_collection.manifest_locator(),
734 body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
735 ).execute(num_retries=self.num_retries)
736 except Exception as e:
737 self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
739 self.bytes_written = self.bytes_skipped
740 # Call the reporter, if any
741 self.report_progress()
743 def report_progress(self):
744 if self.reporter is not None:
745 self.reporter(self.bytes_written, self.bytes_expected)
747 def _write_stdin(self, filename):
748 output = self._local_collection.open(filename, 'wb')
749 self._write(sys.stdin.buffer, output)
752 def _check_file(self, source, filename):
754 Check if this file needs to be uploaded
756 # Ignore symlinks when requested
757 if (not self.follow_links) and os.path.islink(source):
760 should_upload = False
761 new_file_in_cache = False
762 # Record file path for updating the remote collection before exiting
763 self._file_paths.add(filename)
765 with self._state_lock:
766 # If no previous cached data on this file, store it for an eventual
768 if source not in self._state['files']:
769 self._state['files'][source] = {
770 'mtime': os.path.getmtime(source),
771 'size' : os.path.getsize(source)
773 new_file_in_cache = True
774 cached_file_data = self._state['files'][source]
776 # Check if file was already uploaded (at least partially)
777 file_in_local_collection = self._local_collection.find(filename)
779 # If not resuming, upload the full file.
782 # New file detected from last run, upload it.
783 elif new_file_in_cache:
785 # Local file didn't change from last run.
786 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
787 if not file_in_local_collection:
788 # File not uploaded yet, upload it completely
790 elif file_in_local_collection.permission_expired():
791 # Permission token expired, re-upload file. This will change whenever
792 # we have a API for refreshing tokens.
793 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
795 self._local_collection.remove(filename)
796 elif cached_file_data['size'] == file_in_local_collection.size():
797 # File already there, skip it.
798 self.bytes_skipped += cached_file_data['size']
799 elif cached_file_data['size'] > file_in_local_collection.size():
800 # File partially uploaded, resume!
801 resume_offset = file_in_local_collection.size()
802 self.bytes_skipped += resume_offset
805 # Inconsistent cache, re-upload the file
807 self._local_collection.remove(filename)
808 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
809 # Local file differs from cached data, re-upload it.
811 if file_in_local_collection:
812 self._local_collection.remove(filename)
817 self._files_to_upload.append((source, resume_offset, filename))
818 except ArvPutUploadIsPending:
819 # This could happen when running on dry-mode, close cache file to
820 # avoid locking issues.
821 self._cache_file.close()
824 def _upload_files(self):
825 for source, resume_offset, filename in self._files_to_upload:
826 with open(source, 'rb') as source_fd:
827 with self._state_lock:
828 self._state['files'][source]['mtime'] = os.path.getmtime(source)
829 self._state['files'][source]['size'] = os.path.getsize(source)
830 if resume_offset > 0:
831 # Start upload where we left off
832 output = self._local_collection.open(filename, 'ab')
833 source_fd.seek(resume_offset)
836 output = self._local_collection.open(filename, 'wb')
837 self._write(source_fd, output)
838 output.close(flush=False)
840 def _write(self, source_fd, output):
842 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
847 def _my_collection(self):
848 return self._remote_collection if self.update else self._local_collection
850 def _get_cache_filepath(self):
851 # Set up cache file name from input paths.
853 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
854 realpaths = sorted(os.path.realpath(path) for path in self.paths)
855 md5.update(b'\0'.join([p.encode() for p in realpaths]))
857 md5.update(self.filename.encode())
858 cache_filename = md5.hexdigest()
859 cache_filepath = os.path.join(
860 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
862 return cache_filepath
864 def _setup_state(self, update_collection):
866 Create a new cache file or load a previously existing one.
868 # Load an already existing collection for update
869 if update_collection and re.match(arvados.util.collection_uuid_pattern,
872 self._remote_collection = arvados.collection.Collection(
874 api_client=self._api_client,
875 num_retries=self.num_retries)
876 except arvados.errors.ApiError as error:
877 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
880 elif update_collection:
881 # Collection locator provided, but unknown format
882 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
885 cache_filepath = self._get_cache_filepath()
886 if self.resume and os.path.exists(cache_filepath):
887 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
888 self._cache_file = open(cache_filepath, 'a+')
890 # --no-resume means start with a empty cache file.
891 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
892 self._cache_file = open(cache_filepath, 'w+')
893 self._cache_filename = self._cache_file.name
894 self._lock_file(self._cache_file)
895 self._cache_file.seek(0)
897 with self._state_lock:
900 self._state = json.load(self._cache_file)
901 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
902 # Cache at least partially incomplete, set up new cache
903 self._state = copy.deepcopy(self.EMPTY_STATE)
905 # Cache file empty, set up new cache
906 self._state = copy.deepcopy(self.EMPTY_STATE)
908 self.logger.info("No cache usage requested for this run.")
909 # No cache file, set empty state
910 self._state = copy.deepcopy(self.EMPTY_STATE)
911 if not self._cached_manifest_valid():
912 raise ResumeCacheInvalidError()
913 # Load the previous manifest so we can check if files were modified remotely.
914 self._local_collection = arvados.collection.Collection(
915 self._state['manifest'],
916 replication_desired=self.replication_desired,
917 put_threads=self.put_threads,
918 api_client=self._api_client,
919 num_retries=self.num_retries)
921 def _cached_manifest_valid(self):
923 Validate the oldest non-expired block signature to check if cached manifest
924 is usable: checking if the cached manifest was not created with a different
927 if self._state.get('manifest', None) is None:
928 # No cached manifest yet, all good.
930 now = datetime.datetime.utcnow()
934 for m in keep_locator_pattern.finditer(self._state['manifest']):
937 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
939 # Locator without signature
942 if exp > now and (oldest_exp is None or exp < oldest_exp):
946 # No block signatures found => no invalid block signatures.
948 if oldest_loc is None:
949 # Locator signatures found, but all have expired.
950 # Reset the cache and move on.
951 self.logger.info('Cache expired, starting from scratch.')
952 self._state['manifest'] = ''
954 kc = arvados.KeepClient(api_client=self._api_client,
955 num_retries=self.num_retries)
958 except arvados.errors.KeepRequestError:
959 # Something is wrong, cached manifest is not valid.
963 def collection_file_paths(self, col, path_prefix='.'):
964 """Return a list of file paths by recursively go through the entire collection `col`"""
966 for name, item in listitems(col):
967 if isinstance(item, arvados.arvfile.ArvadosFile):
968 file_paths.append(os.path.join(path_prefix, name))
969 elif isinstance(item, arvados.collection.Subcollection):
970 new_prefix = os.path.join(path_prefix, name)
971 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
974 def _lock_file(self, fileobj):
976 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
978 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
980 def _save_state(self):
982 Atomically save current state into cache.
984 with self._state_lock:
985 # We're not using copy.deepcopy() here because it's a lot slower
986 # than json.dumps(), and we're already needing JSON format to be
988 state = json.dumps(self._state)
990 new_cache = tempfile.NamedTemporaryFile(
992 dir=os.path.dirname(self._cache_filename), delete=False)
993 self._lock_file(new_cache)
994 new_cache.write(state)
997 os.rename(new_cache.name, self._cache_filename)
998 except (IOError, OSError, ResumeCacheConflict) as error:
999 self.logger.error("There was a problem while saving the cache file: {}".format(error))
1001 os.unlink(new_cache_name)
1002 except NameError: # mkstemp failed.
1005 self._cache_file.close()
1006 self._cache_file = new_cache
1008 def collection_name(self):
1009 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1011 def collection_trash_at(self):
1012 return self._my_collection().get_trash_at()
1014 def manifest_locator(self):
1015 return self._my_collection().manifest_locator()
1017 def portable_data_hash(self):
1018 pdh = self._my_collection().portable_data_hash()
1019 m = self._my_collection().stripped_manifest().encode()
1020 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1021 if pdh != local_pdh:
1022 self.logger.warning("\n".join([
1023 "arv-put: API server provided PDH differs from local manifest.",
1024 " This should not happen; showing API server version."]))
1027 def manifest_text(self, stream_name=".", strip=False, normalize=False):
1028 return self._my_collection().manifest_text(stream_name, strip, normalize)
def _datablocks_on_item(self, item):
    """
    Return a list of datablock locators, recursively navigating
    through subcollections.

    - ArvadosFile: the locator of each of its segments, in order. A
      zero-length file yields the empty-block locator instead.
    - Any collection node, i.e. anything deriving from
      ``arvados.collection.RichCollectionBase``: the concatenated locators
      of all of its children.
    - Anything else: an empty list, so callers can always iterate the result.
    """
    if isinstance(item, arvados.arvfile.ArvadosFile):
        if item.size() == 0:
            # Empty file locator: md5 of zero bytes, size 0.
            return ["d41d8cd98f00b204e9800998ecf8427e+0"]
        return [segment.locator for segment in item.segments()]
    if isinstance(item, arvados.collection.RichCollectionBase):
        # Matching only arvados.collection.Collection would skip nested
        # directories. Per the Arvados SDK, those are Subcollection objects,
        # which derive from RichCollectionBase rather than Collection.
        locators = []
        for child in listvalues(item):
            locators.extend(self._datablocks_on_item(child))
        return locators
    return []
def data_locators(self):
    """Return the datablock locators of everything in the tracked collection.

    Runs under ``self._collection_lock``. The collection's
    ``manifest_text()`` is called first so all datablocks are flushed before
    ``_datablocks_on_item()`` walks the collection.
    """
    with self._collection_lock:
        # Make sure all datablocks are flushed before getting the locators
        self._my_collection().manifest_text()
        return self._datablocks_on_item(self._my_collection())
# Template for machine-readable progress lines:
#   "<argv[0]> <pid>: <written> written <total> total\n"
# The doubled braces survive this first format() call, so the result can be
# formatted a second time with the two byte counts.
_machine_format = "{} {}: {{}} written {{}} total\n".format(
    sys.argv[0],
    os.getpid(),
)
# Emulate glob.glob()-style matching without scanning the filesystem.
# fnmatch() alone is not enough for pathnames: its '*' also matches the path
# separator, so 'tests/*.py' would match 'tests/subdir/run_test.py' as well
# as 'tests/run_test.py'. Instead, both the path and the pattern are split
# on os.sep and compared one component at a time.
def pathname_match(pathname, pattern):
    """Return True if ``pathname`` matches ``pattern`` component by component.

    Empty and '.' components are dropped from the pattern first, so patterns
    like 'some/subdir/', 'some//subdir' and './*.jpg' work as intended. The
    pathname is split as-is. The two must have the same number of components.
    """
    path_parts = pathname.split(os.sep)
    pattern_parts = [part for part in pattern.split(os.sep)
                     if part not in ('', '.')]
    if len(path_parts) != len(pattern_parts):
        return False
    return all(fnmatch.fnmatch(part, glob_part)
               for part, glob_part in zip(path_parts, pattern_parts))
def machine_progress(bytes_written, bytes_expected):
    """Render one machine-readable progress line from ``_machine_format``.

    An unknown total (``bytes_expected is None``) is reported as -1.
    """
    total = bytes_expected if bytes_expected is not None else -1
    return _machine_format.format(bytes_written, total)
def human_progress(bytes_written, bytes_expected):
    """Return a carriage-return-prefixed progress string for a terminal.

    With a truthy ``bytes_expected``: written and expected sizes in MiB
    (bytes >> 20) plus the completed fraction as a percentage. Otherwise only
    the raw count of bytes written.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    fraction_done = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(
        bytes_written >> 20, bytes_expected >> 20, fraction_done)
def progress_writer(progress_func, outfile=sys.stderr):
    """Build a reporter callback that writes formatted progress to ``outfile``.

    The returned callable takes ``(bytes_written, bytes_expected)``, formats
    them with ``progress_func`` and writes the result to ``outfile``. Note
    that the default ``outfile`` is whatever ``sys.stderr`` was when this
    function was defined.
    """
    def _write(bytes_written, bytes_expected):
        text = progress_func(bytes_written, bytes_expected)
        outfile.write(text)
    return _write
def desired_project_uuid(api_client, project_uuid, num_retries):
    """Resolve the UUID of the user or group that should own the upload.

    - Empty/None ``project_uuid``: the current user (``users().current()``).
    - A user UUID: that user, via ``users().get``.
    - A group UUID: that group, via ``groups().get``.

    Anything else raises ValueError. The chosen request is executed with
    ``num_retries`` and the ``'uuid'`` field of its result is returned.
    """
    if not project_uuid:
        lookup = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        lookup = api_client.users().get(uuid=project_uuid)
    elif not arvados.util.group_uuid_pattern.match(project_uuid):
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    else:
        lookup = api_client.groups().get(uuid=project_uuid)
    record = lookup.execute(num_retries=num_retries)
    return record['uuid']
# Command-line entry point for arv-put.
#
# Visible flow:
#   1. Parse the arguments, configure logging and obtain an API client.
#   2. Validate --trash-at / --trash-after.
#   3. Pick the collection name and owner project.
#   4. Build the progress reporter, storage-class list and --exclude filters.
#   5. Construct an ArvPutUploadJob and run it.
#   6. Write the result to `stdout`: manifest text, block locators, or the
#      saved collection's identifier.
#
# NOTE(review): this excerpt is missing many lines, for example:
#   - whatever binds `api_client` before it is tested;
#   - the conditions choosing between the two logger.setLevel() calls;
#   - the `try:`/`else:` lines paired with the `except` clauses;
#   - presumably the exit calls that follow the logged errors.
# The comments below describe only what the visible statements do.
1106 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1107 install_sig_handlers=True):
1110 args = parse_arguments(arguments)
1111 logger = logging.getLogger('arvados.arv_put')
1113 logger.setLevel(logging.WARNING)
1115 logger.setLevel(logging.INFO)
# One request id is given to both the log formatter and the API client,
# presumably so log lines can be correlated with API requests.
1118 request_id = arvados.util.new_request_id()
1120 formatter = ArvPutLogFormatter(request_id)
1121 logging.getLogger('arvados').handlers[0].setFormatter(formatter)
# NOTE(review): `api_client` is not assigned in the visible part of main().
# It is presumably a module-level global declared in a missing line; confirm.
1123 if api_client is None:
1124 api_client = arvados.api('v1', request_id=request_id)
1126 if install_sig_handlers:
1127 arv_cmd.install_signal_handlers()
1129 # Trash arguments validation
1131 if args.trash_at is not None:
1132 # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1133 # make sure the user provides a complete YYYY-MM-DD date.
# The first regex only requires the value to *start* with YYYY-MM-DD or
# YYYYMMDD. The back-reference rejects mixed separators, and `\d{2}?` is
# a lazy exact repeat, i.e. still exactly two digits. The second regex,
# anchored with `$`, detects a bare date with no time part.
1134 if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1135 logger.error("--trash-at argument format invalid, use --help to see examples.")
1137 # Check if no time information was provided. In that case, assume end-of-day.
1138 if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1139 args.trash_at += 'T23:59:59'
1141 trash_at = ciso8601.parse_datetime(args.trash_at)
1143 logger.error("--trash-at argument format invalid, use --help to see examples.")
# Normalize trash_at to a naive datetime in UTC.
# - utcoffset() is positive east of UTC, so it is negated.
# - time.altzone / time.timezone are seconds *west* of UTC, so they are
#   added as-is.
1146 if trash_at.tzinfo is not None:
1147 # Timezone aware datetime provided.
1148 utcoffset = -trash_at.utcoffset()
1150 # Timezone naive datetime provided. Assume is local.
# NOTE(review): the condition that selects altzone (DST offset) over
# timezone (standard offset) is not visible here. If it is `time.daylight`,
# that flag only says the local zone *has* a DST rule, not that DST is in
# effect at `trash_at`. Dates outside DST would then be shifted by the DST
# delta.
1152 utcoffset = datetime.timedelta(seconds=time.altzone)
1154 utcoffset = datetime.timedelta(seconds=time.timezone)
1155 # Convert to UTC timezone naive datetime.
1156 trash_at = trash_at.replace(tzinfo=None) + utcoffset
# Both sides of this comparison are naive UTC datetimes.
1158 if trash_at <= datetime.datetime.utcnow():
1159 logger.error("--trash-at argument must be set in the future")
1161 if args.trash_after is not None:
1162 if args.trash_after < 1:
1163 logger.error("--trash-after argument must be >= 1")
# --trash-after is a number of days. Here trash_at becomes a relative
# datetime.timedelta rather than an absolute datetime.
# NOTE(review): if both --trash-at and --trash-after were accepted, this
# overwrites the absolute value. Confirm the options are mutually exclusive.
1165 trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1167 # Determine the name to use
1169 if args.stream or args.raw:
1170 logger.error("Cannot use --name with --stream or --raw")
1172 elif args.update_collection:
1173 logger.error("Cannot use --name with --update-collection")
1175 collection_name = args.name
# Default name: current UTC time plus the local user and host names.
1177 collection_name = "Saved at {} by {}@{}".format(
1178 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1179 pwd.getpwuid(os.getuid()).pw_name,
1180 socket.gethostname())
1182 if args.project_uuid and (args.stream or args.raw):
1183 logger.error("Cannot use --project-uuid with --stream or --raw")
1186 # Determine the parent project
1188 project_uuid = desired_project_uuid(api_client, args.project_uuid,
1190 except (apiclient_errors.Error, ValueError) as error:
# Progress reporting: human-readable (presumably for --progress) or
# machine-readable for --batch-progress, both written via progress_writer().
1195 reporter = progress_writer(human_progress)
1196 elif args.batch_progress:
1197 reporter = progress_writer(machine_progress)
1201 # Split storage-classes argument
1202 storage_classes = None
1203 if args.storage_classes:
1204 storage_classes = args.storage_classes.strip().split(',')
1205 if len(storage_classes) > 1:
1206 logger.error("Multiple storage classes are not supported currently.")
1210 # Setup exclude regex from all the --exclude arguments provided
# NOTE(review): `name_patterns` and `exclude_paths` are appended to below,
# but they are initialised in lines missing from this excerpt.
1213 exclude_names = None
1214 if len(args.exclude) > 0:
1215 # We're supporting 2 kinds of exclusion patterns:
1216 # 1) --exclude '*.jpg' (file/dir name patterns, will only match
1217 # the name, wherever the file is on the tree)
1218 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the
1219 # entire path, and should be relative to
1220 # any input dir argument)
1221 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs
1222 # placed directly underneath the input dir)
1223 for p in args.exclude:
1224 # Only relative paths patterns allowed
1225 if p.startswith(os.sep):
1226 logger.error("Cannot use absolute paths with --exclude")
# NOTE(review): the path-pattern error message below reads
# "include or '..'", which looks like it is missing a word. It is left
# unchanged here because it is a runtime string.
1228 if os.path.dirname(p):
1229 # We don't support of path patterns with '..'
1230 p_parts = p.split(os.sep)
1233 "Cannot use path patterns that include or '..'")
1235 # Path search pattern
1236 exclude_paths.append(p)
1238 # Name-only search pattern
1239 name_patterns.append(p)
1240 # For name only matching, we can combine all patterns into a single
1241 # regexp, for better performance.
1242 exclude_names = re.compile('|'.join(
1243 [fnmatch.translate(p) for p in name_patterns]
1244 )) if len(name_patterns) > 0 else None
1245 # Show the user the patterns to be used, just in case they weren't
1246 # specified inside quotes and got changed by the shell expansion.
1247 logger.info("Exclude patterns: {}".format(args.exclude))
1249 # If this is used by a human, and there's at least one directory to be
1250 # uploaded, the expected bytes calculation can take a moment.
1251 if args.progress and any([os.path.isdir(f) for f in args.paths]):
1252 logger.info("Calculating upload size, this could take some time...")
# Build the upload job. The except clauses that follow appear to belong to
# a (missing) `try:` around this call, so constructing the job can already
# report these outcomes:
#   - a cache conflict;
#   - an invalid resume cache;
#   - an update error or missing path;
#   - the --dry-run results.
1254 writer = ArvPutUploadJob(paths = args.paths,
1255 resume = args.resume,
1256 use_cache = args.use_cache,
1257 filename = args.filename,
1258 reporter = reporter,
1259 api_client = api_client,
1260 num_retries = args.retries,
1261 replication_desired = args.replication,
1262 put_threads = args.threads,
1263 name = collection_name,
1264 owner_uuid = project_uuid,
1265 ensure_unique_name = True,
1266 update_collection = args.update_collection,
1267 storage_classes=storage_classes,
1269 dry_run=args.dry_run,
1270 follow_links=args.follow_links,
1271 exclude_paths=exclude_paths,
1272 exclude_names=exclude_names,
1274 except ResumeCacheConflict:
1275 logger.error("\n".join([
1276 "arv-put: Another process is already uploading this data.",
1277 " Use --no-cache if this is really what you want."]))
1279 except ResumeCacheInvalidError:
1280 logger.error("\n".join([
1281 "arv-put: Resume cache contains invalid signature: it may have expired",
1282 " or been created with another Arvados user's credentials.",
1283 " Switch user or use one of the following options to restart upload:",
1284 " --no-resume to start a new resume cache.",
1285 " --no-cache to disable resume cache."]))
1287 except (CollectionUpdateError, PathDoesNotExistError) as error:
1288 logger.error("\n".join([
1289 "arv-put: %s" % str(error)]))
# --dry-run outcomes: per the --dry-run help text, exit code 2 means files
# are pending for upload.
1291 except ArvPutUploadIsPending:
1292 # Dry run check successful, return proper exit code.
1294 except ArvPutUploadNotPending:
1295 # No files pending for upload
1298 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1299 logger.warning("\n".join([
1300 "arv-put: Resuming previous upload from last checkpoint.",
1301 " Use the --no-resume option to start over."]))
1303 if not args.dry_run:
1304 writer.report_progress()
# The collection is only saved when neither --stream nor --raw was given.
1307 writer.start(save_collection=not(args.stream or args.raw))
1308 except arvados.errors.ApiError as error:
1309 logger.error("\n".join([
1310 "arv-put: %s" % str(error)]))
1313 if args.progress: # Print newline to split stderr from stdout for humans.
# Choose the output. The selecting conditions are missing from this excerpt.
# Presumably the choices are:
#   - manifest text for --stream (optionally normalized);
#   - comma-separated block locators for --raw;
#   - otherwise the saved collection's PDH or locator.
1318 output = writer.manifest_text(normalize=True)
1320 output = writer.manifest_text()
1322 output = ','.join(writer.data_locators())
1323 elif writer.manifest_locator() is not None:
# Build an optional expiry notice. The collection's trash_at is converted
# back to a naive local time by subtracting the seconds-west offset, the
# inverse of the --trash-at normalisation above. The same
# altzone/timezone caveat applies.
1325 expiration_notice = ""
1326 if writer.collection_trash_at() is not None:
1327 # Get the local timezone-naive version, and log it with timezone information.
1329 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1331 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1332 expiration_notice = ". It will expire on {} {}.".format(
1333 local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1334 if args.update_collection:
1335 logger.info(u"Collection updated: '{}'{}".format(
1336 writer.collection_name(), expiration_notice))
1338 logger.info(u"Collection saved as '{}'{}".format(
1339 writer.collection_name(), expiration_notice))
# --portable-data-hash prints the PDH instead of the collection locator.
1340 if args.portable_data_hash:
1341 output = writer.portable_data_hash()
1343 output = writer.manifest_locator()
1344 except apiclient_errors.Error as error:
1346 "arv-put: Error creating Collection on project: {}.".format(
1352 # Print the locator (uuid) of the new collection.
# When there is no output (the guarding condition is missing here), keep any
# nonzero status already set; otherwise fail with 1.
1354 status = status or 1
1355 elif not args.silent:
1356 stdout.write(output)
1357 if not output.endswith('\n'):
1360 if install_sig_handlers:
1361 arv_cmd.restore_signal_handlers()
1370 if __name__ == '__main__':