1 from __future__ import division
2 from future.utils import listitems, listvalues
3 from builtins import str
4 from builtins import object
7 import arvados.collection
28 from apiclient import errors as apiclient_errors
29 from arvados._version import __version__
31 import arvados.commands._util as arv_cmd
# --- Command line interface definition for arv-put ---
# Signals trapped so an interrupted upload can exit cleanly (see exit_signal_handler).
33 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Options shared by every invocation that performs an upload.
36 upload_opts = argparse.ArgumentParser(add_help=False)
38 upload_opts.add_argument('--version', action='version',
39 version="%s %s" % (sys.argv[0], __version__),
40 help='Print version and exit.')
41 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
43 Local file or directory. If path is a directory reference with a trailing
44 slash, then just upload the directory's contents; otherwise upload the
45 directory itself. Default: read from standard input.
# Mutually exclusive manifest-manipulation options.
48 _group = upload_opts.add_mutually_exclusive_group()
50 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
51 default=-1, help=argparse.SUPPRESS)
53 _group.add_argument('--normalize', action='store_true',
55 Normalize the manifest by re-ordering files and streams after writing
59 _group.add_argument('--dry-run', action='store_true', default=False,
61 Don't actually upload files, but only check if any file should be
62 uploaded. Exit with code=2 when files are pending for upload.
# Mutually exclusive output-mode options (--stream / --manifest / --raw).
65 _group = upload_opts.add_mutually_exclusive_group()
67 _group.add_argument('--as-stream', action='store_true', dest='stream',
72 _group.add_argument('--stream', action='store_true',
74 Store the file content and display the resulting manifest on
75 stdout. Do not write the manifest to Keep or save a Collection object
79 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
81 Synonym for --manifest.
84 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
86 Synonym for --manifest.
89 _group.add_argument('--manifest', action='store_true',
91 Store the file data and resulting manifest in Keep, save a Collection
92 object in Arvados, and display the manifest locator (Collection uuid)
93 on stdout. This is the default behavior.
96 _group.add_argument('--as-raw', action='store_true', dest='raw',
101 _group.add_argument('--raw', action='store_true',
103 Store the file content and display the data block locators on stdout,
104 separated by commas, with a trailing newline. Do not store a
108 upload_opts.add_argument('--update-collection', type=str, default=None,
109 dest='update_collection', metavar="UUID", help="""
110 Update an existing collection identified by the given Arvados collection
111 UUID. All new local files will be uploaded.
114 upload_opts.add_argument('--use-filename', type=str, default=None,
115 dest='filename', help="""
116 Synonym for --filename.
119 upload_opts.add_argument('--filename', type=str, default=None,
121 Use the given filename in the manifest, instead of the name of the
122 local file. This is useful when "-" or "/dev/stdin" is given as an
123 input file. It can be used only if there is exactly one path given and
124 it is not a directory. Implies --manifest.
127 upload_opts.add_argument('--portable-data-hash', action='store_true',
129 Print the portable data hash instead of the Arvados UUID for the collection
130 created by the upload.
133 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
135 Set the replication level for the new collection: how many different
136 physical storage devices (e.g., disks) should have a copy of each data
137 block. Default is to use the server-provided default (if any) or 2.
140 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
142 Set the number of upload threads to be used. Take into account that
143 using lots of threads will increase the RAM requirements. Default is
145 On high latency installations, using a greater number will improve
# Options controlling a normal (non-informational) run.
149 run_opts = argparse.ArgumentParser(add_help=False)
151 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
152 Store the collection in the specified project, instead of your Home
156 run_opts.add_argument('--name', help="""
157 Save the collection with the specified name.
160 run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
161 action='append', help="""
162 Exclude files and directories whose names match the given pattern. You
163 can specify multiple patterns by using this argument more than once.
# Progress-reporting choices are mutually exclusive.
166 _group = run_opts.add_mutually_exclusive_group()
167 _group.add_argument('--progress', action='store_true',
169 Display human-readable progress on stderr (bytes and, if possible,
170 percentage of total data size). This is the default behavior when
174 _group.add_argument('--no-progress', action='store_true',
176 Do not display human-readable progress on stderr, even if stderr is a
180 _group.add_argument('--batch-progress', action='store_true',
182 Display machine-readable progress on stderr (bytes and, if known,
# --resume / --no-resume toggle (resume defaults to True).
186 _group = run_opts.add_mutually_exclusive_group()
187 _group.add_argument('--resume', action='store_true', default=True,
189 Continue interrupted uploads from cached state (default).
191 _group.add_argument('--no-resume', action='store_false', dest='resume',
193 Do not continue interrupted uploads from cached state.
# --follow-links / --no-follow-links toggle (defaults to following symlinks).
196 _group = run_opts.add_mutually_exclusive_group()
197 _group.add_argument('--follow-links', action='store_true', default=True,
198 dest='follow_links', help="""
199 Follow file and directory symlinks (default).
201 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
203 Do not follow file and directory symlinks.
# --cache / --no-cache toggle (cache enabled by default).
206 _group = run_opts.add_mutually_exclusive_group()
207 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
209 Save upload state in a cache file for resuming (default).
211 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
213 Do not save upload state in a cache file for resuming.
# Top-level parser combining upload, run and shared retry options.
216 arg_parser = argparse.ArgumentParser(
217 description='Copy data from the local filesystem to Keep.',
218 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
# Parse and post-process the command line; argparse exits on invalid input.
220 def parse_arguments(arguments):
221 args = arg_parser.parse_args(arguments)
# No paths given: default to reading from standard input.
223 if len(args.paths) == 0:
# Normalize /dev/stdin to the conventional "-" marker.
226 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
# --filename only makes sense for a single, non-directory input.
228 if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
231 --filename argument cannot be used when storing a directory or
235 # Turn on --progress by default if stderr is a tty.
236 if (not (args.batch_progress or args.no_progress)
237 and os.isatty(sys.stderr.fileno())):
240 # Turn off --resume (default) if --no-cache is used.
241 if not args.use_cache:
# Reading from stdin: cannot update a collection, cannot cache, and a
# default manifest filename of 'stdin' is supplied.
244 if args.paths == ['-']:
245 if args.update_collection:
247 --update-collection cannot be used when reading from stdin.
250 args.use_cache = False
251 if not args.filename:
252 args.filename = 'stdin'
254 # Remove possible duplicated patterns
255 if len(args.exclude) > 0:
256 args.exclude = list(set(args.exclude))
# Raised when a given input path is missing on the local filesystem.
261 class PathDoesNotExistError(Exception):
# Raised when an --update-collection target cannot be read or updated.
265 class CollectionUpdateError(Exception):
# Raised when another process holds the lock on the resume cache file.
269 class ResumeCacheConflict(Exception):
# Raised on mutually incompatible constructor arguments (e.g. resume without cache).
273 class ArvPutArgumentConflict(Exception):
# Raised during --dry-run as soon as a file is found that needs uploading.
277 class ArvPutUploadIsPending(Exception):
# Raised during --dry-run when no file at all needs uploading.
281 class ArvPutUploadNotPending(Exception):
# List of pending upload entries; in dry-run mode the first append aborts
# the run by raising ArvPutUploadIsPending (exit code 2 semantics).
285 class FileUploadList(list):
286 def __init__(self, dry_run=False):
# Remember dry-run mode so append() can signal pending uploads.
288 self.dry_run = dry_run
290 def append(self, other):
292 raise ArvPutUploadIsPending()
293 super(FileUploadList, self).append(other)
# Persistent, flock-protected JSON cache used by the legacy resume logic.
296 class ResumeCache(object):
297 CACHE_DIR = '.cache/arvados/arv-put'
299 def __init__(self, file_spec):
300 self.cache_file = open(file_spec, 'a+')
301 self._lock_file(self.cache_file)
302 self.filename = self.cache_file.name
# Build a per-invocation cache path keyed on API host, input paths and --filename.
305 def make_path(cls, args):
307 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
308 realpaths = sorted(os.path.realpath(path) for path in args.paths)
309 md5.update(b'\0'.join([p.encode() for p in realpaths]))
310 if any(os.path.isdir(path) for path in realpaths):
313 md5.update(args.filename.encode())
315 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
# Take an exclusive, non-blocking flock; failure means another arv-put is running.
318 def _lock_file(self, fileobj):
320 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
322 raise ResumeCacheConflict("{} locked".format(fileobj.name))
325 self.cache_file.seek(0)
326 return json.load(self.cache_file)
# Verify a data block referenced by the cached state still exists in Keep.
328 def check_cache(self, api_client=None, num_retries=0):
333 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
334 locator = state["_finished_streams"][0][1][0]
335 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
336 locator = state["_current_stream_locators"][0]
337 if locator is not None:
338 kc = arvados.keep.KeepClient(api_client=api_client)
339 kc.head(locator, num_retries=num_retries)
340 except Exception as e:
# Atomically replace the cache file with freshly serialized state.
345 def save(self, data):
347 new_cache_fd, new_cache_name = tempfile.mkstemp(
348 dir=os.path.dirname(self.filename))
349 self._lock_file(new_cache_fd)
350 new_cache = os.fdopen(new_cache_fd, 'r+')
351 json.dump(data, new_cache)
352 os.rename(new_cache_name, self.filename)
353 except (IOError, OSError, ResumeCacheConflict) as error:
355 os.unlink(new_cache_name)
356 except NameError: # mkstemp failed.
359 self.cache_file.close()
360 self.cache_file = new_cache
363 self.cache_file.close()
367 os.unlink(self.filename)
368 except OSError as error:
369 if error.errno != errno.ENOENT: # That's what we wanted anyway.
# Re-open (and re-lock) the same cache file from scratch.
375 self.__init__(self.filename)
# Drives a whole upload: walks input paths, writes file data into a local
# Collection (optionally resuming from cached state) and can merge results
# into a pre-existing remote collection (--update-collection).
378 class ArvPutUploadJob(object):
379 CACHE_DIR = '.cache/arvados/arv-put'
381 'manifest' : None, # Last saved manifest checkpoint
382 'files' : {} # Previous run file list: {path : {size, mtime}}
385 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
386 bytes_expected=None, name=None, owner_uuid=None,
387 ensure_unique_name=False, num_retries=None,
388 put_threads=None, replication_desired=None,
389 filename=None, update_time=60.0, update_collection=None,
390 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
391 follow_links=True, exclude_paths=[], exclude_names=None):
# NOTE(review): exclude_paths has a mutable default ([]); it appears to be
# only read here, but a None default would be safer — confirm upstream.
394 self.use_cache = use_cache
396 self.reporter = reporter
397 self.bytes_expected = bytes_expected
398 self.bytes_written = 0
399 self.bytes_skipped = 0
401 self.owner_uuid = owner_uuid
402 self.ensure_unique_name = ensure_unique_name
403 self.num_retries = num_retries
404 self.replication_desired = replication_desired
405 self.put_threads = put_threads
406 self.filename = filename
407 self._state_lock = threading.Lock()
408 self._state = None # Previous run state (file list & manifest)
409 self._current_files = [] # Current run file list
410 self._cache_file = None
411 self._collection_lock = threading.Lock()
412 self._remote_collection = None # Collection being updated (if asked)
413 self._local_collection = None # Collection from previous run manifest
414 self._file_paths = set() # Files to be updated in remote collection
415 self._stop_checkpointer = threading.Event()
416 self._checkpointer = threading.Thread(target=self._update_task)
417 self._checkpointer.daemon = True
418 self._update_task_time = update_time # How many seconds wait between update runs
419 self._files_to_upload = FileUploadList(dry_run=dry_run)
420 self._upload_started = False
422 self.dry_run = dry_run
423 self._checkpoint_before_quit = True
424 self.follow_links = follow_links
425 self.exclude_paths = exclude_paths
426 self.exclude_names = exclude_names
# resume requires the cache; reject the contradictory combination early.
428 if not self.use_cache and self.resume:
429 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
431 # Check for obvious dry-run responses
432 if self.dry_run and (not self.use_cache or not self.resume):
433 raise ArvPutUploadIsPending()
435 # Load cached data if any and if needed
436 self._setup_state(update_collection)
438 def start(self, save_collection):
440 Start supporting thread & file uploading
# Kick off the periodic checkpointer thread before writing any data.
443 self._checkpointer.start()
445 for path in self.paths:
446 # Test for stdin first, in case some file named '-' exist
449 raise ArvPutUploadIsPending()
450 self._write_stdin(self.filename or 'stdin')
451 elif not os.path.exists(path):
452 raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
453 elif os.path.isdir(path):
454 # Use absolute paths on cache index so CWD doesn't interfere
455 # with the caching logic.
457 path = os.path.abspath(path)
458 if orig_path[-1:] == os.sep:
459 # When passing a directory reference with a trailing slash,
460 # its contents should be uploaded directly to the collection's root.
463 # When passing a directory reference with no trailing slash,
464 # upload the directory to the collection's root.
465 prefixdir = os.path.dirname(path)
467 for root, dirs, files in os.walk(path, followlinks=self.follow_links):
468 root_relpath = os.path.relpath(root, path)
469 # Exclude files/dirs by full path matching pattern
470 if self.exclude_paths:
472 lambda d: not any([pathname_match(os.path.join(root_relpath, d),
474 for pat in self.exclude_paths]),
477 lambda f: not any([pathname_match(os.path.join(root_relpath, f),
479 for pat in self.exclude_paths]),
481 # Exclude files/dirs by name matching pattern
482 if self.exclude_names is not None:
483 dirs[:] = filter(lambda d: not self.exclude_names.match(d), dirs)
484 files = filter(lambda f: not self.exclude_names.match(f), files)
485 # Make os.walk()'s dir traversing order deterministic
489 self._check_file(os.path.join(root, f),
490 os.path.join(root[len(prefixdir):], f))
492 self._check_file(os.path.abspath(path),
493 self.filename or os.path.basename(path))
494 # If dry-mode is on, and got up to this point, then we should notify that
495 # there aren't any file to upload.
497 raise ArvPutUploadNotPending()
498 # Remove local_collection's files that don't exist locally anymore, so the
499 # bytes_written count is correct.
500 for f in self.collection_file_paths(self._local_collection,
502 if f != 'stdin' and f != self.filename and not f in self._file_paths:
503 self._local_collection.remove(f)
504 # Update bytes_written from current local collection and
505 # report initial progress.
508 self._upload_started = True # Used by the update thread to start checkpointing
510 except (SystemExit, Exception) as e:
511 self._checkpoint_before_quit = False
512 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
513 # Note: We're expecting SystemExit instead of
514 # KeyboardInterrupt because we have a custom signal
515 # handler in place that raises SystemExit with the catched
517 if isinstance(e, PathDoesNotExistError):
518 # We aren't interested in the traceback for this case
520 elif not isinstance(e, SystemExit) or e.code != -2:
521 self.logger.warning("Abnormal termination:\n{}".format(
522 traceback.format_exc()))
# Cleanup path: stop the checkpointer thread, take a final checkpoint,
# optionally save the collection, and close the cache file.
526 # Stop the thread before doing anything else
527 self._stop_checkpointer.set()
528 self._checkpointer.join()
529 if self._checkpoint_before_quit:
530 # Commit all pending blocks & one last _update()
531 self._local_collection.manifest_text()
532 self._update(final=True)
534 self.save_collection()
536 self._cache_file.close()
# When updating, merge changed/new files into the remote collection and
# save it; otherwise save the local collection as a brand new one.
538 def save_collection(self):
540 # Check if files should be updated on the remote collection.
541 for fp in self._file_paths:
542 remote_file = self._remote_collection.find(fp)
544 # File don't exist on remote collection, copy it.
545 self._remote_collection.copy(fp, fp, self._local_collection)
546 elif remote_file != self._local_collection.find(fp):
547 # A different file exist on remote collection, overwrite it.
548 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
550 # The file already exist on remote collection, skip it.
552 self._remote_collection.save(num_retries=self.num_retries)
554 self._local_collection.save_new(
555 name=self.name, owner_uuid=self.owner_uuid,
556 ensure_unique_name=self.ensure_unique_name,
557 num_retries=self.num_retries)
# Remove the on-disk resume cache; a missing file is not an error.
559 def destroy_cache(self):
562 os.unlink(self._cache_filename)
563 except OSError as error:
564 # That's what we wanted anyway.
565 if error.errno != errno.ENOENT:
567 self._cache_file.close()
# Sum the byte size of every file in a (sub)collection, recursively.
569 def _collection_size(self, collection):
571 Recursively get the total size of the collection
574 for item in listvalues(collection):
575 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
576 size += self._collection_size(item)
# Checkpointer thread body: persist progress periodically while uploading;
# polls every second until the upload starts, then every _update_task_time.
581 def _update_task(self):
583 Periodically called support task. File uploading is
584 asynchronous so we poll status from the collection.
586 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
# Refresh bytes_written, checkpoint the manifest into cached state, report.
589 def _update(self, final=False):
591 Update cached manifest text and report progress.
593 if self._upload_started:
594 with self._collection_lock:
595 self.bytes_written = self._collection_size(self._local_collection)
# final=True commits pending blocks; otherwise the manifest is taken
# without committing them.
598 manifest = self._local_collection.manifest_text()
600 # Get the manifest text without comitting pending blocks
601 manifest = self._local_collection.manifest_text(strip=False,
605 with self._state_lock:
606 self._state['manifest'] = manifest
610 except Exception as e:
611 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
613 self.bytes_written = self.bytes_skipped
614 # Call the reporter, if any
615 self.report_progress()
def report_progress(self):
    """Send the current progress figures (bytes written vs. expected) to the
    reporter callback, when one was configured."""
    callback = self.reporter
    if callback is not None:
        callback(self.bytes_written, self.bytes_expected)
# Read standard input and store it in the local collection as *filename*.
621 def _write_stdin(self, filename):
622 output = self._local_collection.open(filename, 'wb')
623 self._write(sys.stdin, output)
# Decide whether *source* must be uploaded as *filename* — fully, partially
# (resume) or not at all — based on cached size/mtime and what the local
# collection already holds; queues the decision in self._files_to_upload.
626 def _check_file(self, source, filename):
628 Check if this file needs to be uploaded
630 # Ignore symlinks when requested
631 if (not self.follow_links) and os.path.islink(source):
634 should_upload = False
635 new_file_in_cache = False
636 # Record file path for updating the remote collection before exiting
637 self._file_paths.add(filename)
639 with self._state_lock:
640 # If no previous cached data on this file, store it for an eventual
642 if source not in self._state['files']:
643 self._state['files'][source] = {
644 'mtime': os.path.getmtime(source),
645 'size' : os.path.getsize(source)
647 new_file_in_cache = True
648 cached_file_data = self._state['files'][source]
650 # Check if file was already uploaded (at least partially)
651 file_in_local_collection = self._local_collection.find(filename)
653 # If not resuming, upload the full file.
656 # New file detected from last run, upload it.
657 elif new_file_in_cache:
659 # Local file didn't change from last run.
660 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
661 if not file_in_local_collection:
662 # File not uploaded yet, upload it completely
664 elif file_in_local_collection.permission_expired():
665 # Permission token expired, re-upload file. This will change whenever
666 # we have a API for refreshing tokens.
668 self._local_collection.remove(filename)
669 elif cached_file_data['size'] == file_in_local_collection.size():
670 # File already there, skip it.
671 self.bytes_skipped += cached_file_data['size']
672 elif cached_file_data['size'] > file_in_local_collection.size():
673 # File partially uploaded, resume!
674 resume_offset = file_in_local_collection.size()
675 self.bytes_skipped += resume_offset
678 # Inconsistent cache, re-upload the file
680 self._local_collection.remove(filename)
681 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
682 # Local file differs from cached data, re-upload it.
684 if file_in_local_collection:
685 self._local_collection.remove(filename)
689 self._files_to_upload.append((source, resume_offset, filename))
# Upload every queued file; append from resume_offset when resuming, else
# rewrite the collection file from scratch.
691 def _upload_files(self):
692 for source, resume_offset, filename in self._files_to_upload:
693 with open(source, 'rb') as source_fd:
694 with self._state_lock:
695 self._state['files'][source]['mtime'] = os.path.getmtime(source)
696 self._state['files'][source]['size'] = os.path.getsize(source)
697 if resume_offset > 0:
698 # Start upload where we left off
699 output = self._local_collection.open(filename, 'ab')
700 source_fd.seek(resume_offset)
703 output = self._local_collection.open(filename, 'wb')
704 self._write(source_fd, output)
705 output.close(flush=False)
# Copy *source_fd* into *output* in Keep-block-sized chunks.
707 def _write(self, source_fd, output):
709 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
def _my_collection(self):
    """Return the collection this job writes to: the remote one when an
    update was requested, otherwise the locally built one."""
    if self.update:
        return self._remote_collection
    return self._local_collection
# Initialize resume state: load the remote collection for --update-collection,
# open/create the locked cache file, and rebuild the local collection from
# the cached manifest (if any).
717 def _setup_state(self, update_collection):
719 Create a new cache file or load a previously existing one.
721 # Load an already existing collection for update
722 if update_collection and re.match(arvados.util.collection_uuid_pattern,
725 self._remote_collection = arvados.collection.Collection(update_collection)
726 except arvados.errors.ApiError as error:
727 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
730 elif update_collection:
731 # Collection locator provided, but unknown format
732 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
735 # Set up cache file name from input paths.
# Cache key derives from the API host, sorted real paths and --filename.
737 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
738 realpaths = sorted(os.path.realpath(path) for path in self.paths)
739 md5.update(b'\0'.join([p.encode() for p in realpaths]))
741 md5.update(self.filename.encode())
742 cache_filename = md5.hexdigest()
743 cache_filepath = os.path.join(
744 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
746 if self.resume and os.path.exists(cache_filepath):
747 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
748 self._cache_file = open(cache_filepath, 'a+')
750 # --no-resume means start with a empty cache file.
751 self.logger.info("Creating new cache file at {}".format(cache_filepath))
752 self._cache_file = open(cache_filepath, 'w+')
753 self._cache_filename = self._cache_file.name
754 self._lock_file(self._cache_file)
755 self._cache_file.seek(0)
757 with self._state_lock:
760 self._state = json.load(self._cache_file)
761 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
762 # Cache at least partially incomplete, set up new cache
763 self._state = copy.deepcopy(self.EMPTY_STATE)
765 # Cache file empty, set up new cache
766 self._state = copy.deepcopy(self.EMPTY_STATE)
768 self.logger.info("No cache usage requested for this run.")
769 # No cache file, set empty state
770 self._state = copy.deepcopy(self.EMPTY_STATE)
771 # Load the previous manifest so we can check if files were modified remotely.
772 self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
# Recursively list every file path contained in collection *col*.
774 def collection_file_paths(self, col, path_prefix='.'):
775 """Return a list of file paths by recursively go through the entire collection `col`"""
777 for name, item in listitems(col):
778 if isinstance(item, arvados.arvfile.ArvadosFile):
779 file_paths.append(os.path.join(path_prefix, name))
780 elif isinstance(item, arvados.collection.Subcollection):
781 new_prefix = os.path.join(path_prefix, name)
782 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
# Exclusive non-blocking flock on the cache file; raises on contention.
785 def _lock_file(self, fileobj):
787 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
789 raise ResumeCacheConflict("{} locked".format(fileobj.name))
def _save_state(self):
    """
    Atomically save current state into cache.

    The serialized state is written to a new locked temporary file which is
    then renamed over the previous cache file, so a reader never observes a
    partially written cache. On failure the temporary file is removed and
    the old cache file is kept.
    """
    with self._state_lock:
        # We're not using copy.deepcopy() here because it's a lot slower
        # than json.dumps(), and we're already needing JSON format to be
        # saved on disk.
        state = json.dumps(self._state)
    try:
        new_cache = tempfile.NamedTemporaryFile(
            mode='w+',
            dir=os.path.dirname(self._cache_filename), delete=False)
        self._lock_file(new_cache)
        new_cache.write(state)
        # Make sure the data hits the disk before the rename commits it.
        new_cache.flush()
        os.fsync(new_cache)
        os.rename(new_cache.name, self._cache_filename)
    except (IOError, OSError, ResumeCacheConflict) as error:
        self.logger.error("There was a problem while saving the cache file: {}".format(error))
        try:
            # Bug fix: NamedTemporaryFile exposes its path as .name;
            # the previous code referenced an undefined name
            # 'new_cache_name' (copied from ResumeCache.save, which uses
            # tempfile.mkstemp) and raised NameError instead of cleaning up.
            os.unlink(new_cache.name)
        except NameError:  # NamedTemporaryFile() itself failed.
            pass
    else:
        self._cache_file.close()
        self._cache_file = new_cache
# Name of the saved/updated collection, or None before an API response exists.
819 def collection_name(self):
820 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
822 def manifest_locator(self):
823 return self._my_collection().manifest_locator()
# Return the server-reported portable data hash, warning when it differs
# from the locally computed one.
825 def portable_data_hash(self):
826 pdh = self._my_collection().portable_data_hash()
827 m = self._my_collection().stripped_manifest().encode()
828 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
# NOTE(review): this uses a module-level `logger`, not self.logger —
# confirm such a logger is in scope here, otherwise this line raises
# NameError when the hashes differ.
830 logger.warning("\n".join([
831 "arv-put: API server provided PDH differs from local manifest.",
832 " This should not happen; showing API server version."]))
def manifest_text(self, stream_name=".", strip=False, normalize=False):
    """Render the manifest of the active collection, delegating all options
    (stream name, locator stripping, normalization) to it."""
    collection = self._my_collection()
    return collection.manifest_text(stream_name, strip, normalize)
# Collect all datablock locators referenced by *item*, recursing into
# subcollections; empty files contribute the well-known empty-block locator.
838 def _datablocks_on_item(self, item):
840 Return a list of datablock locators, recursively navigating
841 through subcollections
843 if isinstance(item, arvados.arvfile.ArvadosFile):
846 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
849 for segment in item.segments():
850 loc = segment.locator
853 elif isinstance(item, arvados.collection.Collection):
854 l = [self._datablocks_on_item(x) for x in listvalues(item)]
855 # Fast list flattener method taken from:
856 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
857 return [loc for sublist in l for loc in sublist]
# Flush pending writes, then gather every datablock locator in the collection.
861 def data_locators(self):
862 with self._collection_lock:
863 # Make sure all datablocks are flushed before getting the locators
864 self._my_collection().manifest_text()
865 datablocks = self._datablocks_on_item(self._my_collection())
# Estimate the total number of bytes to be uploaded, honoring the same
# symlink and exclusion rules as the uploader itself.
868 def expected_bytes_for(pathlist, follow_links=True, exclude={}):
# NOTE(review): `exclude` has a mutable default ({}); it is only read via
# .get() here, never mutated, so this is safe but worth normalizing.
869 # Walk the given directory trees and stat files, adding up file sizes,
870 # so we can display progress as percent
872 exclude_paths = exclude.get('paths', None)
873 exclude_names = exclude.get('names', None)
874 for path in pathlist:
875 if os.path.isdir(path):
876 for root, dirs, files in os.walk(path, followlinks=follow_links):
877 root_relpath = os.path.relpath(root, path)
878 # Exclude files/dirs by full path matching pattern
879 if exclude_paths is not None:
881 lambda d: not any([pathname_match(os.path.join(root_relpath, d),
883 for pat in exclude_paths]),
886 lambda f: not any([pathname_match(os.path.join(root_relpath, f),
888 for pat in exclude_paths]),
890 # Exclude files/dirs by name matching pattern
891 if exclude_names is not None:
892 dirs[:] = filter(lambda d: not exclude_names.match(d), dirs)
893 files = filter(lambda f: not exclude_names.match(f), files)
896 filepath = os.path.join(root, f)
897 # Ignore symlinked files when requested
898 if (not follow_links) and os.path.islink(filepath):
900 bytesum += os.path.getsize(filepath)
901 elif not os.path.isfile(path):
904 bytesum += os.path.getsize(path)
# Template for --batch-progress output lines.
907 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
910 # Simulate glob.glob() matching behavior without the need to scan the filesystem
911 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
912 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
913 # so instead we're using it on every path component.
914 def pathname_match(pathname, pattern):
915 name = pathname.split(os.sep)
916 pat = pattern.split(os.sep)
# Paths with a different number of components can never match.
917 if len(name) != len(pat):
919 for i in range(len(name)):
920 if not fnmatch.fnmatch(name[i], pat[i]):
def machine_progress(bytes_written, bytes_expected):
    """Format one machine-readable progress line using the module-level
    template; an unknown total is reported as -1."""
    total = -1 if bytes_expected is None else bytes_expected
    return _machine_format.format(bytes_written, total)
# Render human-readable progress: MiB counts plus a percentage when the
# expected total is known, otherwise just the raw byte count.
928 def human_progress(bytes_written, bytes_expected):
930 return "\r{}M / {}M {:.1%} ".format(
931 bytes_written >> 20, bytes_expected >> 20,
932 float(bytes_written) / bytes_expected)
934 return "\r{} ".format(bytes_written)
def progress_writer(progress_func, outfile=sys.stderr):
    """Return a reporter callback that renders progress with *progress_func*
    and writes the resulting text to *outfile* (stderr by default)."""
    def write_progress(bytes_written, bytes_expected):
        rendered = progress_func(bytes_written, bytes_expected)
        outfile.write(rendered)
    return write_progress
# Handler installed for CAUGHT_SIGNALS; terminates the process so the
# upload loop's SystemExit handling can checkpoint appropriately.
941 def exit_signal_handler(sigcode, frame):
# Resolve the destination project: the current user's Home project when no
# UUID was given, else a user or group UUID; anything else is rejected.
944 def desired_project_uuid(api_client, project_uuid, num_retries):
946 query = api_client.users().current()
947 elif arvados.util.user_uuid_pattern.match(project_uuid):
948 query = api_client.users().get(uuid=project_uuid)
949 elif arvados.util.group_uuid_pattern.match(project_uuid):
950 query = api_client.groups().get(uuid=project_uuid)
952 raise ValueError("Not a valid project UUID: {}".format(project_uuid))
953 return query.execute(num_retries=num_retries)['uuid']
# Command line entry point: parse arguments, build an ArvPutUploadJob, run
# the upload and print the requested output (manifest, locators or UUID).
955 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
958 logger = logging.getLogger('arvados.arv_put')
959 logger.setLevel(logging.INFO)
960 args = parse_arguments(arguments)
962 if api_client is None:
963 api_client = arvados.api('v1')
965 # Determine the name to use
967 if args.stream or args.raw:
968 logger.error("Cannot use --name with --stream or --raw")
970 elif args.update_collection:
971 logger.error("Cannot use --name with --update-collection")
973 collection_name = args.name
975 collection_name = "Saved at {} by {}@{}".format(
976 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
977 pwd.getpwuid(os.getuid()).pw_name,
978 socket.gethostname())
980 if args.project_uuid and (args.stream or args.raw):
981 logger.error("Cannot use --project-uuid with --stream or --raw")
984 # Determine the parent project
986 project_uuid = desired_project_uuid(api_client, args.project_uuid,
988 except (apiclient_errors.Error, ValueError) as error:
# Pick the progress reporter implementation requested on the command line.
993 reporter = progress_writer(human_progress)
994 elif args.batch_progress:
995 reporter = progress_writer(machine_progress)
999 # Setup exclude regex from all the --exclude arguments provided
1002 exclude_names = None
1003 if len(args.exclude) > 0:
1004 # We're supporting 2 kinds of exclusion patterns:
1005 # 1) --exclude '*.jpg' (file/dir name patterns, will only match the name)
1006 # 2) --exclude 'foo/bar' (file/dir path patterns, will match the entire path,
1007 # and should be relative to any input dir argument)
1008 for p in args.exclude:
1009 # Only relative paths patterns allowed
1010 if p.startswith(os.sep):
1011 logger.error("Cannot use absolute paths with --exclude")
1013 if os.path.dirname(p):
1014 # Path search pattern
1015 exclude_paths.append(p)
1017 # Name-only search pattern
1018 name_patterns.append(p)
1019 # For name only matching, we can combine all patterns into a single regexp,
1020 # for better performance.
1021 exclude_names = re.compile('|'.join(
1022 [fnmatch.translate(p) for p in name_patterns]
1023 )) if len(name_patterns) > 0 else None
1024 # Show the user the patterns to be used, just in case they weren't specified inside
1025 # quotes and got changed by the shell expansion.
1026 logger.info("Exclude patterns: {}".format(args.exclude))
1028 # If this is used by a human, and there's at least one directory to be
1029 # uploaded, the expected bytes calculation can take a moment.
1030 if args.progress and any([os.path.isdir(f) for f in args.paths]):
1031 logger.info("Calculating upload size, this could take some time...")
1032 bytes_expected = expected_bytes_for(args.paths,
1033 follow_links=args.follow_links,
1034 exclude={'paths': exclude_paths,
1035 'names': exclude_names})
# Build the upload job; constructor failures map to dedicated exit paths.
1039 writer = ArvPutUploadJob(paths = args.paths,
1040 resume = args.resume,
1041 use_cache = args.use_cache,
1042 filename = args.filename,
1043 reporter = reporter,
1044 bytes_expected = bytes_expected,
1045 num_retries = args.retries,
1046 replication_desired = args.replication,
1047 put_threads = args.threads,
1048 name = collection_name,
1049 owner_uuid = project_uuid,
1050 ensure_unique_name = True,
1051 update_collection = args.update_collection,
1053 dry_run=args.dry_run,
1054 follow_links=args.follow_links,
1055 exclude_paths=exclude_paths,
1056 exclude_names=exclude_names)
1057 except ResumeCacheConflict:
1058 logger.error("\n".join([
1059 "arv-put: Another process is already uploading this data.",
1060 " Use --no-cache if this is really what you want."]))
1062 except CollectionUpdateError as error:
1063 logger.error("\n".join([
1064 "arv-put: %s" % str(error)]))
1066 except ArvPutUploadIsPending:
1067 # Dry run check successful, return proper exit code.
1069 except ArvPutUploadNotPending:
1070 # No files pending for upload
1073 # Install our signal handler for each code in CAUGHT_SIGNALS, and save
1075 orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
1076 for sigcode in CAUGHT_SIGNALS}
1078 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1079 logger.warning("\n".join([
1080 "arv-put: Resuming previous upload from last checkpoint.",
1081 " Use the --no-resume option to start over."]))
1083 if not args.dry_run:
1084 writer.report_progress()
1087 writer.start(save_collection=not(args.stream or args.raw))
1088 except arvados.errors.ApiError as error:
1089 logger.error("\n".join([
1090 "arv-put: %s" % str(error)]))
1092 except ArvPutUploadIsPending:
1093 # Dry run check successful, return proper exit code.
1095 except ArvPutUploadNotPending:
1096 # No files pending for upload
1098 except PathDoesNotExistError as error:
1099 logger.error("\n".join([
1100 "arv-put: %s" % str(error)]))
1103 if args.progress: # Print newline to split stderr from stdout for humans.
# Choose what to print depending on the selected output mode.
1108 output = writer.manifest_text(normalize=True)
1110 output = writer.manifest_text()
1112 output = ','.join(writer.data_locators())
1115 if args.update_collection:
1116 logger.info("Collection updated: '{}'".format(writer.collection_name()))
1118 logger.info("Collection saved as '{}'".format(writer.collection_name()))
1119 if args.portable_data_hash:
1120 output = writer.portable_data_hash()
1122 output = writer.manifest_locator()
1123 except apiclient_errors.Error as error:
1125 "arv-put: Error creating Collection on project: {}.".format(
1129 # Print the locator (uuid) of the new collection.
1131 status = status or 1
1133 stdout.write(output)
1134 if not output.endswith('\n'):
# Restore the original signal handlers before returning.
1137 for sigcode, orig_handler in listitems(orig_signal_handlers):
1138 signal.signal(sigcode, orig_handler)
1147 if __name__ == '__main__':