from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object

import argparse
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import traceback

import arvados
import arvados.collection
from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

api_client = None

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
to Keep.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory, relative to the provided input dirs, will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
You can specify multiple patterns by using this argument more than once.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])


def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove duplicate patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args
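
# Illustrative invocations (hypothetical paths and UUIDs) showing how the
# options parsed above combine; these are examples, not output from a real run:
#   arv-put --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx --name 'my data' dir/
#   arv-put --no-resume --exclude '*.tmp' --exclude 'logs/*.txt' dir1/ dir2/
#   cat file.bin | arv-put --filename file.bin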


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))
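
    # Note on the locking scheme above: flock() with LOCK_EX | LOCK_NB returns
    # immediately instead of blocking, so when another arv-put process already
    # holds the lock the resulting IOError is surfaced as ResumeCacheConflict
    # and the caller can report the concurrent upload to the user.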

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
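
    # Illustrative (assumed) shape of the state above once serialized to the
    # JSON cache file, for a single previously-seen file:
    #   {"manifest": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n",
    #    "files": {"/abs/path/empty.txt": {"mtime": 1500000000.0, "size": 0}}}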

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before counting starts, if no special files
        # are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding requested
        files & dirs, and build the upload file list.
        """
        # If there are no special files to be read, reset the total bytes
        # count to zero.
        if not any(filter(lambda p: not (os.path.isfile(p) or os.path.isdir(p)),
                          self.paths)):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists.
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                else:
                    self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = list(filter(
                            lambda d: not any(
                                [pathname_match(os.path.join(root_relpath, d),
                                                pat)
                                 for pat in self.exclude_paths]),
                            dirs))
                        files = list(filter(
                            lambda f: not any(
                                [pathname_match(os.path.join(root_relpath, f),
                                                pat)
                                 for pat in self.exclude_paths]),
                            files))
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = filter(lambda d: not self.exclude_names.match(d), dirs)
                        files = filter(lambda f: not self.exclude_names.match(f), files)
                    # Make os.walk()'s directory traversal order deterministic
                    dirs.sort()
                    files = sorted(files)
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                            self._check_file(filepath,
                                             os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                    self._check_file(filepath,
                                     self.filename or os.path.basename(path))
        # If dry-run mode is on and we got this far, report that there
        # aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start the supporting thread & file uploading.
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & do one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection.
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, (arvados.collection.Collection,
                                 arvados.collection.Subcollection)):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close(flush=False)

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded.
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # resume.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected since the last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change since the last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload the file. This will
                # change whenever we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode; close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(
            self._state['manifest'],
            replication_desired=self.replication_desired,
            put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the state in JSON format
            # to be saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile() itself failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache
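
    # Design note on _save_state(): writing to a NamedTemporaryFile created in
    # the same directory and then os.rename()-ing it over the old cache makes
    # the update atomic on POSIX filesystems, so an interrupted run never
    # leaves a half-written cache file behind.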

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections.
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())


# Simulate glob.glob() matching behavior without the need to scan the filesystem.
# Note: fnmatch() doesn't work correctly when used with pathnames. For example,
# the pattern 'tests/*.py' will match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
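
# A few illustrative cases of the per-component matching above:
#   pathname_match('tests/run_test.py', 'tests/*.py')         -> True
#   pathname_match('tests/subdir/run_test.py', 'tests/*.py')  -> False
#     (component counts differ, unlike plain fnmatch on the whole path)
#   pathname_match('some/subdir', 'some//subdir/')            -> True
#     (empty components from '//' and a trailing '/' are discarded)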


def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)


def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
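
# Example outputs (illustrative): with 5 MiB of 10 MiB uploaded,
# human_progress(5 << 20, 10 << 20) returns "\r5M / 10M 50.0% ", while
# machine_progress() emits a parseable "<argv0> <pid>: 5242880 written
# 10485760 total" line; an unknown total is reported as -1.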


def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress


def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)


def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
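
# Resolution rules, summarized: no --project-uuid stores under the current
# user's Home project; a user UUID is looked up via users().get(); a group
# (project) UUID via groups().get(); anything else is rejected early, before
# any data is uploaded.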


def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Setup exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We support 2 kinds of exclusion patterns:
        # 1) --exclude '*.jpg'   (file/dir name patterns, match only the name)
        # 2) --exclude 'foo/bar' (file/dir path patterns, match the entire
        #                         path, and should be relative to any input
        #                         dir argument)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '.' or '..'
                p_parts = p.split(os.sep)
                if '.' in p_parts or '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '.' or '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
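
        # Example (illustrative): given --exclude '*.tmp' --exclude 'logs/*.txt',
        # '*.tmp' becomes a name pattern matched against every file and dir
        # name, while 'logs/*.txt' becomes a path pattern handled by
        # pathname_match() during the directory walk.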
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths=args.paths,
                                 resume=args.resume,
                                 use_cache=args.use_cache,
                                 filename=args.filename,
                                 reporter=reporter,
                                 num_retries=args.retries,
                                 replication_desired=args.replication,
                                 put_threads=args.threads,
                                 name=collection_name,
                                 owner_uuid=project_uuid,
                                 ensure_unique_name=True,
                                 update_collection=args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original signal handlers.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    status = 0
    try:
        writer.start(save_collection=not (args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.stream:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    # Restore the original signal handlers.
    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    return output


if __name__ == '__main__':
    main()