1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
11 import arvados.collection
33 from apiclient import errors as apiclient_errors
34 from arvados._version import __version__
35 from arvados.util import keep_locator_pattern
37 import arvados.commands._util as arv_cmd
41 upload_opts = argparse.ArgumentParser(add_help=False)
43 upload_opts.add_argument('--version', action='version',
44 version="%s %s" % (sys.argv[0], __version__),
45 help='Print version and exit.')
46 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
48 Local file or directory. If path is a directory reference with a trailing
49 slash, then just upload the directory's contents; otherwise upload the
50 directory itself. Default: read from standard input.
53 _group = upload_opts.add_mutually_exclusive_group()
55 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
56 default=-1, help=argparse.SUPPRESS)
58 _group.add_argument('--normalize', action='store_true',
60 Normalize the manifest by re-ordering files and streams after writing
64 _group.add_argument('--dry-run', action='store_true', default=False,
66 Don't actually upload files, but only check if any file should be
67 uploaded. Exit with code=2 when files are pending for upload.
70 _group = upload_opts.add_mutually_exclusive_group()
72 _group.add_argument('--as-stream', action='store_true', dest='stream',
77 _group.add_argument('--stream', action='store_true',
79 Store the file content and display the resulting manifest on
80 stdout. Do not save a Collection object in Arvados.
83 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
85 Synonym for --manifest.
88 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
90 Synonym for --manifest.
93 _group.add_argument('--manifest', action='store_true',
95 Store the file data and resulting manifest in Keep, save a Collection
96 object in Arvados, and display the manifest locator (Collection uuid)
97 on stdout. This is the default behavior.
100 _group.add_argument('--as-raw', action='store_true', dest='raw',
105 _group.add_argument('--raw', action='store_true',
107 Store the file content and display the data block locators on stdout,
108 separated by commas, with a trailing newline. Do not store a
112 upload_opts.add_argument('--update-collection', type=str, default=None,
113 dest='update_collection', metavar="UUID", help="""
114 Update an existing collection identified by the given Arvados collection
115 UUID. All new local files will be uploaded.
118 upload_opts.add_argument('--use-filename', type=str, default=None,
119 dest='filename', help="""
120 Synonym for --filename.
123 upload_opts.add_argument('--filename', type=str, default=None,
125 Use the given filename in the manifest, instead of the name of the
126 local file. This is useful when "-" or "/dev/stdin" is given as an
127 input file. It can be used only if there is exactly one path given and
128 it is not a directory. Implies --manifest.
131 upload_opts.add_argument('--portable-data-hash', action='store_true',
133 Print the portable data hash instead of the Arvados UUID for the collection
134 created by the upload.
137 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
139 Set the replication level for the new collection: how many different
140 physical storage devices (e.g., disks) should have a copy of each data
141 block. Default is to use the server-provided default (if any) or 2.
144 upload_opts.add_argument('--storage-classes', help="""
145 Specify comma separated list of storage classes to be used when saving data to Keep.
148 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
150 Set the number of upload threads to be used. Take into account that
151 using lots of threads will increase the RAM requirements. Default is
153 On high latency installations, using a greater number will improve
157 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
158 action='append', help="""
159 Exclude files and directories whose names match the given glob pattern. When
160 using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
161 directory, relative to the provided input dirs will be excluded.
162 When using a filename pattern like '*.txt', any text file will be excluded
163 no matter where it is placed.
164 For the special case of needing to exclude only files or dirs directly below
165 the given input directory, you can use a pattern like './exclude_this.gif'.
166 You can specify multiple patterns by using this argument more than once.
169 _group = upload_opts.add_mutually_exclusive_group()
170 _group.add_argument('--follow-links', action='store_true', default=True,
171 dest='follow_links', help="""
172 Follow file and directory symlinks (default).
174 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
176 Do not follow file and directory symlinks.
180 run_opts = argparse.ArgumentParser(add_help=False)
182 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
183 Store the collection in the specified project, instead of your Home
187 run_opts.add_argument('--name', help="""
188 Save the collection with the specified name.
191 _group = run_opts.add_mutually_exclusive_group()
192 _group.add_argument('--progress', action='store_true',
194 Display human-readable progress on stderr (bytes and, if possible,
195 percentage of total data size). This is the default behavior when
199 _group.add_argument('--no-progress', action='store_true',
201 Do not display human-readable progress on stderr, even if stderr is a
205 _group.add_argument('--batch-progress', action='store_true',
207 Display machine-readable progress on stderr (bytes and, if known,
211 run_opts.add_argument('--silent', action='store_true',
213 Do not print any debug messages to console. (Any error messages will
217 _group = run_opts.add_mutually_exclusive_group()
218 _group.add_argument('--resume', action='store_true', default=True,
220 Continue interrupted uploads from cached state (default).
222 _group.add_argument('--no-resume', action='store_false', dest='resume',
224 Do not continue interrupted uploads from cached state.
227 _group = run_opts.add_mutually_exclusive_group()
228 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
230 Save upload state in a cache file for resuming (default).
232 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
234 Do not save upload state in a cache file for resuming.
237 _group = upload_opts.add_mutually_exclusive_group()
238 _group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
240 Set the trash date of the resulting collection to an absolute date in the future.
241 The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
242 Timezone information can be added. If not, the provided date/time is assumed as being in the local system's timezone.
244 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
246 Set the trash date of the resulting collection to an amount of days from the
247 date/time that the upload process finishes.
250 arg_parser = argparse.ArgumentParser(
251 description='Copy data from the local filesystem to Keep.',
252 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
254 def parse_arguments(arguments):
255 args = arg_parser.parse_args(arguments)
257 if len(args.paths) == 0:
260 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
262 if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
264 --filename argument cannot be used when storing a directory or
268 # Turn on --progress by default if stderr is a tty.
269 if (not (args.batch_progress or args.no_progress or args.silent)
270 and os.isatty(sys.stderr.fileno())):
273 # Turn off --resume (default) if --no-cache is used.
274 if not args.use_cache:
277 if args.paths == ['-']:
278 if args.update_collection:
280 --update-collection cannot be used when reading from stdin.
283 args.use_cache = False
284 if not args.filename:
285 args.filename = 'stdin'
287 # Remove possible duplicated patterns
288 if len(args.exclude) > 0:
289 args.exclude = list(set(args.exclude))
294 class PathDoesNotExistError(Exception):
298 class CollectionUpdateError(Exception):
302 class ResumeCacheConflict(Exception):
306 class ResumeCacheInvalidError(Exception):
309 class ArvPutArgumentConflict(Exception):
313 class ArvPutUploadIsPending(Exception):
317 class ArvPutUploadNotPending(Exception):
class FileUploadList(list):
    """List of pending uploads that refuses new entries in dry-run mode.

    With ``dry_run`` set, the first attempt to append an item raises
    ArvPutUploadIsPending instead of storing it, so a dry run can stop as
    soon as it learns that something would have to be uploaded.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        """Queue ``other`` for upload.

        Raises:
            ArvPutUploadIsPending: if the list was created with
                ``dry_run=True``; nothing is stored in that case.
        """
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
332 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
class ArvPutLogFormatter(logging.Formatter):
    """Log formatter that reports the X-Request-Id once.

    The first record whose level is DEBUG or ERROR is rendered with the
    request id appended to the regular format; every other record (and
    every record after that first one) uses the regular format.
    """
    # Shared formatter for regular records.
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    # Per-instance formatter carrying the request id; built in __init__.
    err_fmtr = None
    # Flipped to True on the instance once the request id has been emitted.
    request_id_informed = False

    def __init__(self, request_id):
        # Initialise the base Formatter so inherited attributes exist even
        # though format() below delegates to the two helper formatters.
        super(ArvPutLogFormatter, self).__init__()
        self.err_fmtr = logging.Formatter(
            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        """Return the formatted text for ``record``."""
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)
350 class ResumeCache(object):
351 CACHE_DIR = '.cache/arvados/arv-put'
353 def __init__(self, file_spec):
354 self.cache_file = open(file_spec, 'a+')
355 self._lock_file(self.cache_file)
356 self.filename = self.cache_file.name
359 def make_path(cls, args):
361 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
362 realpaths = sorted(os.path.realpath(path) for path in args.paths)
363 md5.update(b'\0'.join([p.encode() for p in realpaths]))
364 if any(os.path.isdir(path) for path in realpaths):
367 md5.update(args.filename.encode())
369 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
372 def _lock_file(self, fileobj):
374 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
376 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
379 self.cache_file.seek(0)
380 return json.load(self.cache_file)
382 def check_cache(self, api_client=None, num_retries=0):
387 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
388 locator = state["_finished_streams"][0][1][0]
389 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
390 locator = state["_current_stream_locators"][0]
391 if locator is not None:
392 kc = arvados.keep.KeepClient(api_client=api_client)
393 kc.head(locator, num_retries=num_retries)
394 except Exception as e:
399 def save(self, data):
401 new_cache_fd, new_cache_name = tempfile.mkstemp(
402 dir=os.path.dirname(self.filename))
403 self._lock_file(new_cache_fd)
404 new_cache = os.fdopen(new_cache_fd, 'r+')
405 json.dump(data, new_cache)
406 os.rename(new_cache_name, self.filename)
407 except (IOError, OSError, ResumeCacheConflict):
409 os.unlink(new_cache_name)
410 except NameError: # mkstemp failed.
413 self.cache_file.close()
414 self.cache_file = new_cache
417 self.cache_file.close()
421 os.unlink(self.filename)
422 except OSError as error:
423 if error.errno != errno.ENOENT: # That's what we wanted anyway.
429 self.__init__(self.filename)
432 class ArvPutUploadJob(object):
433 CACHE_DIR = '.cache/arvados/arv-put'
435 'manifest' : None, # Last saved manifest checkpoint
436 'files' : {} # Previous run file list: {path : {size, mtime}}
439 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
440 name=None, owner_uuid=None, api_client=None,
441 ensure_unique_name=False, num_retries=None,
442 put_threads=None, replication_desired=None, filename=None,
443 update_time=60.0, update_collection=None, storage_classes=None,
444 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
445 follow_links=True, exclude_paths=[], exclude_names=None,
449 self.use_cache = use_cache
451 self.reporter = reporter
452 # This will set to 0 before start counting, if no special files are going
454 self.bytes_expected = None
455 self.bytes_written = 0
456 self.bytes_skipped = 0
458 self.owner_uuid = owner_uuid
459 self.ensure_unique_name = ensure_unique_name
460 self.num_retries = num_retries
461 self.replication_desired = replication_desired
462 self.put_threads = put_threads
463 self.filename = filename
464 self.storage_classes = storage_classes
465 self._api_client = api_client
466 self._state_lock = threading.Lock()
467 self._state = None # Previous run state (file list & manifest)
468 self._current_files = [] # Current run file list
469 self._cache_file = None
470 self._collection_lock = threading.Lock()
471 self._remote_collection = None # Collection being updated (if asked)
472 self._local_collection = None # Collection from previous run manifest
473 self._file_paths = set() # Files to be updated in remote collection
474 self._stop_checkpointer = threading.Event()
475 self._checkpointer = threading.Thread(target=self._update_task)
476 self._checkpointer.daemon = True
477 self._update_task_time = update_time # How many seconds wait between update runs
478 self._files_to_upload = FileUploadList(dry_run=dry_run)
479 self._upload_started = False
481 self.dry_run = dry_run
482 self._checkpoint_before_quit = True
483 self.follow_links = follow_links
484 self.exclude_paths = exclude_paths
485 self.exclude_names = exclude_names
486 self._trash_at = trash_at
488 if self._trash_at is not None:
489 if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
490 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
491 if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
492 raise TypeError('provided trash_at datetime should be timezone-naive')
494 if not self.use_cache and self.resume:
495 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
497 # Check for obvious dry-run responses
498 if self.dry_run and (not self.use_cache or not self.resume):
499 raise ArvPutUploadIsPending()
501 # Load cached data if any and if needed
502 self._setup_state(update_collection)
504 # Build the upload file list, excluding requested files and counting the
505 # bytes expected to be uploaded.
506 self._build_upload_list()
508 def _build_upload_list(self):
510 Scan the requested paths to count file sizes, excluding requested files
511 and dirs and building the upload file list.
513 # If there aren't special files to be read, reset total bytes count to zero
515 if not any([p for p in self.paths
516 if not (os.path.isfile(p) or os.path.isdir(p))]):
517 self.bytes_expected = 0
519 for path in self.paths:
520 # Test for stdin first, in case some file named '-' exist
523 raise ArvPutUploadIsPending()
524 self._write_stdin(self.filename or 'stdin')
525 elif not os.path.exists(path):
526 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
527 elif (not self.follow_links) and os.path.islink(path):
529 elif os.path.isdir(path):
530 # Use absolute paths on cache index so CWD doesn't interfere
531 # with the caching logic.
533 path = os.path.abspath(path)
534 if orig_path[-1:] == os.sep:
535 # When passing a directory reference with a trailing slash,
536 # its contents should be uploaded directly to the
540 # When passing a directory reference with no trailing slash,
541 # upload the directory to the collection's root.
542 prefixdir = os.path.dirname(path)
544 for root, dirs, files in os.walk(path,
545 followlinks=self.follow_links):
546 root_relpath = os.path.relpath(root, path)
547 if root_relpath == '.':
549 # Exclude files/dirs by full path matching pattern
550 if self.exclude_paths:
551 dirs[:] = [d for d in dirs
552 if not any(pathname_match(
553 os.path.join(root_relpath, d), pat)
554 for pat in self.exclude_paths)]
555 files = [f for f in files
556 if not any(pathname_match(
557 os.path.join(root_relpath, f), pat)
558 for pat in self.exclude_paths)]
559 # Exclude files/dirs by name matching pattern
560 if self.exclude_names is not None:
561 dirs[:] = [d for d in dirs
562 if not self.exclude_names.match(d)]
563 files = [f for f in files
564 if not self.exclude_names.match(f)]
565 # Make os.walk()'s dir traversing order deterministic
569 filepath = os.path.join(root, f)
570 # Add its size to the total bytes count (if applicable)
571 if self.follow_links or (not os.path.islink(filepath)):
572 if self.bytes_expected is not None:
573 self.bytes_expected += os.path.getsize(filepath)
574 self._check_file(filepath,
575 os.path.join(root[len(prefixdir):], f))
577 filepath = os.path.abspath(path)
578 # Add its size to the total bytes count (if applicable)
579 if self.follow_links or (not os.path.islink(filepath)):
580 if self.bytes_expected is not None:
581 self.bytes_expected += os.path.getsize(filepath)
582 self._check_file(filepath,
583 self.filename or os.path.basename(path))
584 # If dry-mode is on, and got up to this point, then we should notify that
585 # there aren't any file to upload.
587 raise ArvPutUploadNotPending()
588 # Remove local_collection's files that don't exist locally anymore, so the
589 # bytes_written count is correct.
590 for f in self.collection_file_paths(self._local_collection,
592 if f != 'stdin' and f != self.filename and not f in self._file_paths:
593 self._local_collection.remove(f)
595 def start(self, save_collection):
597 Start supporting thread & file uploading
599 self._checkpointer.start()
601 # Update bytes_written from current local collection and
602 # report initial progress.
605 self._upload_started = True # Used by the update thread to start checkpointing
607 except (SystemExit, Exception) as e:
608 self._checkpoint_before_quit = False
609 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
610 # Note: We're expecting SystemExit instead of
611 # KeyboardInterrupt because we have a custom signal
612 # handler in place that raises SystemExit with the catched
614 if isinstance(e, PathDoesNotExistError):
615 # We aren't interested in the traceback for this case
617 elif not isinstance(e, SystemExit) or e.code != -2:
618 self.logger.warning("Abnormal termination:\n{}".format(
619 traceback.format_exc()))
623 # Stop the thread before doing anything else
624 self._stop_checkpointer.set()
625 self._checkpointer.join()
626 if self._checkpoint_before_quit:
627 # Commit all pending blocks & one last _update()
628 self._local_collection.manifest_text()
629 self._update(final=True)
631 self.save_collection()
633 self._cache_file.close()
def _collection_trash_at(self):
    """
    Return the trash date to use when the collection is saved.

    A relative request (``self._trash_at`` is a timedelta) is resolved
    against the current UTC time at the moment of the call; any other
    value (an absolute datetime, or None) is returned unchanged.
    """
    requested = self._trash_at
    if type(requested) != datetime.timedelta:
        return requested
    # Relative value: turn it into an absolute (naive, UTC) datetime.
    return datetime.datetime.utcnow() + requested
646 def save_collection(self):
648 # Check if files should be updated on the remote collection.
649 for fp in self._file_paths:
650 remote_file = self._remote_collection.find(fp)
652 # File don't exist on remote collection, copy it.
653 self._remote_collection.copy(fp, fp, self._local_collection)
654 elif remote_file != self._local_collection.find(fp):
655 # A different file exist on remote collection, overwrite it.
656 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
658 # The file already exist on remote collection, skip it.
660 self._remote_collection.save(storage_classes=self.storage_classes,
661 num_retries=self.num_retries,
662 trash_at=self._collection_trash_at())
664 if self.storage_classes is None:
665 self.storage_classes = ['default']
666 self._local_collection.save_new(
667 name=self.name, owner_uuid=self.owner_uuid,
668 storage_classes=self.storage_classes,
669 ensure_unique_name=self.ensure_unique_name,
670 num_retries=self.num_retries,
671 trash_at=self._collection_trash_at())
673 def destroy_cache(self):
676 os.unlink(self._cache_filename)
677 except OSError as error:
678 # That's what we wanted anyway.
679 if error.errno != errno.ENOENT:
681 self._cache_file.close()
683 def _collection_size(self, collection):
685 Recursively get the total size of the collection
688 for item in listvalues(collection):
689 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
690 size += self._collection_size(item)
695 def _update_task(self):
697 Periodically called support task. File uploading is
698 asynchronous so we poll status from the collection.
700 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
703 def _update(self, final=False):
705 Update cached manifest text and report progress.
707 if self._upload_started:
708 with self._collection_lock:
709 self.bytes_written = self._collection_size(self._local_collection)
712 manifest = self._local_collection.manifest_text()
714 # Get the manifest text without comitting pending blocks
715 manifest = self._local_collection.manifest_text(strip=False,
719 with self._state_lock:
720 self._state['manifest'] = manifest
724 except Exception as e:
725 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
726 # Keep remote collection's trash_at attribute synced when using relative expire dates
727 if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
729 self._api_client.collections().update(
730 uuid=self._remote_collection.manifest_locator(),
731 body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
732 ).execute(num_retries=self.num_retries)
733 except Exception as e:
734 self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
736 self.bytes_written = self.bytes_skipped
737 # Call the reporter, if any
738 self.report_progress()
def report_progress(self):
    """Pass the current byte counters to the progress reporter, if one is set."""
    if self.reporter is None:
        return
    self.reporter(self.bytes_written, self.bytes_expected)
744 def _write_stdin(self, filename):
745 output = self._local_collection.open(filename, 'wb')
746 self._write(sys.stdin.buffer, output)
749 def _check_file(self, source, filename):
751 Check if this file needs to be uploaded
753 # Ignore symlinks when requested
754 if (not self.follow_links) and os.path.islink(source):
757 should_upload = False
758 new_file_in_cache = False
759 # Record file path for updating the remote collection before exiting
760 self._file_paths.add(filename)
762 with self._state_lock:
763 # If no previous cached data on this file, store it for an eventual
765 if source not in self._state['files']:
766 self._state['files'][source] = {
767 'mtime': os.path.getmtime(source),
768 'size' : os.path.getsize(source)
770 new_file_in_cache = True
771 cached_file_data = self._state['files'][source]
773 # Check if file was already uploaded (at least partially)
774 file_in_local_collection = self._local_collection.find(filename)
776 # If not resuming, upload the full file.
779 # New file detected from last run, upload it.
780 elif new_file_in_cache:
782 # Local file didn't change from last run.
783 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
784 if not file_in_local_collection:
785 # File not uploaded yet, upload it completely
787 elif file_in_local_collection.permission_expired():
788 # Permission token expired, re-upload file. This will change whenever
789 # we have a API for refreshing tokens.
790 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
792 self._local_collection.remove(filename)
793 elif cached_file_data['size'] == file_in_local_collection.size():
794 # File already there, skip it.
795 self.bytes_skipped += cached_file_data['size']
796 elif cached_file_data['size'] > file_in_local_collection.size():
797 # File partially uploaded, resume!
798 resume_offset = file_in_local_collection.size()
799 self.bytes_skipped += resume_offset
802 # Inconsistent cache, re-upload the file
804 self._local_collection.remove(filename)
805 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
806 # Local file differs from cached data, re-upload it.
808 if file_in_local_collection:
809 self._local_collection.remove(filename)
814 self._files_to_upload.append((source, resume_offset, filename))
815 except ArvPutUploadIsPending:
816 # This could happen when running on dry-mode, close cache file to
817 # avoid locking issues.
818 self._cache_file.close()
def _upload_files(self):
    """Copy every queued file into the local collection.

    Each entry of ``self._files_to_upload`` is a
    ``(source, resume_offset, filename)`` tuple. The cached mtime/size
    for ``source`` is refreshed before copying. A positive
    ``resume_offset`` appends to the existing collection file starting at
    that offset of the source; otherwise the collection file is written
    from scratch.
    """
    for source, resume_offset, filename in self._files_to_upload:
        with open(source, 'rb') as source_fd:
            with self._state_lock:
                file_state = self._state['files'][source]
                file_state['mtime'] = os.path.getmtime(source)
                file_state['size'] = os.path.getsize(source)
            if resume_offset > 0:
                # Start upload where we left off
                output = self._local_collection.open(filename, 'ab')
                source_fd.seek(resume_offset)
            else:
                # Start from scratch
                output = self._local_collection.open(filename, 'wb')
            self._write(source_fd, output)
            output.close(flush=False)
837 def _write(self, source_fd, output):
839 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
def _my_collection(self):
    """Return the remote collection when ``self.update`` is truthy, else the local one."""
    if self.update:
        return self._remote_collection
    return self._local_collection
847 def _get_cache_filepath(self):
848 # Set up cache file name from input paths.
850 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
851 realpaths = sorted(os.path.realpath(path) for path in self.paths)
852 md5.update(b'\0'.join([p.encode() for p in realpaths]))
854 md5.update(self.filename.encode())
855 cache_filename = md5.hexdigest()
856 cache_filepath = os.path.join(
857 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
859 return cache_filepath
861 def _setup_state(self, update_collection):
863 Create a new cache file or load a previously existing one.
865 # Load an already existing collection for update
866 if update_collection and re.match(arvados.util.collection_uuid_pattern,
869 self._remote_collection = arvados.collection.Collection(
871 api_client=self._api_client,
872 num_retries=self.num_retries)
873 except arvados.errors.ApiError as error:
874 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
877 elif update_collection:
878 # Collection locator provided, but unknown format
879 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
882 cache_filepath = self._get_cache_filepath()
883 if self.resume and os.path.exists(cache_filepath):
884 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
885 self._cache_file = open(cache_filepath, 'a+')
887 # --no-resume means start with a empty cache file.
888 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
889 self._cache_file = open(cache_filepath, 'w+')
890 self._cache_filename = self._cache_file.name
891 self._lock_file(self._cache_file)
892 self._cache_file.seek(0)
894 with self._state_lock:
897 self._state = json.load(self._cache_file)
898 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
899 # Cache at least partially incomplete, set up new cache
900 self._state = copy.deepcopy(self.EMPTY_STATE)
902 # Cache file empty, set up new cache
903 self._state = copy.deepcopy(self.EMPTY_STATE)
905 self.logger.info("No cache usage requested for this run.")
906 # No cache file, set empty state
907 self._state = copy.deepcopy(self.EMPTY_STATE)
908 if not self._cached_manifest_valid():
909 raise ResumeCacheInvalidError()
910 # Load the previous manifest so we can check if files were modified remotely.
911 self._local_collection = arvados.collection.Collection(
912 self._state['manifest'],
913 replication_desired=self.replication_desired,
914 put_threads=self.put_threads,
915 api_client=self._api_client,
916 num_retries=self.num_retries)
918 def _cached_manifest_valid(self):
920 Validate the oldest non-expired block signature to check if cached manifest
921 is usable: checking if the cached manifest was not created with a different
924 if self._state.get('manifest', None) is None:
925 # No cached manifest yet, all good.
927 now = datetime.datetime.utcnow()
931 for m in keep_locator_pattern.finditer(self._state['manifest']):
934 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
936 # Locator without signature
939 if exp > now and (oldest_exp is None or exp < oldest_exp):
943 # No block signatures found => no invalid block signatures.
945 if oldest_loc is None:
946 # Locator signatures found, but all have expired.
947 # Reset the cache and move on.
948 self.logger.info('Cache expired, starting from scratch.')
949 self._state['manifest'] = ''
951 kc = arvados.KeepClient(api_client=self._api_client,
952 num_retries=self.num_retries)
955 except arvados.errors.KeepRequestError:
956 # Something is wrong, cached manifest is not valid.
960 def collection_file_paths(self, col, path_prefix='.'):
961 """Return a list of file paths by recursively go through the entire collection `col`"""
963 for name, item in listitems(col):
964 if isinstance(item, arvados.arvfile.ArvadosFile):
965 file_paths.append(os.path.join(path_prefix, name))
966 elif isinstance(item, arvados.collection.Subcollection):
967 new_prefix = os.path.join(path_prefix, name)
968 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
971 def _lock_file(self, fileobj):
973 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
975 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
def _save_state(self):
    """
    Atomically save current state into cache.

    The state is serialised to a locked temporary file created next to
    the cache file and then renamed over it. On success the previous
    cache file object is closed and replaced by the new one. On failure
    the error is logged and the temporary file is closed and removed;
    the previous cache file stays in place.
    """
    with self._state_lock:
        # We're not using copy.deepcopy() here because it's a lot slower
        # than json.dumps(), and we're already needing JSON format to be
        # saved on disk.
        state = json.dumps(self._state)
    new_cache = None
    try:
        new_cache = tempfile.NamedTemporaryFile(
            mode='w+',
            dir=os.path.dirname(self._cache_filename), delete=False)
        self._lock_file(new_cache)
        new_cache.write(state)
        new_cache.flush()
        os.fsync(new_cache)
        os.rename(new_cache.name, self._cache_filename)
    except (IOError, OSError, ResumeCacheConflict) as error:
        self.logger.error("There was a problem while saving the cache file: {}".format(error))
        # new_cache is still None when creating the temporary file failed.
        if new_cache is not None:
            new_cache.close()
            try:
                os.unlink(new_cache.name)
            except OSError:
                # Best effort: the temporary file may already be gone.
                pass
    else:
        self._cache_file.close()
        self._cache_file = new_cache
def collection_name(self):
    """Return the ``'name'`` field of the collection's API response.

    Returns None when the collection in use has no (truthy) API response.
    """
    # Fetch the response once instead of resolving the collection and its
    # API response twice.
    response = self._my_collection().api_response()
    return response['name'] if response else None
def collection_trash_at(self):
    """Return ``get_trash_at()`` of the collection currently in use."""
    target = self._my_collection()
    return target.get_trash_at()
def manifest_locator(self):
    """Return ``manifest_locator()`` of the collection currently in use."""
    target = self._my_collection()
    return target.manifest_locator()
def portable_data_hash(self):
    """Return the portable data hash reported by the collection in use.

    The hash is also recomputed locally as ``<md5 hex>+<length>`` of the
    encoded stripped manifest. If the two differ, a warning is logged and
    the collection-reported value is still the one returned.
    """
    collection = self._my_collection()
    pdh = collection.portable_data_hash()
    m = collection.stripped_manifest().encode()
    local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
    if pdh != local_pdh:
        self.logger.warning("\n".join([
            "arv-put: API server provided PDH differs from local manifest.",
            " This should not happen; showing API server version."]))
    return pdh
def manifest_text(self, stream_name=".", strip=False, normalize=False):
    """Return ``manifest_text(stream_name, strip, normalize)`` of the collection in use."""
    target = self._my_collection()
    return target.manifest_text(stream_name, strip, normalize)
def _datablocks_on_item(self, item):
    """
    Return a list of datablock locators, recursively navigating
    through subcollections.

    - ArvadosFile: the locator of each of its segments, or the single
      zero-size locator when the file is empty.
    - Collection: the flattened locators of every contained item.
    - Anything else: None.
    """
    if isinstance(item, arvados.arvfile.ArvadosFile):
        if item.size() == 0:
            # Empty file locator
            return ["d41d8cd98f00b204e9800998ecf8427e+0"]
        return [segment.locator for segment in item.segments()]
    elif isinstance(item, arvados.collection.Collection):
        nested = [self._datablocks_on_item(x) for x in listvalues(item)]
        # Fast list flattener method taken from:
        # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
        return [loc for sublist in nested for loc in sublist]
    # NOTE(review): a None result for a nested item makes the flattening
    # above raise TypeError; confirm that every value held by a Collection
    # is itself an ArvadosFile or a Collection instance.
    return None
def data_locators(self):
    """
    Return the datablock locators of the whole collection.

    Runs under the collection lock. manifest_text() is called first only
    for its side effect, so that the locators are collected after all
    datablocks have been flushed.
    """
    with self._collection_lock:
        # Make sure all datablocks are flushed before getting the locators
        self._my_collection().manifest_text()
        datablocks = self._datablocks_on_item(self._my_collection())
    return datablocks
# Template for machine-readable progress lines. The first two fields are
# filled in once, here; the doubled braces survive this first format()
# call as plain '{}' placeholders for the written and total byte counts.
# NOTE(review): the second argument is restored as the process id; confirm
# against the upstream file.
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
# Simulate glob.glob() matching behavior without the need to scan the filesystem
# Note: fnmatch() doesn't work correctly when used with pathnames. For example the
# pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
# so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    """
    Return True when every component of pathname matches the pattern
    component at the same position.

    Both arguments are split on os.sep. Empty and '.' components are
    dropped from the pattern only, and a different number of components
    on each side is never a match.
    """
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    return all(fnmatch.fnmatch(part, glob)
               for part, glob in zip(name, pat))
def machine_progress(bytes_written, bytes_expected):
    """
    Render one progress line with _machine_format. A bytes_expected of
    None is reported as -1.
    """
    if bytes_expected is None:
        total = -1
    else:
        total = bytes_expected
    return _machine_format.format(bytes_written, total)
def human_progress(bytes_written, bytes_expected):
    """
    Render a progress string that starts with a carriage return.

    When bytes_expected is truthy the string shows written and expected
    sizes shifted right by 20 bits (suffixed 'M') plus a percentage;
    otherwise (None or 0) only the raw written byte count is shown, which
    also avoids dividing by zero.
    """
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
def progress_writer(progress_func, outfile=sys.stderr):
    """
    Build a reporter callback around a formatting function.

    The returned callable takes (bytes_written, bytes_expected), formats
    them with progress_func and writes the result to outfile.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
def desired_project_uuid(api_client, project_uuid, num_retries):
    """
    Resolve the UUID to be used as the parent project.

    - Falsy project_uuid: look up the current user.
    - Matches the user UUID pattern: look up that user.
    - Matches the group UUID pattern: look up that group.

    Returns the 'uuid' field of the executed query's result.
    Raises ValueError when project_uuid matches neither pattern; errors
    raised by executing the query are not handled here.
    """
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
# Command-line entry point. From what is visible here it: parses the
# arguments, configures logging and the API client, validates the trash,
# name, project, storage-class and exclude options, builds an
# ArvPutUploadJob, runs it, and writes the resulting output to `stdout`.
#
# NOTE(review): this excerpt does not contain every line of the function
# (the embedded line numbers skip), so several branch headers, `try:`
# lines and exit paths are not visible; the comments below describe only
# what the visible lines establish.
1103 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1104 install_sig_handlers=True):
1107 args = parse_arguments(arguments)
1108 logger = logging.getLogger('arvados.arv_put')
# The logger level is set to either WARNING or INFO; the condition that
# selects between the two is not visible here.
1110 logger.setLevel(logging.WARNING)
1112 logger.setLevel(logging.INFO)
# A single request id is generated and handed both to the log formatter
# and (below) to the API client when one has to be created.
1115 request_id = arvados.util.new_request_id()
1117 formatter = ArvPutLogFormatter(request_id)
1118 logging.getLogger('arvados').handlers[0].setFormatter(formatter)
# Only build an API client when `api_client` is still None.
1120 if api_client is None:
1121 api_client = arvados.api('v1', request_id=request_id)
1123 if install_sig_handlers:
1124 arv_cmd.install_signal_handlers()
1126 # Trash arguments validation
1128 if args.trash_at is not None:
1129 # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1130 # make sure the user provides a complete YYYY-MM-DD date.
# This first pattern is anchored at the start only, so anything may follow
# the date part. NOTE(review): `\d{2}?` is a lazy quantifier on an exact
# count, which matches the same text as `\d{2}`.
1131 if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1132 logger.error("--trash-at argument format invalid, use --help to see examples.")
1134 # Check if no time information was provided. In that case, assume end-of-day.
# Same pattern with a trailing `$`: it matches only a bare date.
1135 if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1136 args.trash_at += 'T23:59:59'
1138 trash_at = ciso8601.parse_datetime(args.trash_at)
1140 logger.error("--trash-at argument format invalid, use --help to see examples.")
1143 if trash_at.tzinfo is not None:
1144 # Timezone aware datetime provided.
1145 utcoffset = -trash_at.utcoffset()
1147 # Timezone naive datetime provided. Assume is local.
# Either time.altzone or time.timezone is used as the local offset; the
# condition choosing between them is not visible here.
1149 utcoffset = datetime.timedelta(seconds=time.altzone)
1151 utcoffset = datetime.timedelta(seconds=time.timezone)
1152 # Convert to UTC timezone naive datetime.
1153 trash_at = trash_at.replace(tzinfo=None) + utcoffset
# Compared against utcnow(), i.e. naive UTC on both sides.
1155 if trash_at <= datetime.datetime.utcnow():
1156 logger.error("--trash-at argument must be set in the future")
1158 if args.trash_after is not None:
1159 if args.trash_after < 1:
1160 logger.error("--trash-after argument must be >= 1")
# args.trash_after is multiplied by 24 * 60 * 60, i.e. it is a number of
# days converted to seconds. NOTE(review): `trash_at` is a timedelta here
# but a datetime in the --trash-at branch above; how it is consumed is not
# visible in this excerpt.
1162 trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1164 # Determine the name to use
1166 if args.stream or args.raw:
1167 logger.error("Cannot use --name with --stream or --raw")
1169 elif args.update_collection:
1170 logger.error("Cannot use --name with --update-collection")
1172 collection_name = args.name
# Fallback name built from the current UTC time, the local user name and
# the host name.
1174 collection_name = "Saved at {} by {}@{}".format(
1175 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1176 pwd.getpwuid(os.getuid()).pw_name,
1177 socket.gethostname())
1179 if args.project_uuid and (args.stream or args.raw):
1180 logger.error("Cannot use --project-uuid with --stream or --raw")
1183 # Determine the parent project
1185 project_uuid = desired_project_uuid(api_client, args.project_uuid,
1187 except (apiclient_errors.Error, ValueError) as error:
# Progress reporter: human-readable or machine-readable formatter wrapped
# by progress_writer(); the first branch header is not visible here.
1192 reporter = progress_writer(human_progress)
1193 elif args.batch_progress:
1194 reporter = progress_writer(machine_progress)
1198 # Split storage-classes argument
1199 storage_classes = None
1200 if args.storage_classes:
1201 storage_classes = args.storage_classes.strip().split(',')
1202 if len(storage_classes) > 1:
1203 logger.error("Multiple storage classes are not supported currently.")
1207 # Setup exclude regex from all the --exclude arguments provided
1210 exclude_names = None
1211 if len(args.exclude) > 0:
1212 # We're supporting 2 kinds of exclusion patterns:
1213 # 1) --exclude '*.jpg' (file/dir name patterns, will only match
1214 # the name, wherever the file is on the tree)
1215 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the
1216 # entire path, and should be relative to
1217 # any input dir argument)
1218 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs
1219 # placed directly underneath the input dir)
1220 for p in args.exclude:
1221 # Only relative paths patterns allowed
1222 if p.startswith(os.sep):
1223 logger.error("Cannot use absolute paths with --exclude")
# A pattern with a directory part is treated as a path pattern; one
# without is treated as a name-only pattern.
1225 if os.path.dirname(p):
1226 # We don't support of path patterns with '..'
1227 p_parts = p.split(os.sep)
1230 "Cannot use path patterns that include or '..'")
1232 # Path search pattern
1233 exclude_paths.append(p)
1235 # Name-only search pattern
1236 name_patterns.append(p)
1237 # For name only matching, we can combine all patterns into a single
1238 # regexp, for better performance.
1239 exclude_names = re.compile('|'.join(
1240 [fnmatch.translate(p) for p in name_patterns]
1241 )) if len(name_patterns) > 0 else None
1242 # Show the user the patterns to be used, just in case they weren't
1243 # specified inside quotes and got changed by the shell expansion.
1244 logger.info("Exclude patterns: {}".format(args.exclude))
1246 # If this is used by a human, and there's at least one directory to be
1247 # uploaded, the expected bytes calculation can take a moment.
1248 if args.progress and any([os.path.isdir(f) for f in args.paths]):
1249 logger.info("Calculating upload size, this could take some time...")
# Build the upload job from the validated options. The except clauses that
# follow log a message for each known failure; what each handler does
# afterwards is not visible here.
1251 writer = ArvPutUploadJob(paths = args.paths,
1252 resume = args.resume,
1253 use_cache = args.use_cache,
1254 filename = args.filename,
1255 reporter = reporter,
1256 api_client = api_client,
1257 num_retries = args.retries,
1258 replication_desired = args.replication,
1259 put_threads = args.threads,
1260 name = collection_name,
1261 owner_uuid = project_uuid,
1262 ensure_unique_name = True,
1263 update_collection = args.update_collection,
1264 storage_classes=storage_classes,
1266 dry_run=args.dry_run,
1267 follow_links=args.follow_links,
1268 exclude_paths=exclude_paths,
1269 exclude_names=exclude_names,
1271 except ResumeCacheConflict:
1272 logger.error("\n".join([
1273 "arv-put: Another process is already uploading this data.",
1274 " Use --no-cache if this is really what you want."]))
1276 except ResumeCacheInvalidError:
1277 logger.error("\n".join([
1278 "arv-put: Resume cache contains invalid signature: it may have expired",
1279 " or been created with another Arvados user's credentials.",
1280 " Switch user or use one of the following options to restart upload:",
1281 " --no-resume to start a new resume cache.",
1282 " --no-cache to disable resume cache."]))
1284 except (CollectionUpdateError, PathDoesNotExistError) as error:
1285 logger.error("\n".join([
1286 "arv-put: %s" % str(error)]))
1288 except ArvPutUploadIsPending:
1289 # Dry run check successful, return proper exit code.
1291 except ArvPutUploadNotPending:
1292 # No files pending for upload
# A non-zero bytes_written on a freshly built job, with resume enabled and
# neither dry-run nor update-collection set, triggers the resume warning.
1295 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1296 logger.warning("\n".join([
1297 "arv-put: Resuming previous upload from last checkpoint.",
1298 " Use the --no-resume option to start over."]))
1300 if not args.dry_run:
1301 writer.report_progress()
# The collection is saved unless --stream or --raw was given.
1304 writer.start(save_collection=not(args.stream or args.raw))
1305 except arvados.errors.ApiError as error:
1306 logger.error("\n".join([
1307 "arv-put: %s" % str(error)]))
1310 if args.progress: # Print newline to split stderr from stdout for humans.
# `output` becomes one of: the normalized manifest text, the plain manifest
# text, or the comma-joined data locators. The conditions selecting between
# them are not visible here.
1315 output = writer.manifest_text(normalize=True)
1317 output = writer.manifest_text()
1319 output = ','.join(writer.data_locators())
1322 expiration_notice = ""
1323 if writer.collection_trash_at() is not None:
1324 # Get the local timezone-naive version, and log it with timezone information.
1326 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1328 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1329 expiration_notice = ". It will expire on {} {}.".format(
1330 local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1331 if args.update_collection:
1332 logger.info(u"Collection updated: '{}'{}".format(
1333 writer.collection_name(), expiration_notice))
1335 logger.info(u"Collection saved as '{}'{}".format(
1336 writer.collection_name(), expiration_notice))
# When a collection was saved, `output` is its portable data hash if
# --portable-data-hash was given; the other visible assignment uses the
# manifest locator.
1337 if args.portable_data_hash:
1338 output = writer.portable_data_hash()
1340 output = writer.manifest_locator()
1341 except apiclient_errors.Error as error:
1343 "arv-put: Error creating Collection on project: {}.".format(
1347 # Print the locator (uuid) of the new collection.
1349 status = status or 1
# Unless --silent was given, `output` is written to the `stdout` argument
# and checked for a trailing newline.
1350 elif not args.silent:
1351 stdout.write(output)
1352 if not output.endswith('\n'):
# Undo the signal handler installation done near the top of the function.
1355 if install_sig_handlers:
1356 arv_cmd.restore_signal_handlers()
1365 if __name__ == '__main__':