# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object

import argparse
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import traceback

import arvados
import arvados.collection
from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

api_client = None

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory, relative to the provided input dirs, will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")
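
# Example (hypothetical invocation) of the --exclude semantics described
# above:
#   arv-put --exclude '*.tmp' --exclude 'logs/*.txt' mydir/
# skips files and directories named like '*.tmp' anywhere under mydir/, plus
# any text file directly inside mydir/logs (a path-like pattern, relative to
# the given input dir).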

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

run_opts.add_argument('--silent', action='store_true',
                      help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")

_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")

_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")

_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])


def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
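
# For example (hypothetical call, illustrating the dry-run short-circuit
# above): FileUploadList(dry_run=True).append(('f.txt', 0, 'f.txt')) raises
# ArvPutUploadIsPending as soon as the first upload would be queued.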


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                locator = state["_finished_streams"][0][1][0]
            elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                locator = state["_current_stream_locators"][0]
            if locator is not None:
                kc = arvados.keep.KeepClient(api_client=api_client)
                kc.head(locator, num_retries=num_retries)
        except Exception as e:
            self.restart()

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
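    # A populated state might look like this (hypothetical values):
    # {
    #     'manifest': '. d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n',
    #     'files': {'/home/me/empty.txt': {'mtime': 1500000000.0, 'size': 0}}
    # }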

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None, api_client=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before we start counting, if no special
        # files (e.g., named pipes or devices) are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._api_client = api_client
        self._state_lock = threading.Lock()
        self._state = None  # Previous run state (file list & manifest)
        self._current_files = []  # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None  # Collection being updated (if asked)
        self._local_collection = None  # Collection from previous run manifest
        self._file_paths = set()  # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding files & dirs
        if requested, and build the upload file list.
        """
        # If there aren't special files to be read, reset total bytes count to
        # zero to start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists.
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                       os.path.join(root_relpath, d), pat)
                                       for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                     os.path.join(root_relpath, f), pat)
                                     for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got up to this point, we should notify
        # that there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True  # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            # Stop the thread before doing anything else
            self._stop_checkpointer.set()
            self._checkpointer.join()
            if self._checkpoint_before_quit:
                # Commit all pending blocks & one last _update()
                self._local_collection.manifest_text()
                self._update(final=True)
                if save_collection:
                    self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close(flush=False)

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # resume.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode; close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(
                    update_collection, api_client=self._api_client)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
        # Load the previous manifest so we can check if files were modified remotely.
        self._local_collection = arvados.collection.Collection(
            self._state['manifest'],
            replication_desired=self.replication_desired,
            put_threads=self.put_threads,
            api_client=self._api_client)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we're already needing JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile() failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh
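
    # For reference, a portable data hash has the form '<md5 of the stripped
    # manifest text>+<manifest length in bytes>'; e.g., an empty manifest ("")
    # would yield the hypothetical PDH 'd41d8cd98f00b204e9800998ecf8427e+0'.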

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())


# Simulate glob.glob() matching behavior without the need to scan the filesystem
# Note: fnmatch() doesn't work correctly when used with pathnames. For example,
# the pattern 'tests/*.py' will match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
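
# For example (hypothetical calls, per the note above):
#   pathname_match('tests/run_test.py', 'tests/*.py')          -> True
#   pathname_match('tests/subdir/run_test.py', 'tests/*.py')   -> False
#   pathname_match('tests/subdir/run_test.py', 'tests/*/*.py') -> True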


def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)


def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
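
# E.g. (hypothetical values): human_progress(52428800, 104857600) returns
# "\r50M / 100M 50.0% ", while machine_progress(52428800, 104857600) returns
# "<argv[0]> <pid>: 52428800 written 104857600 total\n".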


def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress


def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)


def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
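
# Arvados UUIDs encode the object type in their middle segment; e.g.
# (hypothetical cluster prefix 'zzzzz'), a user looks like
# 'zzzzz-tpzed-0123456789abcde' and a group/project like
# 'zzzzz-j7d0g-0123456789abcde', which is what the patterns above match.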


def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()
    logger.info('X-Request-Id: ' + request_id)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id)

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Setup exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
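        # For instance (hypothetical patterns): name_patterns like
        # ['*.jpg', '*.tmp'] get combined into a single regexp roughly like
        # r'(?s:.*\.jpg)\Z|(?s:.*\.tmp)\Z' (the exact output of
        # fnmatch.translate() varies across Python versions).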
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 api_client = api_client,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original signal handlers for later restoration.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        sys.stderr.write("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    # Restore the original signal handlers.
    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    return output


if __name__ == '__main__':
    main()