# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object

import argparse, copy, datetime, errno, fcntl, fnmatch, hashlib, json
import logging, os, pwd, re, socket, sys, tempfile, threading, traceback

import arvados
import arvados.collection
from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--storage-classes', help="""
Specify a comma-separated list of storage classes to be used when saving data to Keep.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory (relative to the provided input dirs) will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

run_opts.add_argument('--silent', action='store_true',
                      help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")

_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")

_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")

_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
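
# Typical invocations, assuming the 'arv-put' entry point that wraps main():
#   arv-put file1.txt dir1/     # upload file1.txt plus dir1's contents
#   arv-put --stream big.dat    # print the manifest; don't save a Collection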

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args

class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass

class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
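
# In dry-run mode the first append() aborts the file scan early, e.g.:
#   files = FileUploadList(dry_run=True)
#   files.append(('src.txt', 0, 'dest.txt'))  # raises ArvPutUploadIsPending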

# Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
class ArvPutLogFormatter(logging.Formatter):
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        self.err_fmtr = logging.Formatter(
            arvados.log_format + ' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)
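
# Example wiring, mirroring what main() does below:
#   formatter = ArvPutLogFormatter(arvados.util.new_request_id())
#   logging.getLogger('arvados').handlers[0].setFormatter(formatter)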

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)

class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
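
    # A saved cache file therefore holds JSON shaped like this (values are
    # illustrative only):
    #   {"manifest": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:file.txt\n",
    #    "files": {"/abs/path/file.txt": {"mtime": 1500000000.0, "size": 0}}}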

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None, api_client=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None, filename=None,
                 update_time=60.0, update_collection=None, storage_classes=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before counting starts, if no special files
        # (e.g., stdin) are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self.storage_classes = storage_classes
        self._api_client = api_client
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding files & dirs if requested
        and building the upload file list.
        """
        # If there aren't any special files (e.g., stdin) to be read, reset the
        # total bytes count to zero.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists.
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                       os.path.join(root_relpath, d), pat)
                                              for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                     os.path.join(root_relpath, f), pat)
                                            for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s directory traversal order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got up to this point, then we should
        # notify that there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(storage_classes=self.storage_classes,
                                         num_retries=self.num_retries)
        else:
            if self.storage_classes is None:
                self.storage_classes = ['default']
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                storage_classes=self.storage_classes,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, (arvados.collection.Collection,
                                 arvados.collection.Subcollection)):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close(flush=False)

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # resume.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected since the last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change since the last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode; close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(
                    update_collection, api_client=self._api_client)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format.
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache is at least partially incomplete; set up a new one.
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty; set up a new cache.
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(
            self._state['manifest'],
            replication_desired=self.replication_desired,
            put_threads=self.put_threads,
            api_client=self._api_client)
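
        # Note: the cache key above is derived from the API host plus the
        # sorted real paths (and the --filename value, when given), so
        # re-running arv-put on the same inputs against the same cluster
        # reuses the same cache file.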

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile creation failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections.
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

# Simulate glob.glob() matching behavior without the need to scan the filesystem.
# Note: fnmatch() doesn't work correctly when used with pathnames. For example the
# pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
# so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
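
# For illustration, given the component-wise matching above:
#   pathname_match('subdir/file.txt', 'subdir/*.txt')    # True
#   pathname_match('subdir/a/file.txt', 'subdir/*.txt')  # False: extra component
#   pathname_match('file.txt', '*.txt')                  # True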

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
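
# e.g. human_progress(10 << 20, 100 << 20) returns '\r10M / 100M 10.0% '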

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
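
# For example (with hypothetical identifiers):
#   desired_project_uuid(api_client, None, 3)        # current user's UUID (Home)
#   desired_project_uuid(api_client, group_uuid, 3)  # that project's UUID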

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         install_sig_handlers=True):
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()

    formatter = ArvPutLogFormatter(request_id)
    logging.getLogger('arvados').handlers[0].setFormatter(formatter)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Split storage-classes argument
    storage_classes = None
    if args.storage_classes:
        storage_classes = args.storage_classes.strip().split(',')
        if len(storage_classes) > 1:
            logger.error("Multiple storage classes are not supported currently.")
            sys.exit(1)

    # Set up the exclude regex from all the --exclude arguments provided.
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative path patterns are allowed.
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))
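
        # e.g. --exclude '*.tmp' --exclude 'logs' combines into one regex that
        # skips, by name, any file or directory named 'logs' or ending in '.tmp'.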

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 api_client = api_client,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 storage_classes=storage_classes,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)

    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful; files are pending for upload.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()

    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()

        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    if install_sig_handlers:
        arv_cmd.restore_signal_handlers()

    if status != 0:
        sys.exit(status)

    return output

if __name__ == '__main__':
    main()