1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
11 import arvados.collection
32 from apiclient import errors as apiclient_errors
33 from arvados._version import __version__
35 import arvados.commands._util as arv_cmd
37 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
40 upload_opts = argparse.ArgumentParser(add_help=False)
42 upload_opts.add_argument('--version', action='version',
43 version="%s %s" % (sys.argv[0], __version__),
44 help='Print version and exit.')
45 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
47 Local file or directory. If path is a directory reference with a trailing
48 slash, then just upload the directory's contents; otherwise upload the
49 directory itself. Default: read from standard input.
52 _group = upload_opts.add_mutually_exclusive_group()
54 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
55 default=-1, help=argparse.SUPPRESS)
57 _group.add_argument('--normalize', action='store_true',
59 Normalize the manifest by re-ordering files and streams after writing
63 _group.add_argument('--dry-run', action='store_true', default=False,
65 Don't actually upload files, but only check if any file should be
66 uploaded. Exit with code=2 when files are pending for upload.
69 _group = upload_opts.add_mutually_exclusive_group()
71 _group.add_argument('--as-stream', action='store_true', dest='stream',
76 _group.add_argument('--stream', action='store_true',
78 Store the file content and display the resulting manifest on
79 stdout. Do not write the manifest to Keep or save a Collection object
83 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
85 Synonym for --manifest.
88 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
90 Synonym for --manifest.
93 _group.add_argument('--manifest', action='store_true',
95 Store the file data and resulting manifest in Keep, save a Collection
96 object in Arvados, and display the manifest locator (Collection uuid)
97 on stdout. This is the default behavior.
100 _group.add_argument('--as-raw', action='store_true', dest='raw',
105 _group.add_argument('--raw', action='store_true',
107 Store the file content and display the data block locators on stdout,
108 separated by commas, with a trailing newline. Do not store a
112 upload_opts.add_argument('--update-collection', type=str, default=None,
113 dest='update_collection', metavar="UUID", help="""
114 Update an existing collection identified by the given Arvados collection
115 UUID. All new local files will be uploaded.
118 upload_opts.add_argument('--use-filename', type=str, default=None,
119 dest='filename', help="""
120 Synonym for --filename.
123 upload_opts.add_argument('--filename', type=str, default=None,
125 Use the given filename in the manifest, instead of the name of the
126 local file. This is useful when "-" or "/dev/stdin" is given as an
127 input file. It can be used only if there is exactly one path given and
128 it is not a directory. Implies --manifest.
131 upload_opts.add_argument('--portable-data-hash', action='store_true',
133 Print the portable data hash instead of the Arvados UUID for the collection
134 created by the upload.
137 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
139 Set the replication level for the new collection: how many different
140 physical storage devices (e.g., disks) should have a copy of each data
141 block. Default is to use the server-provided default (if any) or 2.
144 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
146 Set the number of upload threads to be used. Take into account that
147 using lots of threads will increase the RAM requirements. Default is
149 On high latency installations, using a greater number will improve
153 run_opts = argparse.ArgumentParser(add_help=False)
155 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
156 Store the collection in the specified project, instead of your Home
160 run_opts.add_argument('--name', help="""
161 Save the collection with the specified name.
164 run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
165 action='append', help="""
166 Exclude files and directories whose names match the given glob pattern. When
167 using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
168 directory, relative to the provided input dirs will be excluded.
169 When using a filename pattern like '*.txt', any text file will be excluded
170 no matter where is placed.
171 You can specify multiple patterns by using this argument more than once.
174 _group = run_opts.add_mutually_exclusive_group()
175 _group.add_argument('--progress', action='store_true',
177 Display human-readable progress on stderr (bytes and, if possible,
178 percentage of total data size). This is the default behavior when
182 _group.add_argument('--no-progress', action='store_true',
184 Do not display human-readable progress on stderr, even if stderr is a
188 _group.add_argument('--batch-progress', action='store_true',
190 Display machine-readable progress on stderr (bytes and, if known,
194 _group = run_opts.add_mutually_exclusive_group()
195 _group.add_argument('--resume', action='store_true', default=True,
197 Continue interrupted uploads from cached state (default).
199 _group.add_argument('--no-resume', action='store_false', dest='resume',
201 Do not continue interrupted uploads from cached state.
204 _group = run_opts.add_mutually_exclusive_group()
205 _group.add_argument('--follow-links', action='store_true', default=True,
206 dest='follow_links', help="""
207 Follow file and directory symlinks (default).
209 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
211 Do not follow file and directory symlinks.
214 _group = run_opts.add_mutually_exclusive_group()
215 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
217 Save upload state in a cache file for resuming (default).
219 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
221 Do not save upload state in a cache file for resuming.
224 arg_parser = argparse.ArgumentParser(
225 description='Copy data from the local filesystem to Keep.',
226 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
228 def parse_arguments(arguments):
229 args = arg_parser.parse_args(arguments)
231 if len(args.paths) == 0:
234 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
236 if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
239 --filename argument cannot be used when storing a directory or
243 # Turn on --progress by default if stderr is a tty.
244 if (not (args.batch_progress or args.no_progress)
245 and os.isatty(sys.stderr.fileno())):
248 # Turn off --resume (default) if --no-cache is used.
249 if not args.use_cache:
252 if args.paths == ['-']:
253 if args.update_collection:
255 --update-collection cannot be used when reading from stdin.
258 args.use_cache = False
259 if not args.filename:
260 args.filename = 'stdin'
262 # Remove possible duplicated patterns
263 if len(args.exclude) > 0:
264 args.exclude = list(set(args.exclude))
269 class PathDoesNotExistError(Exception):
273 class CollectionUpdateError(Exception):
277 class ResumeCacheConflict(Exception):
281 class ArvPutArgumentConflict(Exception):
285 class ArvPutUploadIsPending(Exception):
289 class ArvPutUploadNotPending(Exception):
293 class FileUploadList(list):
294 def __init__(self, dry_run=False):
296 self.dry_run = dry_run
298 def append(self, other):
300 raise ArvPutUploadIsPending()
301 super(FileUploadList, self).append(other)
304 class ResumeCache(object):
305 CACHE_DIR = '.cache/arvados/arv-put'
307 def __init__(self, file_spec):
308 self.cache_file = open(file_spec, 'a+')
309 self._lock_file(self.cache_file)
310 self.filename = self.cache_file.name
313 def make_path(cls, args):
315 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
316 realpaths = sorted(os.path.realpath(path) for path in args.paths)
317 md5.update(b'\0'.join([p.encode() for p in realpaths]))
318 if any(os.path.isdir(path) for path in realpaths):
321 md5.update(args.filename.encode())
323 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
326 def _lock_file(self, fileobj):
328 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
330 raise ResumeCacheConflict("{} locked".format(fileobj.name))
333 self.cache_file.seek(0)
334 return json.load(self.cache_file)
336 def check_cache(self, api_client=None, num_retries=0):
341 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
342 locator = state["_finished_streams"][0][1][0]
343 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
344 locator = state["_current_stream_locators"][0]
345 if locator is not None:
346 kc = arvados.keep.KeepClient(api_client=api_client)
347 kc.head(locator, num_retries=num_retries)
348 except Exception as e:
353 def save(self, data):
355 new_cache_fd, new_cache_name = tempfile.mkstemp(
356 dir=os.path.dirname(self.filename))
357 self._lock_file(new_cache_fd)
358 new_cache = os.fdopen(new_cache_fd, 'r+')
359 json.dump(data, new_cache)
360 os.rename(new_cache_name, self.filename)
361 except (IOError, OSError, ResumeCacheConflict) as error:
363 os.unlink(new_cache_name)
364 except NameError: # mkstemp failed.
367 self.cache_file.close()
368 self.cache_file = new_cache
371 self.cache_file.close()
375 os.unlink(self.filename)
376 except OSError as error:
377 if error.errno != errno.ENOENT: # That's what we wanted anyway.
383 self.__init__(self.filename)
386 class ArvPutUploadJob(object):
387 CACHE_DIR = '.cache/arvados/arv-put'
389 'manifest' : None, # Last saved manifest checkpoint
390 'files' : {} # Previous run file list: {path : {size, mtime}}
393 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
394 name=None, owner_uuid=None,
395 ensure_unique_name=False, num_retries=None,
396 put_threads=None, replication_desired=None,
397 filename=None, update_time=60.0, update_collection=None,
398 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
399 follow_links=True, exclude_paths=[], exclude_names=None):
402 self.use_cache = use_cache
404 self.reporter = reporter
405 # This will set to 0 before start counting, if no special files are going
407 self.bytes_expected = None
408 self.bytes_written = 0
409 self.bytes_skipped = 0
411 self.owner_uuid = owner_uuid
412 self.ensure_unique_name = ensure_unique_name
413 self.num_retries = num_retries
414 self.replication_desired = replication_desired
415 self.put_threads = put_threads
416 self.filename = filename
417 self._state_lock = threading.Lock()
418 self._state = None # Previous run state (file list & manifest)
419 self._current_files = [] # Current run file list
420 self._cache_file = None
421 self._collection_lock = threading.Lock()
422 self._remote_collection = None # Collection being updated (if asked)
423 self._local_collection = None # Collection from previous run manifest
424 self._file_paths = set() # Files to be updated in remote collection
425 self._stop_checkpointer = threading.Event()
426 self._checkpointer = threading.Thread(target=self._update_task)
427 self._checkpointer.daemon = True
428 self._update_task_time = update_time # How many seconds wait between update runs
429 self._files_to_upload = FileUploadList(dry_run=dry_run)
430 self._upload_started = False
432 self.dry_run = dry_run
433 self._checkpoint_before_quit = True
434 self.follow_links = follow_links
435 self.exclude_paths = exclude_paths
436 self.exclude_names = exclude_names
438 if not self.use_cache and self.resume:
439 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
441 # Check for obvious dry-run responses
442 if self.dry_run and (not self.use_cache or not self.resume):
443 raise ArvPutUploadIsPending()
445 # Load cached data if any and if needed
446 self._setup_state(update_collection)
448 # Build the upload file list, excluding requested files and counting the
449 # bytes expected to be uploaded.
450 self._build_upload_list()
452 def _build_upload_list(self):
454 Scan the requested paths to count file sizes, excluding files & dirs if requested
455 and building the upload file list.
457 # If there aren't special files to be read, reset total bytes count to zero
459 if not any([p for p in self.paths
460 if not (os.path.isfile(p) or os.path.isdir(p))]):
461 self.bytes_expected = 0
463 for path in self.paths:
464 # Test for stdin first, in case some file named '-' exist
467 raise ArvPutUploadIsPending()
468 self._write_stdin(self.filename or 'stdin')
469 elif not os.path.exists(path):
470 raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
471 elif os.path.isdir(path):
472 # Use absolute paths on cache index so CWD doesn't interfere
473 # with the caching logic.
475 path = os.path.abspath(path)
476 if orig_path[-1:] == os.sep:
477 # When passing a directory reference with a trailing slash,
478 # its contents should be uploaded directly to the
482 # When passing a directory reference with no trailing slash,
483 # upload the directory to the collection's root.
484 prefixdir = os.path.dirname(path)
486 for root, dirs, files in os.walk(path,
487 followlinks=self.follow_links):
488 root_relpath = os.path.relpath(root, path)
489 if root_relpath == '.':
491 # Exclude files/dirs by full path matching pattern
492 if self.exclude_paths:
493 dirs[:] = [d for d in dirs
494 if not any(pathname_match(
495 os.path.join(root_relpath, d), pat)
496 for pat in self.exclude_paths)]
497 files = [f for f in files
498 if not any(pathname_match(
499 os.path.join(root_relpath, f), pat)
500 for pat in self.exclude_paths)]
501 # Exclude files/dirs by name matching pattern
502 if self.exclude_names is not None:
503 dirs[:] = [d for d in dirs
504 if not self.exclude_names.match(d)]
505 files = [f for f in files
506 if not self.exclude_names.match(f)]
507 # Make os.walk()'s dir traversing order deterministic
511 filepath = os.path.join(root, f)
512 # Add its size to the total bytes count (if applicable)
513 if self.follow_links or (not os.path.islink(filepath)):
514 if self.bytes_expected is not None:
515 self.bytes_expected += os.path.getsize(filepath)
516 self._check_file(filepath,
517 os.path.join(root[len(prefixdir):], f))
519 filepath = os.path.abspath(path)
520 # Add its size to the total bytes count (if applicable)
521 if self.follow_links or (not os.path.islink(filepath)):
522 if self.bytes_expected is not None:
523 self.bytes_expected += os.path.getsize(filepath)
524 self._check_file(filepath,
525 self.filename or os.path.basename(path))
526 # If dry-mode is on, and got up to this point, then we should notify that
527 # there aren't any file to upload.
529 raise ArvPutUploadNotPending()
530 # Remove local_collection's files that don't exist locally anymore, so the
531 # bytes_written count is correct.
532 for f in self.collection_file_paths(self._local_collection,
534 if f != 'stdin' and f != self.filename and not f in self._file_paths:
535 self._local_collection.remove(f)
537 def start(self, save_collection):
539 Start supporting thread & file uploading
541 self._checkpointer.start()
543 # Update bytes_written from current local collection and
544 # report initial progress.
547 self._upload_started = True # Used by the update thread to start checkpointing
549 except (SystemExit, Exception) as e:
550 self._checkpoint_before_quit = False
551 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
552 # Note: We're expecting SystemExit instead of
553 # KeyboardInterrupt because we have a custom signal
554 # handler in place that raises SystemExit with the catched
556 if isinstance(e, PathDoesNotExistError):
557 # We aren't interested in the traceback for this case
559 elif not isinstance(e, SystemExit) or e.code != -2:
560 self.logger.warning("Abnormal termination:\n{}".format(
561 traceback.format_exc()))
565 # Stop the thread before doing anything else
566 self._stop_checkpointer.set()
567 self._checkpointer.join()
568 if self._checkpoint_before_quit:
569 # Commit all pending blocks & one last _update()
570 self._local_collection.manifest_text()
571 self._update(final=True)
573 self.save_collection()
575 self._cache_file.close()
577 def save_collection(self):
579 # Check if files should be updated on the remote collection.
580 for fp in self._file_paths:
581 remote_file = self._remote_collection.find(fp)
583 # File don't exist on remote collection, copy it.
584 self._remote_collection.copy(fp, fp, self._local_collection)
585 elif remote_file != self._local_collection.find(fp):
586 # A different file exist on remote collection, overwrite it.
587 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
589 # The file already exist on remote collection, skip it.
591 self._remote_collection.save(num_retries=self.num_retries)
593 self._local_collection.save_new(
594 name=self.name, owner_uuid=self.owner_uuid,
595 ensure_unique_name=self.ensure_unique_name,
596 num_retries=self.num_retries)
598 def destroy_cache(self):
601 os.unlink(self._cache_filename)
602 except OSError as error:
603 # That's what we wanted anyway.
604 if error.errno != errno.ENOENT:
606 self._cache_file.close()
608 def _collection_size(self, collection):
610 Recursively get the total size of the collection
613 for item in listvalues(collection):
614 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
615 size += self._collection_size(item)
620 def _update_task(self):
622 Periodically called support task. File uploading is
623 asynchronous so we poll status from the collection.
625 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
628 def _update(self, final=False):
630 Update cached manifest text and report progress.
632 if self._upload_started:
633 with self._collection_lock:
634 self.bytes_written = self._collection_size(self._local_collection)
637 manifest = self._local_collection.manifest_text()
639 # Get the manifest text without comitting pending blocks
640 manifest = self._local_collection.manifest_text(strip=False,
644 with self._state_lock:
645 self._state['manifest'] = manifest
649 except Exception as e:
650 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
652 self.bytes_written = self.bytes_skipped
653 # Call the reporter, if any
654 self.report_progress()
656 def report_progress(self):
657 if self.reporter is not None:
658 self.reporter(self.bytes_written, self.bytes_expected)
660 def _write_stdin(self, filename):
661 output = self._local_collection.open(filename, 'wb')
662 self._write(sys.stdin, output)
665 def _check_file(self, source, filename):
667 Check if this file needs to be uploaded
669 # Ignore symlinks when requested
670 if (not self.follow_links) and os.path.islink(source):
673 should_upload = False
674 new_file_in_cache = False
675 # Record file path for updating the remote collection before exiting
676 self._file_paths.add(filename)
678 with self._state_lock:
679 # If no previous cached data on this file, store it for an eventual
681 if source not in self._state['files']:
682 self._state['files'][source] = {
683 'mtime': os.path.getmtime(source),
684 'size' : os.path.getsize(source)
686 new_file_in_cache = True
687 cached_file_data = self._state['files'][source]
689 # Check if file was already uploaded (at least partially)
690 file_in_local_collection = self._local_collection.find(filename)
692 # If not resuming, upload the full file.
695 # New file detected from last run, upload it.
696 elif new_file_in_cache:
698 # Local file didn't change from last run.
699 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
700 if not file_in_local_collection:
701 # File not uploaded yet, upload it completely
703 elif file_in_local_collection.permission_expired():
704 # Permission token expired, re-upload file. This will change whenever
705 # we have a API for refreshing tokens.
707 self._local_collection.remove(filename)
708 elif cached_file_data['size'] == file_in_local_collection.size():
709 # File already there, skip it.
710 self.bytes_skipped += cached_file_data['size']
711 elif cached_file_data['size'] > file_in_local_collection.size():
712 # File partially uploaded, resume!
713 resume_offset = file_in_local_collection.size()
714 self.bytes_skipped += resume_offset
717 # Inconsistent cache, re-upload the file
719 self._local_collection.remove(filename)
720 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
721 # Local file differs from cached data, re-upload it.
723 if file_in_local_collection:
724 self._local_collection.remove(filename)
729 self._files_to_upload.append((source, resume_offset, filename))
730 except ArvPutUploadIsPending:
731 # This could happen when running on dry-mode, close cache file to
732 # avoid locking issues.
733 self._cache_file.close()
736 def _upload_files(self):
737 for source, resume_offset, filename in self._files_to_upload:
738 with open(source, 'rb') as source_fd:
739 with self._state_lock:
740 self._state['files'][source]['mtime'] = os.path.getmtime(source)
741 self._state['files'][source]['size'] = os.path.getsize(source)
742 if resume_offset > 0:
743 # Start upload where we left off
744 output = self._local_collection.open(filename, 'ab')
745 source_fd.seek(resume_offset)
748 output = self._local_collection.open(filename, 'wb')
749 self._write(source_fd, output)
750 output.close(flush=False)
752 def _write(self, source_fd, output):
754 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
759 def _my_collection(self):
760 return self._remote_collection if self.update else self._local_collection
762 def _setup_state(self, update_collection):
764 Create a new cache file or load a previously existing one.
766 # Load an already existing collection for update
767 if update_collection and re.match(arvados.util.collection_uuid_pattern,
770 self._remote_collection = arvados.collection.Collection(update_collection)
771 except arvados.errors.ApiError as error:
772 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
775 elif update_collection:
776 # Collection locator provided, but unknown format
777 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
780 # Set up cache file name from input paths.
782 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
783 realpaths = sorted(os.path.realpath(path) for path in self.paths)
784 md5.update(b'\0'.join([p.encode() for p in realpaths]))
786 md5.update(self.filename.encode())
787 cache_filename = md5.hexdigest()
788 cache_filepath = os.path.join(
789 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
791 if self.resume and os.path.exists(cache_filepath):
792 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
793 self._cache_file = open(cache_filepath, 'a+')
795 # --no-resume means start with a empty cache file.
796 self.logger.info("Creating new cache file at {}".format(cache_filepath))
797 self._cache_file = open(cache_filepath, 'w+')
798 self._cache_filename = self._cache_file.name
799 self._lock_file(self._cache_file)
800 self._cache_file.seek(0)
802 with self._state_lock:
805 self._state = json.load(self._cache_file)
806 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
807 # Cache at least partially incomplete, set up new cache
808 self._state = copy.deepcopy(self.EMPTY_STATE)
810 # Cache file empty, set up new cache
811 self._state = copy.deepcopy(self.EMPTY_STATE)
813 self.logger.info("No cache usage requested for this run.")
814 # No cache file, set empty state
815 self._state = copy.deepcopy(self.EMPTY_STATE)
816 # Load the previous manifest so we can check if files were modified remotely.
817 self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
819 def collection_file_paths(self, col, path_prefix='.'):
820 """Return a list of file paths by recursively go through the entire collection `col`"""
822 for name, item in listitems(col):
823 if isinstance(item, arvados.arvfile.ArvadosFile):
824 file_paths.append(os.path.join(path_prefix, name))
825 elif isinstance(item, arvados.collection.Subcollection):
826 new_prefix = os.path.join(path_prefix, name)
827 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
830 def _lock_file(self, fileobj):
832 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
834 raise ResumeCacheConflict("{} locked".format(fileobj.name))
836 def _save_state(self):
838 Atomically save current state into cache.
840 with self._state_lock:
841 # We're not using copy.deepcopy() here because it's a lot slower
842 # than json.dumps(), and we're already needing JSON format to be
844 state = json.dumps(self._state)
846 new_cache = tempfile.NamedTemporaryFile(
848 dir=os.path.dirname(self._cache_filename), delete=False)
849 self._lock_file(new_cache)
850 new_cache.write(state)
853 os.rename(new_cache.name, self._cache_filename)
854 except (IOError, OSError, ResumeCacheConflict) as error:
855 self.logger.error("There was a problem while saving the cache file: {}".format(error))
857 os.unlink(new_cache_name)
858 except NameError: # mkstemp failed.
861 self._cache_file.close()
862 self._cache_file = new_cache
864 def collection_name(self):
865 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
867 def manifest_locator(self):
868 return self._my_collection().manifest_locator()
870 def portable_data_hash(self):
871 pdh = self._my_collection().portable_data_hash()
872 m = self._my_collection().stripped_manifest().encode()
873 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
875 logger.warning("\n".join([
876 "arv-put: API server provided PDH differs from local manifest.",
877 " This should not happen; showing API server version."]))
880 def manifest_text(self, stream_name=".", strip=False, normalize=False):
881 return self._my_collection().manifest_text(stream_name, strip, normalize)
883 def _datablocks_on_item(self, item):
885 Return a list of datablock locators, recursively navigating
886 through subcollections
888 if isinstance(item, arvados.arvfile.ArvadosFile):
891 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
894 for segment in item.segments():
895 loc = segment.locator
898 elif isinstance(item, arvados.collection.Collection):
899 l = [self._datablocks_on_item(x) for x in listvalues(item)]
900 # Fast list flattener method taken from:
901 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
902 return [loc for sublist in l for loc in sublist]
906 def data_locators(self):
907 with self._collection_lock:
908 # Make sure all datablocks are flushed before getting the locators
909 self._my_collection().manifest_text()
910 datablocks = self._datablocks_on_item(self._my_collection())
913 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
916 # Simulate glob.glob() matching behavior without the need to scan the filesystem
917 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
918 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
919 # so instead we're using it on every path component.
920 def pathname_match(pathname, pattern):
921 name = pathname.split(os.sep)
922 # Fix patterns like 'some/subdir/' or 'some//subdir'
923 pat = [x for x in pattern.split(os.sep) if x != '']
924 if len(name) != len(pat):
926 for i in range(len(name)):
927 if not fnmatch.fnmatch(name[i], pat[i]):
def machine_progress(bytes_written, bytes_expected):
    """Render a machine-readable progress line using the module-level
    ``_machine_format`` template; ``-1`` stands in for an unknown total."""
    total = -1 if bytes_expected is None else bytes_expected
    return _machine_format.format(bytes_written, total)
935 def human_progress(bytes_written, bytes_expected):
937 return "\r{}M / {}M {:.1%} ".format(
938 bytes_written >> 20, bytes_expected >> 20,
939 float(bytes_written) / bytes_expected)
941 return "\r{} ".format(bytes_written)
def progress_writer(progress_func, outfile=sys.stderr):
    """Build a reporter callback for ArvPutUploadJob.

    The returned callable renders ``(bytes_written, bytes_expected)`` with
    ``progress_func`` and writes the result to ``outfile`` (stderr by
    default; note the default is bound once, at import time).
    """
    def write_progress(bytes_written, bytes_expected):
        rendered = progress_func(bytes_written, bytes_expected)
        outfile.write(rendered)
    return write_progress
948 def exit_signal_handler(sigcode, frame):
951 def desired_project_uuid(api_client, project_uuid, num_retries):
953 query = api_client.users().current()
954 elif arvados.util.user_uuid_pattern.match(project_uuid):
955 query = api_client.users().get(uuid=project_uuid)
956 elif arvados.util.group_uuid_pattern.match(project_uuid):
957 query = api_client.groups().get(uuid=project_uuid)
959 raise ValueError("Not a valid project UUID: {}".format(project_uuid))
960 return query.execute(num_retries=num_retries)['uuid']
962 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
965 logger = logging.getLogger('arvados.arv_put')
966 logger.setLevel(logging.INFO)
967 args = parse_arguments(arguments)
969 if api_client is None:
970 api_client = arvados.api('v1')
972 # Determine the name to use
974 if args.stream or args.raw:
975 logger.error("Cannot use --name with --stream or --raw")
977 elif args.update_collection:
978 logger.error("Cannot use --name with --update-collection")
980 collection_name = args.name
982 collection_name = "Saved at {} by {}@{}".format(
983 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
984 pwd.getpwuid(os.getuid()).pw_name,
985 socket.gethostname())
987 if args.project_uuid and (args.stream or args.raw):
988 logger.error("Cannot use --project-uuid with --stream or --raw")
991 # Determine the parent project
993 project_uuid = desired_project_uuid(api_client, args.project_uuid,
995 except (apiclient_errors.Error, ValueError) as error:
1000 reporter = progress_writer(human_progress)
1001 elif args.batch_progress:
1002 reporter = progress_writer(machine_progress)
1006 # Setup exclude regex from all the --exclude arguments provided
1009 exclude_names = None
1010 if len(args.exclude) > 0:
1011 # We're supporting 2 kinds of exclusion patterns:
1012 # 1) --exclude '*.jpg' (file/dir name patterns, will only match
1014 # 2) --exclude 'foo/bar' (file/dir path patterns, will match the
1015 # entire path, and should be relative to
1016 # any input dir argument)
1017 for p in args.exclude:
1018 # Only relative paths patterns allowed
1019 if p.startswith(os.sep):
1020 logger.error("Cannot use absolute paths with --exclude")
1022 if os.path.dirname(p):
1023 # We don't support of path patterns with '.' or '..'
1024 p_parts = p.split(os.sep)
1025 if '.' in p_parts or '..' in p_parts:
1027 "Cannot use path patterns that include '.' or '..'")
1029 # Path search pattern
1030 exclude_paths.append(p)
1032 # Name-only search pattern
1033 name_patterns.append(p)
1034 # For name only matching, we can combine all patterns into a single regexp,
1035 # for better performance.
1036 exclude_names = re.compile('|'.join(
1037 [fnmatch.translate(p) for p in name_patterns]
1038 )) if len(name_patterns) > 0 else None
1039 # Show the user the patterns to be used, just in case they weren't specified inside
1040 # quotes and got changed by the shell expansion.
1041 logger.info("Exclude patterns: {}".format(args.exclude))
1043 # If this is used by a human, and there's at least one directory to be
1044 # uploaded, the expected bytes calculation can take a moment.
1045 if args.progress and any([os.path.isdir(f) for f in args.paths]):
1046 logger.info("Calculating upload size, this could take some time...")
1048 writer = ArvPutUploadJob(paths = args.paths,
1049 resume = args.resume,
1050 use_cache = args.use_cache,
1051 filename = args.filename,
1052 reporter = reporter,
1053 num_retries = args.retries,
1054 replication_desired = args.replication,
1055 put_threads = args.threads,
1056 name = collection_name,
1057 owner_uuid = project_uuid,
1058 ensure_unique_name = True,
1059 update_collection = args.update_collection,
1061 dry_run=args.dry_run,
1062 follow_links=args.follow_links,
1063 exclude_paths=exclude_paths,
1064 exclude_names=exclude_names)
1065 except ResumeCacheConflict:
1066 logger.error("\n".join([
1067 "arv-put: Another process is already uploading this data.",
1068 " Use --no-cache if this is really what you want."]))
1070 except CollectionUpdateError as error:
1071 logger.error("\n".join([
1072 "arv-put: %s" % str(error)]))
1074 except ArvPutUploadIsPending:
1075 # Dry run check successful, return proper exit code.
1077 except ArvPutUploadNotPending:
1078 # No files pending for upload
1080 except PathDoesNotExistError as error:
1081 logger.error("\n".join([
1082 "arv-put: %s" % str(error)]))
1085 # Install our signal handler for each code in CAUGHT_SIGNALS, and save
1087 orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
1088 for sigcode in CAUGHT_SIGNALS}
1090 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1091 logger.warning("\n".join([
1092 "arv-put: Resuming previous upload from last checkpoint.",
1093 " Use the --no-resume option to start over."]))
1095 if not args.dry_run:
1096 writer.report_progress()
1099 writer.start(save_collection=not(args.stream or args.raw))
1100 except arvados.errors.ApiError as error:
1101 logger.error("\n".join([
1102 "arv-put: %s" % str(error)]))
1105 if args.progress: # Print newline to split stderr from stdout for humans.
1110 output = writer.manifest_text(normalize=True)
1112 output = writer.manifest_text()
1114 output = ','.join(writer.data_locators())
1117 if args.update_collection:
1118 logger.info("Collection updated: '{}'".format(writer.collection_name()))
1120 logger.info("Collection saved as '{}'".format(writer.collection_name()))
1121 if args.portable_data_hash:
1122 output = writer.portable_data_hash()
1124 output = writer.manifest_locator()
1125 except apiclient_errors.Error as error:
1127 "arv-put: Error creating Collection on project: {}.".format(
1131 # Print the locator (uuid) of the new collection.
1133 status = status or 1
1135 stdout.write(output)
1136 if not output.endswith('\n'):
1139 for sigcode, orig_handler in listitems(orig_signal_handlers):
1140 signal.signal(sigcode, orig_handler)
1149 if __name__ == '__main__':