1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
11 import arvados.collection
33 from apiclient import errors as apiclient_errors
34 from arvados._version import __version__
35 from arvados.util import keep_locator_pattern
37 import arvados.commands._util as arv_cmd
41 upload_opts = argparse.ArgumentParser(add_help=False)
43 upload_opts.add_argument('--version', action='version',
44 version="%s %s" % (sys.argv[0], __version__),
45 help='Print version and exit.')
46 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
48 Local file or directory. If path is a directory reference with a trailing
49 slash, then just upload the directory's contents; otherwise upload the
50 directory itself. Default: read from standard input.
53 _group = upload_opts.add_mutually_exclusive_group()
55 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
56 default=-1, help=argparse.SUPPRESS)
58 _group.add_argument('--normalize', action='store_true',
60 Normalize the manifest by re-ordering files and streams after writing
64 _group.add_argument('--dry-run', action='store_true', default=False,
66 Don't actually upload files, but only check if any file should be
67 uploaded. Exit with code=2 when files are pending for upload.
70 _group = upload_opts.add_mutually_exclusive_group()
72 _group.add_argument('--as-stream', action='store_true', dest='stream',
77 _group.add_argument('--stream', action='store_true',
79 Store the file content and display the resulting manifest on
80 stdout. Do not save a Collection object in Arvados.
83 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
85 Synonym for --manifest.
88 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
90 Synonym for --manifest.
93 _group.add_argument('--manifest', action='store_true',
95 Store the file data and resulting manifest in Keep, save a Collection
96 object in Arvados, and display the manifest locator (Collection uuid)
97 on stdout. This is the default behavior.
100 _group.add_argument('--as-raw', action='store_true', dest='raw',
105 _group.add_argument('--raw', action='store_true',
107 Store the file content and display the data block locators on stdout,
108 separated by commas, with a trailing newline. Do not store a
112 upload_opts.add_argument('--update-collection', type=str, default=None,
113 dest='update_collection', metavar="UUID", help="""
114 Update an existing collection identified by the given Arvados collection
115 UUID. All new local files will be uploaded.
118 upload_opts.add_argument('--use-filename', type=str, default=None,
119 dest='filename', help="""
120 Synonym for --filename.
123 upload_opts.add_argument('--filename', type=str, default=None,
125 Use the given filename in the manifest, instead of the name of the
126 local file. This is useful when "-" or "/dev/stdin" is given as an
127 input file. It can be used only if there is exactly one path given and
128 it is not a directory. Implies --manifest.
131 upload_opts.add_argument('--portable-data-hash', action='store_true',
133 Print the portable data hash instead of the Arvados UUID for the collection
134 created by the upload.
137 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
139 Set the replication level for the new collection: how many different
140 physical storage devices (e.g., disks) should have a copy of each data
141 block. Default is to use the server-provided default (if any) or 2.
144 upload_opts.add_argument('--storage-classes', help="""
145 Specify a comma-separated list of storage classes to be used when saving data to Keep.
148 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
150 Set the number of upload threads to be used. Note that using many threads
151 will increase the RAM requirements. Default is
153 On high latency installations, using a greater number will improve
157 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
158 action='append', help="""
159 Exclude files and directories whose names match the given glob pattern. When
160 using a path-like pattern like 'subdir/*.txt', all text files inside the 'subdir'
161 directory (relative to the provided input dirs) will be excluded.
162 When using a filename pattern like '*.txt', any text file will be excluded
163 no matter where it is placed.
164 For the special case of needing to exclude only files or dirs directly below
165 the given input directory, you can use a pattern like './exclude_this.gif'.
166 You can specify multiple patterns by using this argument more than once.
169 _group = upload_opts.add_mutually_exclusive_group()
170 _group.add_argument('--follow-links', action='store_true', default=True,
171 dest='follow_links', help="""
172 Follow file and directory symlinks (default).
174 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
176 Ignore file and directory symlinks. Even paths given explicitly on the
177 command line will be skipped if they are symlinks.
181 run_opts = argparse.ArgumentParser(add_help=False)
183 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
184 Store the collection in the specified project, instead of your Home
188 run_opts.add_argument('--name', help="""
189 Save the collection with the specified name.
192 _group = run_opts.add_mutually_exclusive_group()
193 _group.add_argument('--progress', action='store_true',
195 Display human-readable progress on stderr (bytes and, if possible,
196 percentage of total data size). This is the default behavior when
200 _group.add_argument('--no-progress', action='store_true',
202 Do not display human-readable progress on stderr, even if stderr is a
206 _group.add_argument('--batch-progress', action='store_true',
208 Display machine-readable progress on stderr (bytes and, if known,
212 run_opts.add_argument('--silent', action='store_true',
214 Do not print any debug messages to console. (Any error messages will
218 run_opts.add_argument('--batch', action='store_true', default=False,
220 Retry with '--no-resume --no-cache' if the cached state contains invalid/expired
224 _group = run_opts.add_mutually_exclusive_group()
225 _group.add_argument('--resume', action='store_true', default=True,
227 Continue interrupted uploads from cached state (default).
229 _group.add_argument('--no-resume', action='store_false', dest='resume',
231 Do not continue interrupted uploads from cached state.
234 _group = run_opts.add_mutually_exclusive_group()
235 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
237 Save upload state in a cache file for resuming (default).
239 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
241 Do not save upload state in a cache file for resuming.
244 _group = upload_opts.add_mutually_exclusive_group()
245 _group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
247 Set the trash date of the resulting collection to an absolute date in the future.
248 The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
249 Timezone information can be added. If not, the provided date/time is assumed to be in the local system's timezone.
251 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
253 Set the trash date of the resulting collection to a number of days after the
254 date/time that the upload process finishes.
257 arg_parser = argparse.ArgumentParser(
258 description='Copy data from the local filesystem to Keep.',
259 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
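# A few typical invocations, for illustration (the collection UUID below is a
# placeholder):
#
#   arv-put file1.txt dir1            # upload file1.txt and the directory dir1 itself
#   arv-put dir1/                     # trailing slash: upload only dir1's contents
#   arv-put --exclude '*.tmp' dir1    # skip files/dirs named '*.tmp' anywhere in the tree
#   echo hello | arv-put --filename greeting.txt -          # read the data from stdin
#   arv-put --update-collection zzzzz-4zz18-xxxxxxxxxxxxxxx dir1   # update an existing collection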
261 def parse_arguments(arguments):
262 args = arg_parser.parse_args(arguments)
264 if len(args.paths) == 0:
267 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
269 if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
271 --filename argument cannot be used when storing a directory or
275 # Turn on --progress by default if stderr is a tty.
276 if (not (args.batch_progress or args.no_progress or args.silent)
277 and os.isatty(sys.stderr.fileno())):
280 # Turn off --resume (default) if --no-cache is used.
281 if not args.use_cache:
284 if args.paths == ['-']:
285 if args.update_collection:
287 --update-collection cannot be used when reading from stdin.
290 args.use_cache = False
291 if not args.filename:
292 args.filename = 'stdin'
294 # Remove possible duplicated patterns
295 if len(args.exclude) > 0:
296 args.exclude = list(set(args.exclude))
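# Illustrative effect of the normalization above (a sketch; options not given
# keep their argparse defaults):
#
#   parse_arguments(['/dev/stdin'])
#       -> args.paths == ['-'], args.filename == 'stdin', args.use_cache == False
#   parse_arguments(['--filename', 'x', 'some_dir'])
#       -> parser error when 'some_dir' is a directory (--filename needs exactly
#          one non-directory path)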
301 class PathDoesNotExistError(Exception):
305 class CollectionUpdateError(Exception):
309 class ResumeCacheConflict(Exception):
313 class ResumeCacheInvalidError(Exception):
316 class ArvPutArgumentConflict(Exception):
320 class ArvPutUploadIsPending(Exception):
324 class ArvPutUploadNotPending(Exception):
328 class FileUploadList(list):
329 def __init__(self, dry_run=False):
331 self.dry_run = dry_run
333 def append(self, other):
335 raise ArvPutUploadIsPending()
336 super(FileUploadList, self).append(other)
339 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
340 class ArvPutLogFormatter(logging.Formatter):
341 std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
343 request_id_informed = False
345 def __init__(self, request_id):
346 self.err_fmtr = logging.Formatter(
347 arvados.log_format+' (X-Request-Id: {})'.format(request_id),
348 arvados.log_date_format)
350 def format(self, record):
351 if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
352 self.request_id_informed = True
353 return self.err_fmtr.format(record)
354 return self.std_fmtr.format(record)
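# Usage sketch (illustrative): attach the formatter to a handler so that the
# first DEBUG/ERROR record emitted also carries the X-Request-Id.
#
#   handler = logging.StreamHandler()
#   handler.setFormatter(ArvPutLogFormatter('req-0123456789abcdefghijklmno'))
#   logging.getLogger('arvados.arv_put').addHandler(handler)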
357 class ResumeCache(object):
358 CACHE_DIR = '.cache/arvados/arv-put'
360 def __init__(self, file_spec):
361 self.cache_file = open(file_spec, 'a+')
362 self._lock_file(self.cache_file)
363 self.filename = self.cache_file.name
366 def make_path(cls, args):
368 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
369 realpaths = sorted(os.path.realpath(path) for path in args.paths)
370 md5.update(b'\0'.join([p.encode() for p in realpaths]))
371 if any(os.path.isdir(path) for path in realpaths):
374 md5.update(args.filename.encode())
376 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
379 def _lock_file(self, fileobj):
381 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
383 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
386 self.cache_file.seek(0)
387 return json.load(self.cache_file)
389 def check_cache(self, api_client=None, num_retries=0):
394 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
395 locator = state["_finished_streams"][0][1][0]
396 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
397 locator = state["_current_stream_locators"][0]
398 if locator is not None:
399 kc = arvados.keep.KeepClient(api_client=api_client)
400 kc.head(locator, num_retries=num_retries)
401 except Exception as e:
406 def save(self, data):
408 new_cache_fd, new_cache_name = tempfile.mkstemp(
409 dir=os.path.dirname(self.filename))
410 self._lock_file(new_cache_fd)
411 new_cache = os.fdopen(new_cache_fd, 'r+')
412 json.dump(data, new_cache)
413 os.rename(new_cache_name, self.filename)
414 except (IOError, OSError, ResumeCacheConflict):
416 os.unlink(new_cache_name)
417 except NameError: # mkstemp failed.
420 self.cache_file.close()
421 self.cache_file = new_cache
424 self.cache_file.close()
428 os.unlink(self.filename)
429 except OSError as error:
430 if error.errno != errno.ENOENT: # That's what we wanted anyway.
436 self.__init__(self.filename)
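# Usage sketch (illustrative; the JSON-reading method is elided above and its
# name is assumed here):
#
#   cache = ResumeCache(ResumeCache.make_path(args))
#   state = cache.load()      # assumed name of the elided reader shown above
#   ...                       # mutate state as the upload progresses
#   cache.save(state)         # atomic: write a temp file, then os.rename() it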
439 class ArvPutUploadJob(object):
440 CACHE_DIR = '.cache/arvados/arv-put'
442 'manifest' : None, # Last saved manifest checkpoint
443 'files' : {} # Previous run file list: {path : {size, mtime}}
446 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
447 name=None, owner_uuid=None, api_client=None, batch_mode=False,
448 ensure_unique_name=False, num_retries=None,
449 put_threads=None, replication_desired=None, filename=None,
450 update_time=60.0, update_collection=None, storage_classes=None,
451 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
452 follow_links=True, exclude_paths=[], exclude_names=None,
456 self.use_cache = use_cache
457 self.batch_mode = batch_mode
459 self.reporter = reporter
460 # This will be set to 0 before starting the count, if no special files are going
462 self.bytes_expected = None
463 self.bytes_written = 0
464 self.bytes_skipped = 0
466 self.owner_uuid = owner_uuid
467 self.ensure_unique_name = ensure_unique_name
468 self.num_retries = num_retries
469 self.replication_desired = replication_desired
470 self.put_threads = put_threads
471 self.filename = filename
472 self.storage_classes = storage_classes
473 self._api_client = api_client
474 self._state_lock = threading.Lock()
475 self._state = None # Previous run state (file list & manifest)
476 self._current_files = [] # Current run file list
477 self._cache_file = None
478 self._collection_lock = threading.Lock()
479 self._remote_collection = None # Collection being updated (if asked)
480 self._local_collection = None # Collection from previous run manifest
481 self._file_paths = set() # Files to be updated in remote collection
482 self._stop_checkpointer = threading.Event()
483 self._checkpointer = threading.Thread(target=self._update_task)
484 self._checkpointer.daemon = True
485 self._update_task_time = update_time # How many seconds to wait between update runs
486 self._files_to_upload = FileUploadList(dry_run=dry_run)
487 self._upload_started = False
489 self.dry_run = dry_run
490 self._checkpoint_before_quit = True
491 self.follow_links = follow_links
492 self.exclude_paths = exclude_paths
493 self.exclude_names = exclude_names
494 self._trash_at = trash_at
496 if self._trash_at is not None:
497 if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
498 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
499 if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
500 raise TypeError('provided trash_at datetime should be timezone-naive')
502 if not self.use_cache and self.resume:
503 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
505 # Check for obvious dry-run responses
506 if self.dry_run and (not self.use_cache or not self.resume):
507 raise ArvPutUploadIsPending()
509 # Load cached data if any and if needed
510 self._setup_state(update_collection)
512 # Build the upload file list, excluding requested files and counting the
513 # bytes expected to be uploaded.
514 self._build_upload_list()
516 def _build_upload_list(self):
518 Scan the requested paths to count file sizes, excluding requested files
519 and dirs and building the upload file list.
521 # If there are no special files to be read, reset the total bytes count to zero
523 if not any([p for p in self.paths
524 if not (os.path.isfile(p) or os.path.isdir(p))]):
525 self.bytes_expected = 0
527 for path in self.paths:
528 # Test for stdin first, in case some file named '-' exists
531 raise ArvPutUploadIsPending()
532 self._write_stdin(self.filename or 'stdin')
533 elif not os.path.exists(path):
534 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
535 elif (not self.follow_links) and os.path.islink(path):
536 self.logger.warning("Skipping symlink '{}'".format(path))
538 elif os.path.isdir(path):
539 # Use absolute paths on cache index so CWD doesn't interfere
540 # with the caching logic.
542 path = os.path.abspath(path)
543 if orig_path[-1:] == os.sep:
544 # When passing a directory reference with a trailing slash,
545 # its contents should be uploaded directly to the
549 # When passing a directory reference with no trailing slash,
550 # upload the directory to the collection's root.
551 prefixdir = os.path.dirname(path)
553 for root, dirs, files in os.walk(path,
554 followlinks=self.follow_links):
555 root_relpath = os.path.relpath(root, path)
556 if root_relpath == '.':
558 # Exclude files/dirs by full path matching pattern
559 if self.exclude_paths:
560 dirs[:] = [d for d in dirs
561 if not any(pathname_match(
562 os.path.join(root_relpath, d), pat)
563 for pat in self.exclude_paths)]
564 files = [f for f in files
565 if not any(pathname_match(
566 os.path.join(root_relpath, f), pat)
567 for pat in self.exclude_paths)]
568 # Exclude files/dirs by name matching pattern
569 if self.exclude_names is not None:
570 dirs[:] = [d for d in dirs
571 if not self.exclude_names.match(d)]
572 files = [f for f in files
573 if not self.exclude_names.match(f)]
574 # Make os.walk()'s directory traversal order deterministic
578 filepath = os.path.join(root, f)
579 if not os.path.isfile(filepath):
580 self.logger.warning("Skipping non-regular file '{}'".format(filepath))
582 # Add its size to the total bytes count (if applicable)
583 if self.follow_links or (not os.path.islink(filepath)):
584 if self.bytes_expected is not None:
585 self.bytes_expected += os.path.getsize(filepath)
586 self._check_file(filepath,
587 os.path.join(root[len(prefixdir):], f))
589 filepath = os.path.abspath(path)
590 # Add its size to the total bytes count (if applicable)
591 if self.follow_links or (not os.path.islink(filepath)):
592 if self.bytes_expected is not None:
593 self.bytes_expected += os.path.getsize(filepath)
594 self._check_file(filepath,
595 self.filename or os.path.basename(path))
596 # If dry-run mode is on and we got to this point, we should notify that
597 # there aren't any files to upload.
599 raise ArvPutUploadNotPending()
600 # Remove local_collection's files that don't exist locally anymore, so the
601 # bytes_written count is correct.
602 for f in self.collection_file_paths(self._local_collection,
604 if f != 'stdin' and f != self.filename and f not in self._file_paths:
605 self._local_collection.remove(f)
607 def start(self, save_collection):
609 Start the supporting thread & file uploading
611 self._checkpointer.start()
613 # Update bytes_written from current local collection and
614 # report initial progress.
617 self._upload_started = True # Used by the update thread to start checkpointing
619 except (SystemExit, Exception) as e:
620 self._checkpoint_before_quit = False
621 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
622 # Note: We're expecting SystemExit instead of
623 # KeyboardInterrupt because we have a custom signal
624 # handler in place that raises SystemExit with the caught
626 if isinstance(e, PathDoesNotExistError):
627 # We aren't interested in the traceback for this case
629 elif not isinstance(e, SystemExit) or e.code != -2:
630 self.logger.warning("Abnormal termination:\n{}".format(
631 traceback.format_exc()))
635 # Stop the thread before doing anything else
636 self._stop_checkpointer.set()
637 self._checkpointer.join()
638 if self._checkpoint_before_quit:
639 # Commit all pending blocks & one last _update()
640 self._local_collection.manifest_text()
641 self._update(final=True)
643 self.save_collection()
645 self._cache_file.close()
647 def _collection_trash_at(self):
649 Returns the trash date that the collection should use at save time.
650 Takes into account absolute/relative trash_at values requested
653 if type(self._trash_at) == datetime.timedelta:
654 # Get an absolute datetime for trash_at
655 return datetime.datetime.utcnow() + self._trash_at
656 return self._trash_at
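# For example (illustrative): with trash_at=datetime.timedelta(days=7) the
# collection gets a trash date of roughly utcnow() + 7 days, computed here at
# save time; a timezone-naive datetime is passed through unchanged.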
658 def save_collection(self):
660 # Check if files should be updated on the remote collection.
661 for fp in self._file_paths:
662 remote_file = self._remote_collection.find(fp)
664 # File doesn't exist on the remote collection, copy it.
665 self._remote_collection.copy(fp, fp, self._local_collection)
666 elif remote_file != self._local_collection.find(fp):
667 # A different file exists on the remote collection, overwrite it.
668 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
670 # The file already exists on the remote collection, skip it.
672 self._remote_collection.save(num_retries=self.num_retries,
673 trash_at=self._collection_trash_at())
675 if len(self._local_collection) == 0:
676 self.logger.warning("No files were uploaded, skipping collection creation.")
678 self._local_collection.save_new(
679 name=self.name, owner_uuid=self.owner_uuid,
680 ensure_unique_name=self.ensure_unique_name,
681 num_retries=self.num_retries,
682 trash_at=self._collection_trash_at())
684 def destroy_cache(self):
687 os.unlink(self._cache_filename)
688 except OSError as error:
689 # That's what we wanted anyway.
690 if error.errno != errno.ENOENT:
692 self._cache_file.close()
694 def _collection_size(self, collection):
696 Recursively get the total size of the collection
699 for item in listvalues(collection):
700 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
701 size += self._collection_size(item)
706 def _update_task(self):
708 Periodically called support task. File uploading is
709 asynchronous so we poll status from the collection.
711 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
714 def _update(self, final=False):
716 Update cached manifest text and report progress.
718 if self._upload_started:
719 with self._collection_lock:
720 self.bytes_written = self._collection_size(self._local_collection)
723 manifest = self._local_collection.manifest_text()
725 # Get the manifest text without committing pending blocks
726 manifest = self._local_collection.manifest_text(strip=False,
730 with self._state_lock:
731 self._state['manifest'] = manifest
735 except Exception as e:
736 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
737 # Keep remote collection's trash_at attribute synced when using relative expire dates
738 if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
740 self._api_client.collections().update(
741 uuid=self._remote_collection.manifest_locator(),
742 body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
743 ).execute(num_retries=self.num_retries)
744 except Exception as e:
745 self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
747 self.bytes_written = self.bytes_skipped
748 # Call the reporter, if any
749 self.report_progress()
751 def report_progress(self):
752 if self.reporter is not None:
753 self.reporter(self.bytes_written, self.bytes_expected)
755 def _write_stdin(self, filename):
756 output = self._local_collection.open(filename, 'wb')
757 self._write(sys.stdin.buffer, output)
760 def _check_file(self, source, filename):
762 Check if this file needs to be uploaded
764 # Ignore symlinks when requested
765 if (not self.follow_links) and os.path.islink(source):
768 should_upload = False
769 new_file_in_cache = False
770 # Record file path for updating the remote collection before exiting
771 self._file_paths.add(filename)
773 with self._state_lock:
774 # If no previous cached data on this file, store it for an eventual
776 if source not in self._state['files']:
777 self._state['files'][source] = {
778 'mtime': os.path.getmtime(source),
779 'size' : os.path.getsize(source)
781 new_file_in_cache = True
782 cached_file_data = self._state['files'][source]
784 # Check if file was already uploaded (at least partially)
785 file_in_local_collection = self._local_collection.find(filename)
787 # If not resuming, upload the full file.
790 # New file detected since the last run, upload it.
791 elif new_file_in_cache:
793 # Local file didn't change from last run.
794 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
795 if not file_in_local_collection:
796 # File not uploaded yet, upload it completely
798 elif file_in_local_collection.permission_expired():
799 # Permission token expired, re-upload the file. This will change whenever
800 # we have an API for refreshing tokens.
801 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
803 self._local_collection.remove(filename)
804 elif cached_file_data['size'] == file_in_local_collection.size():
805 # File already there, skip it.
806 self.bytes_skipped += cached_file_data['size']
807 elif cached_file_data['size'] > file_in_local_collection.size():
808 # File partially uploaded, resume!
809 resume_offset = file_in_local_collection.size()
810 self.bytes_skipped += resume_offset
813 # Inconsistent cache, re-upload the file
815 self._local_collection.remove(filename)
816 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
817 # Local file differs from cached data, re-upload it.
819 if file_in_local_collection:
820 self._local_collection.remove(filename)
825 self._files_to_upload.append((source, resume_offset, filename))
826 except ArvPutUploadIsPending:
827 # This could happen when running in dry-run mode; close the cache file to
828 # avoid locking issues.
829 self._cache_file.close()
832 def _upload_files(self):
833 for source, resume_offset, filename in self._files_to_upload:
834 with open(source, 'rb') as source_fd:
835 with self._state_lock:
836 self._state['files'][source]['mtime'] = os.path.getmtime(source)
837 self._state['files'][source]['size'] = os.path.getsize(source)
838 if resume_offset > 0:
839 # Start upload where we left off
840 output = self._local_collection.open(filename, 'ab')
841 source_fd.seek(resume_offset)
844 output = self._local_collection.open(filename, 'wb')
845 self._write(source_fd, output)
846 output.close(flush=False)
848 def _write(self, source_fd, output):
850 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
855 def _my_collection(self):
856 return self._remote_collection if self.update else self._local_collection
858 def _get_cache_filepath(self):
859 # Set up cache file name from input paths.
861 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
862 realpaths = sorted(os.path.realpath(path) for path in self.paths)
863 md5.update(b'\0'.join([p.encode() for p in realpaths]))
865 md5.update(self.filename.encode())
866 cache_filename = md5.hexdigest()
867 cache_filepath = os.path.join(
868 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
870 return cache_filepath
872 def _setup_state(self, update_collection):
874 Create a new cache file or load a previously existing one.
876 # Load an already existing collection for update
877 if update_collection and re.match(arvados.util.collection_uuid_pattern,
880 self._remote_collection = arvados.collection.Collection(
882 api_client=self._api_client,
883 storage_classes_desired=self.storage_classes,
884 num_retries=self.num_retries)
885 except arvados.errors.ApiError as error:
886 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
889 elif update_collection:
890 # Collection locator provided, but unknown format
891 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
894 cache_filepath = self._get_cache_filepath()
895 if self.resume and os.path.exists(cache_filepath):
896 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
897 self._cache_file = open(cache_filepath, 'a+')
899 # --no-resume means start with an empty cache file.
900 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
901 self._cache_file = open(cache_filepath, 'w+')
902 self._cache_filename = self._cache_file.name
903 self._lock_file(self._cache_file)
904 self._cache_file.seek(0)
906 with self._state_lock:
909 self._state = json.load(self._cache_file)
910 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
911 # Cache is at least partially incomplete, set up a new cache
912 self._state = copy.deepcopy(self.EMPTY_STATE)
914 # Cache file empty, set up new cache
915 self._state = copy.deepcopy(self.EMPTY_STATE)
917 self.logger.info("No cache usage requested for this run.")
918 # No cache file, set empty state
919 self._state = copy.deepcopy(self.EMPTY_STATE)
920 if not self._cached_manifest_valid():
921 if not self.batch_mode:
922 raise ResumeCacheInvalidError()
924 self.logger.info("Invalid signatures on cache file '{}' while running in 'batch mode' -- continuing anyway.".format(self._cache_file.name))
925 self.use_cache = False # Don't overwrite preexisting cache file.
926 self._state = copy.deepcopy(self.EMPTY_STATE)
927 # Load the previous manifest so we can check if files were modified remotely.
928 self._local_collection = arvados.collection.Collection(
929 self._state['manifest'],
930 replication_desired=self.replication_desired,
931 storage_classes_desired=self.storage_classes,
932 put_threads=self.put_threads,
933 api_client=self._api_client,
934 num_retries=self.num_retries)
936 def _cached_manifest_valid(self):
938 Validate the oldest non-expired block signature to check whether the cached manifest
939 is usable, i.e. that it was not created with a different
942 if self._state.get('manifest', None) is None:
943 # No cached manifest yet, all good.
945 now = datetime.datetime.utcnow()
949 for m in keep_locator_pattern.finditer(self._state['manifest']):
952 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
954 # Locator without signature
957 if exp > now and (oldest_exp is None or exp < oldest_exp):
961 # No block signatures found => no invalid block signatures.
963 if oldest_loc is None:
964 # Locator signatures found, but all have expired.
965 # Reset the cache and move on.
966 self.logger.info('Cache expired, starting from scratch.')
967 self._state['manifest'] = ''
969 kc = arvados.KeepClient(api_client=self._api_client,
970 num_retries=self.num_retries)
973 except arvados.errors.KeepRequestError:
974 # Something is wrong, cached manifest is not valid.
978 def collection_file_paths(self, col, path_prefix='.'):
979 """Return a list of file paths by recursively go through the entire collection `col`"""
981 for name, item in listitems(col):
982 if isinstance(item, arvados.arvfile.ArvadosFile):
983 file_paths.append(os.path.join(path_prefix, name))
984 elif isinstance(item, arvados.collection.Subcollection):
985 new_prefix = os.path.join(path_prefix, name)
986 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
989 def _lock_file(self, fileobj):
991 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
993 raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
995 def _save_state(self):
997 Atomically save current state into cache.
999 with self._state_lock:
1000 # We're not using copy.deepcopy() here because it's a lot slower
1001 # than json.dumps(), and we already need the JSON format to be
1003 state = json.dumps(self._state)
1005 new_cache = tempfile.NamedTemporaryFile(
1007 dir=os.path.dirname(self._cache_filename), delete=False)
1008 self._lock_file(new_cache)
1009 new_cache.write(state)
1012 os.rename(new_cache.name, self._cache_filename)
1013 except (IOError, OSError, ResumeCacheConflict) as error:
1014 self.logger.error("There was a problem while saving the cache file: {}".format(error))
1016 os.unlink(new_cache.name)
1017 except NameError: # NamedTemporaryFile() creation failed.
1020 self._cache_file.close()
1021 self._cache_file = new_cache
1023 def collection_name(self):
1024 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1026 def collection_trash_at(self):
1027 return self._my_collection().get_trash_at()
1029 def manifest_locator(self):
1030 return self._my_collection().manifest_locator()
1032 def portable_data_hash(self):
1033 pdh = self._my_collection().portable_data_hash()
1034 m = self._my_collection().stripped_manifest().encode()
1035 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1036 if pdh != local_pdh:
1037 self.logger.warning("\n".join([
1038 "arv-put: API server provided PDH differs from local manifest.",
1039 " This should not happen; showing API server version."]))
1042 def manifest_text(self, stream_name=".", strip=False, normalize=False):
1043 return self._my_collection().manifest_text(stream_name, strip, normalize)
1045 def _datablocks_on_item(self, item):
1047 Return a list of datablock locators, recursively navigating
1048 through subcollections
1050 if isinstance(item, arvados.arvfile.ArvadosFile):
1051 if item.size() == 0:
1052 # Empty file locator
1053 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1056 for segment in item.segments():
1057 loc = segment.locator
1058 locators.append(loc)
1060 elif isinstance(item, arvados.collection.Collection):
1061 l = [self._datablocks_on_item(x) for x in listvalues(item)]
1062 # Fast list flattener method taken from:
1063 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1064 return [loc for sublist in l for loc in sublist]
1068 def data_locators(self):
1069 with self._collection_lock:
1070 # Make sure all datablocks are flushed before getting the locators
1071 self._my_collection().manifest_text()
1072 datablocks = self._datablocks_on_item(self._my_collection())
1075 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1078 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1079 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1080 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1081 # so instead we're using it on every path component.
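# For example (illustrative of the intended behavior):
#   pathname_match('tests/run_test.py', 'tests/*.py')         -> True
#   pathname_match('tests/subdir/run_test.py', 'tests/*.py')  -> False (extra path component)
#   pathname_match('docs/readme.md', '*.py')                  -> False (component counts differ)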
1082 def pathname_match(pathname, pattern):
1083 name = pathname.split(os.sep)
1084 # Fix patterns like 'some/subdir/' or 'some//subdir'
1085 pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1086 if len(name) != len(pat):
1088 for i in range(len(name)):
1089 if not fnmatch.fnmatch(name[i], pat[i]):
1093 def machine_progress(bytes_written, bytes_expected):
1094 return _machine_format.format(
1095 bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1097 def human_progress(bytes_written, bytes_expected):
1099 return "\r{}M / {}M {:.1%} ".format(
1100 bytes_written >> 20, bytes_expected >> 20,
1101 float(bytes_written) / bytes_expected)
1103 return "\r{} ".format(bytes_written)
1105 def progress_writer(progress_func, outfile=sys.stderr):
1106 def write_progress(bytes_written, bytes_expected):
1107 outfile.write(progress_func(bytes_written, bytes_expected))
1108 return write_progress
1110 def desired_project_uuid(api_client, project_uuid, num_retries):
1111 if not project_uuid:
1112 query = api_client.users().current()
1113 elif arvados.util.user_uuid_pattern.match(project_uuid):
1114 query = api_client.users().get(uuid=project_uuid)
1115 elif arvados.util.group_uuid_pattern.match(project_uuid):
1116 query = api_client.groups().get(uuid=project_uuid)
1118 raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1119 return query.execute(num_retries=num_retries)['uuid']
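# Behavior sketch (the UUIDs below are placeholders):
#   desired_project_uuid(api_client, None, 3)
#       -> the current user's UUID (collections go to the Home project)
#   desired_project_uuid(api_client, 'zzzzz-j7d0g-xxxxxxxxxxxxxxx', 3)
#       -> that group's UUID, if it can be retrieved
#   desired_project_uuid(api_client, 'not-a-uuid', 3)
#       -> raises ValueError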
1121 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1122 install_sig_handlers=True):
1125 args = parse_arguments(arguments)
1126 logger = logging.getLogger('arvados.arv_put')
1128 logger.setLevel(logging.WARNING)
1130 logger.setLevel(logging.INFO)
1133 request_id = arvados.util.new_request_id()
1135 formatter = ArvPutLogFormatter(request_id)
1136 logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1138 if api_client is None:
1139 api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
1141 if install_sig_handlers:
1142 arv_cmd.install_signal_handlers()
1144 # Trash arguments validation
1146 if args.trash_at is not None:
1147 # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1148 # make sure the user provides a complete YYYY-MM-DD date.
1149 if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1150 logger.error("--trash-at argument format invalid, use --help to see examples.")
1152 # Check if no time information was provided. In that case, assume end-of-day.
1153 if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1154 args.trash_at += 'T23:59:59'
1156 trash_at = ciso8601.parse_datetime(args.trash_at)
1158 logger.error("--trash-at argument format invalid, use --help to see examples.")
1161 if trash_at.tzinfo is not None:
1162 # Timezone aware datetime provided.
1163 utcoffset = -trash_at.utcoffset()
1165 # Timezone-naive datetime provided. Assume it is local.
1167 utcoffset = datetime.timedelta(seconds=time.altzone)
1169 utcoffset = datetime.timedelta(seconds=time.timezone)
1170 # Convert to UTC timezone naive datetime.
1171 trash_at = trash_at.replace(tzinfo=None) + utcoffset
1173 if trash_at <= datetime.datetime.utcnow():
1174 logger.error("--trash-at argument must be set in the future")
1176 if args.trash_after is not None:
1177 if args.trash_after < 1:
1178 logger.error("--trash-after argument must be >= 1")
1180 trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1182 # Determine the name to use
1184 if args.stream or args.raw:
1185 logger.error("Cannot use --name with --stream or --raw")
1187 elif args.update_collection:
1188 logger.error("Cannot use --name with --update-collection")
1190 collection_name = args.name
1192 collection_name = "Saved at {} by {}@{}".format(
1193 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1194 pwd.getpwuid(os.getuid()).pw_name,
1195 socket.gethostname())
1197 if args.project_uuid and (args.stream or args.raw):
1198 logger.error("Cannot use --project-uuid with --stream or --raw")
1201 # Determine the parent project
1203 project_uuid = desired_project_uuid(api_client, args.project_uuid,
1205 except (apiclient_errors.Error, ValueError) as error:
1210 reporter = progress_writer(human_progress)
1211 elif args.batch_progress:
1212 reporter = progress_writer(machine_progress)
1216 # Split storage-classes argument
1217 storage_classes = None
1218 if args.storage_classes:
1219 storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
1221 # Set up the exclude regex from all the --exclude arguments provided
1224 exclude_names = None
1225 if len(args.exclude) > 0:
1226 # We're supporting 2 kinds of exclusion patterns:
1227 # 1) --exclude '*.jpg' (file/dir name patterns, will only match
1228 # the name, wherever the file is on the tree)
1229 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the
1230 # entire path, and should be relative to
1231 # any input dir argument)
1232 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs
1233 # placed directly underneath the input dir)
1234 for p in args.exclude:
1235 # Only relative path patterns are allowed
1236 if p.startswith(os.sep):
1237 logger.error("Cannot use absolute paths with --exclude")
1239 if os.path.dirname(p):
1240 # We don't support path patterns with '..'
1241 p_parts = p.split(os.sep)
1244 "Cannot use path patterns that include or '..'")
1246 # Path search pattern
1247 exclude_paths.append(p)
1249 # Name-only search pattern
1250 name_patterns.append(p)
1251 # For name-only matching, we can combine all patterns into a single
1252 # regexp, for better performance.
1253 exclude_names = re.compile('|'.join(
1254 [fnmatch.translate(p) for p in name_patterns]
1255 )) if len(name_patterns) > 0 else None
1256 # Show the user the patterns to be used, just in case they weren't
1257 # specified inside quotes and got changed by the shell expansion.
1258 logger.info("Exclude patterns: {}".format(args.exclude))
1260 # If this is used by a human, and there's at least one directory to be
1261 # uploaded, the expected bytes calculation can take a moment.
1262 if args.progress and any([os.path.isdir(f) for f in args.paths]):
1263 logger.info("Calculating upload size, this could take some time...")
1265 writer = ArvPutUploadJob(paths = args.paths,
1266 resume = args.resume,
1267 use_cache = args.use_cache,
1268 batch_mode= args.batch,
1269 filename = args.filename,
1270 reporter = reporter,
1271 api_client = api_client,
1272 num_retries = args.retries,
1273 replication_desired = args.replication,
1274 put_threads = args.threads,
1275 name = collection_name,
1276 owner_uuid = project_uuid,
1277 ensure_unique_name = True,
1278 update_collection = args.update_collection,
1279 storage_classes=storage_classes,
1281 dry_run=args.dry_run,
1282 follow_links=args.follow_links,
1283 exclude_paths=exclude_paths,
1284 exclude_names=exclude_names,
1286 except ResumeCacheConflict:
1287 logger.error("\n".join([
1288 "arv-put: Another process is already uploading this data.",
1289 " Use --no-cache if this is really what you want."]))
1291 except ResumeCacheInvalidError:
1292 logger.error("\n".join([
1293 "arv-put: Resume cache contains invalid signature: it may have expired",
1294 " or been created with another Arvados user's credentials.",
1295 " Switch user or use one of the following options to restart upload:",
1296 " --no-resume to start a new resume cache.",
1297 " --no-cache to disable resume cache.",
1298 " --batch to ignore the resume cache if invalid."]))
1300 except (CollectionUpdateError, PathDoesNotExistError) as error:
1301 logger.error("\n".join([
1302 "arv-put: %s" % str(error)]))
1304 except ArvPutUploadIsPending:
1305 # Dry run check successful, return proper exit code.
1307 except ArvPutUploadNotPending:
1308 # No files pending for upload
1311 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1312 logger.warning("\n".join([
1313 "arv-put: Resuming previous upload from last checkpoint.",
1314 " Use the --no-resume option to start over."]))
1316 if not args.dry_run:
1317 writer.report_progress()
1320 writer.start(save_collection=not(args.stream or args.raw))
1321 except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
1322 logger.error("\n".join([
1323 "arv-put: %s" % str(error)]))
1326 if args.progress: # Print newline to split stderr from stdout for humans.
1331 output = writer.manifest_text(normalize=True)
1333 output = writer.manifest_text()
1335 output = ','.join(writer.data_locators())
1336 elif writer.manifest_locator() is not None:
1338 expiration_notice = ""
1339 if writer.collection_trash_at() is not None:
1340 # Get the local timezone-naive version, and log it with timezone information.
1342 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1344 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1345 expiration_notice = ". It will expire on {} {}.".format(
1346 local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1347 if args.update_collection:
1348 logger.info(u"Collection updated: '{}'{}".format(
1349 writer.collection_name(), expiration_notice))
1351 logger.info(u"Collection saved as '{}'{}".format(
1352 writer.collection_name(), expiration_notice))
1353 if args.portable_data_hash:
1354 output = writer.portable_data_hash()
1356 output = writer.manifest_locator()
1357 except apiclient_errors.Error as error:
1359 "arv-put: Error creating Collection on project: {}.".format(
1365 # Print the locator (uuid) of the new collection.
1367 status = status or 1
1368 elif not args.silent:
1369 stdout.write(output)
1370 if not output.endswith('\n'):
1373 if install_sig_handlers:
1374 arv_cmd.restore_signal_handlers()
1383 if __name__ == '__main__':