1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
11 import arvados.collection
32 from apiclient import errors as apiclient_errors
33 from arvados._version import __version__
35 import arvados.commands._util as arv_cmd
37 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
40 upload_opts = argparse.ArgumentParser(add_help=False)
42 upload_opts.add_argument('--version', action='version',
43 version="%s %s" % (sys.argv[0], __version__),
44 help='Print version and exit.')
45 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
47 Local file or directory. If path is a directory reference with a trailing
48 slash, then just upload the directory's contents; otherwise upload the
49 directory itself. Default: read from standard input.
52 _group = upload_opts.add_mutually_exclusive_group()
54 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
55 default=-1, help=argparse.SUPPRESS)
57 _group.add_argument('--normalize', action='store_true',
59 Normalize the manifest by re-ordering files and streams after writing
63 _group.add_argument('--dry-run', action='store_true', default=False,
65 Don't actually upload files, but only check if any file should be
66 uploaded. Exit with code=2 when files are pending for upload.
69 _group = upload_opts.add_mutually_exclusive_group()
71 _group.add_argument('--as-stream', action='store_true', dest='stream',
76 _group.add_argument('--stream', action='store_true',
78 Store the file content and display the resulting manifest on
79 stdout. Do not write the manifest to Keep or save a Collection object
83 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
85 Synonym for --manifest.
88 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
90 Synonym for --manifest.
93 _group.add_argument('--manifest', action='store_true',
95 Store the file data and resulting manifest in Keep, save a Collection
96 object in Arvados, and display the manifest locator (Collection uuid)
97 on stdout. This is the default behavior.
100 _group.add_argument('--as-raw', action='store_true', dest='raw',
105 _group.add_argument('--raw', action='store_true',
107 Store the file content and display the data block locators on stdout,
108 separated by commas, with a trailing newline. Do not store a
112 upload_opts.add_argument('--update-collection', type=str, default=None,
113 dest='update_collection', metavar="UUID", help="""
114 Update an existing collection identified by the given Arvados collection
115 UUID. All new local files will be uploaded.
118 upload_opts.add_argument('--use-filename', type=str, default=None,
119 dest='filename', help="""
120 Synonym for --filename.
123 upload_opts.add_argument('--filename', type=str, default=None,
125 Use the given filename in the manifest, instead of the name of the
126 local file. This is useful when "-" or "/dev/stdin" is given as an
127 input file. It can be used only if there is exactly one path given and
128 it is not a directory. Implies --manifest.
131 upload_opts.add_argument('--portable-data-hash', action='store_true',
133 Print the portable data hash instead of the Arvados UUID for the collection
134 created by the upload.
137 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
139 Set the replication level for the new collection: how many different
140 physical storage devices (e.g., disks) should have a copy of each data
141 block. Default is to use the server-provided default (if any) or 2.
144 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
146 Set the number of upload threads to be used. Take into account that
147 using lots of threads will increase the RAM requirements. Default is
149 On high latency installations, using a greater number will improve
153 run_opts = argparse.ArgumentParser(add_help=False)
155 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
156 Store the collection in the specified project, instead of your Home
160 run_opts.add_argument('--name', help="""
161 Save the collection with the specified name.
164 run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
165 action='append', help="""
166 Exclude files and directories whose names match the given glob pattern. When
167 using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
168 directory, relative to the provided input dirs will be excluded.
169 When using a filename pattern like '*.txt', any text file will be excluded
170 no matter where is placed.
171 For the special case of needing to exclude only files or dirs directly below
172 the given input directory, you can use a pattern like './exclude_this.gif'.
173 You can specify multiple patterns by using this argument more than once.
176 _group = run_opts.add_mutually_exclusive_group()
177 _group.add_argument('--progress', action='store_true',
179 Display human-readable progress on stderr (bytes and, if possible,
180 percentage of total data size). This is the default behavior when
184 _group.add_argument('--no-progress', action='store_true',
186 Do not display human-readable progress on stderr, even if stderr is a
190 _group.add_argument('--batch-progress', action='store_true',
192 Display machine-readable progress on stderr (bytes and, if known,
196 run_opts.add_argument('--silent', action='store_true',
198 Do not print any debug messages to console. (Any error messages will
202 _group = run_opts.add_mutually_exclusive_group()
203 _group.add_argument('--resume', action='store_true', default=True,
205 Continue interrupted uploads from cached state (default).
207 _group.add_argument('--no-resume', action='store_false', dest='resume',
209 Do not continue interrupted uploads from cached state.
212 _group = run_opts.add_mutually_exclusive_group()
213 _group.add_argument('--follow-links', action='store_true', default=True,
214 dest='follow_links', help="""
215 Follow file and directory symlinks (default).
217 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
219 Do not follow file and directory symlinks.
222 _group = run_opts.add_mutually_exclusive_group()
223 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
225 Save upload state in a cache file for resuming (default).
227 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
229 Do not save upload state in a cache file for resuming.
232 arg_parser = argparse.ArgumentParser(
233 description='Copy data from the local filesystem to Keep.',
234 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
236 def parse_arguments(arguments):
237 args = arg_parser.parse_args(arguments)
239 if len(args.paths) == 0:
242 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
244 if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
247 --filename argument cannot be used when storing a directory or
251 # Turn on --progress by default if stderr is a tty.
252 if (not (args.batch_progress or args.no_progress or args.silent)
253 and os.isatty(sys.stderr.fileno())):
256 # Turn off --resume (default) if --no-cache is used.
257 if not args.use_cache:
260 if args.paths == ['-']:
261 if args.update_collection:
263 --update-collection cannot be used when reading from stdin.
266 args.use_cache = False
267 if not args.filename:
268 args.filename = 'stdin'
270 # Remove possible duplicated patterns
271 if len(args.exclude) > 0:
272 args.exclude = list(set(args.exclude))
277 class PathDoesNotExistError(Exception):
281 class CollectionUpdateError(Exception):
285 class ResumeCacheConflict(Exception):
289 class ArvPutArgumentConflict(Exception):
293 class ArvPutUploadIsPending(Exception):
297 class ArvPutUploadNotPending(Exception):
301 class FileUploadList(list):
302 def __init__(self, dry_run=False):
304 self.dry_run = dry_run
306 def append(self, other):
308 raise ArvPutUploadIsPending()
309 super(FileUploadList, self).append(other)
312 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
313 class ArvPutLogFormatter(logging.Formatter):
314 std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
316 request_id_informed = False
318 def __init__(self, request_id):
319 self.err_fmtr = logging.Formatter(
320 arvados.log_format+' (X-Request-Id: {})'.format(request_id),
321 arvados.log_date_format)
323 def format(self, record):
324 if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
325 self.request_id_informed = True
326 return self.err_fmtr.format(record)
327 return self.std_fmtr.format(record)
330 class ResumeCache(object):
331 CACHE_DIR = '.cache/arvados/arv-put'
333 def __init__(self, file_spec):
334 self.cache_file = open(file_spec, 'a+')
335 self._lock_file(self.cache_file)
336 self.filename = self.cache_file.name
339 def make_path(cls, args):
341 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
342 realpaths = sorted(os.path.realpath(path) for path in args.paths)
343 md5.update(b'\0'.join([p.encode() for p in realpaths]))
344 if any(os.path.isdir(path) for path in realpaths):
347 md5.update(args.filename.encode())
349 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
352 def _lock_file(self, fileobj):
354 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
356 raise ResumeCacheConflict("{} locked".format(fileobj.name))
359 self.cache_file.seek(0)
360 return json.load(self.cache_file)
362 def check_cache(self, api_client=None, num_retries=0):
367 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
368 locator = state["_finished_streams"][0][1][0]
369 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
370 locator = state["_current_stream_locators"][0]
371 if locator is not None:
372 kc = arvados.keep.KeepClient(api_client=api_client)
373 kc.head(locator, num_retries=num_retries)
374 except Exception as e:
379 def save(self, data):
381 new_cache_fd, new_cache_name = tempfile.mkstemp(
382 dir=os.path.dirname(self.filename))
383 self._lock_file(new_cache_fd)
384 new_cache = os.fdopen(new_cache_fd, 'r+')
385 json.dump(data, new_cache)
386 os.rename(new_cache_name, self.filename)
387 except (IOError, OSError, ResumeCacheConflict) as error:
389 os.unlink(new_cache_name)
390 except NameError: # mkstemp failed.
393 self.cache_file.close()
394 self.cache_file = new_cache
397 self.cache_file.close()
401 os.unlink(self.filename)
402 except OSError as error:
403 if error.errno != errno.ENOENT: # That's what we wanted anyway.
409 self.__init__(self.filename)
412 class ArvPutUploadJob(object):
413 CACHE_DIR = '.cache/arvados/arv-put'
415 'manifest' : None, # Last saved manifest checkpoint
416 'files' : {} # Previous run file list: {path : {size, mtime}}
419 def __init__(self, paths, resume=True, use_cache=True, reporter=None,
420 name=None, owner_uuid=None, api_client=None,
421 ensure_unique_name=False, num_retries=None,
422 put_threads=None, replication_desired=None,
423 filename=None, update_time=60.0, update_collection=None,
424 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
425 follow_links=True, exclude_paths=[], exclude_names=None):
428 self.use_cache = use_cache
430 self.reporter = reporter
431 # This will set to 0 before start counting, if no special files are going
433 self.bytes_expected = None
434 self.bytes_written = 0
435 self.bytes_skipped = 0
437 self.owner_uuid = owner_uuid
438 self.ensure_unique_name = ensure_unique_name
439 self.num_retries = num_retries
440 self.replication_desired = replication_desired
441 self.put_threads = put_threads
442 self.filename = filename
443 self._api_client = api_client
444 self._state_lock = threading.Lock()
445 self._state = None # Previous run state (file list & manifest)
446 self._current_files = [] # Current run file list
447 self._cache_file = None
448 self._collection_lock = threading.Lock()
449 self._remote_collection = None # Collection being updated (if asked)
450 self._local_collection = None # Collection from previous run manifest
451 self._file_paths = set() # Files to be updated in remote collection
452 self._stop_checkpointer = threading.Event()
453 self._checkpointer = threading.Thread(target=self._update_task)
454 self._checkpointer.daemon = True
455 self._update_task_time = update_time # How many seconds wait between update runs
456 self._files_to_upload = FileUploadList(dry_run=dry_run)
457 self._upload_started = False
459 self.dry_run = dry_run
460 self._checkpoint_before_quit = True
461 self.follow_links = follow_links
462 self.exclude_paths = exclude_paths
463 self.exclude_names = exclude_names
465 if not self.use_cache and self.resume:
466 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
468 # Check for obvious dry-run responses
469 if self.dry_run and (not self.use_cache or not self.resume):
470 raise ArvPutUploadIsPending()
472 # Load cached data if any and if needed
473 self._setup_state(update_collection)
475 # Build the upload file list, excluding requested files and counting the
476 # bytes expected to be uploaded.
477 self._build_upload_list()
479 def _build_upload_list(self):
481 Scan the requested paths to count file sizes, excluding files & dirs if requested
482 and building the upload file list.
484 # If there aren't special files to be read, reset total bytes count to zero
486 if not any([p for p in self.paths
487 if not (os.path.isfile(p) or os.path.isdir(p))]):
488 self.bytes_expected = 0
490 for path in self.paths:
491 # Test for stdin first, in case some file named '-' exist
494 raise ArvPutUploadIsPending()
495 self._write_stdin(self.filename or 'stdin')
496 elif not os.path.exists(path):
497 raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
498 elif os.path.isdir(path):
499 # Use absolute paths on cache index so CWD doesn't interfere
500 # with the caching logic.
502 path = os.path.abspath(path)
503 if orig_path[-1:] == os.sep:
504 # When passing a directory reference with a trailing slash,
505 # its contents should be uploaded directly to the
509 # When passing a directory reference with no trailing slash,
510 # upload the directory to the collection's root.
511 prefixdir = os.path.dirname(path)
513 for root, dirs, files in os.walk(path,
514 followlinks=self.follow_links):
515 root_relpath = os.path.relpath(root, path)
516 if root_relpath == '.':
518 # Exclude files/dirs by full path matching pattern
519 if self.exclude_paths:
520 dirs[:] = [d for d in dirs
521 if not any(pathname_match(
522 os.path.join(root_relpath, d), pat)
523 for pat in self.exclude_paths)]
524 files = [f for f in files
525 if not any(pathname_match(
526 os.path.join(root_relpath, f), pat)
527 for pat in self.exclude_paths)]
528 # Exclude files/dirs by name matching pattern
529 if self.exclude_names is not None:
530 dirs[:] = [d for d in dirs
531 if not self.exclude_names.match(d)]
532 files = [f for f in files
533 if not self.exclude_names.match(f)]
534 # Make os.walk()'s dir traversing order deterministic
538 filepath = os.path.join(root, f)
539 # Add its size to the total bytes count (if applicable)
540 if self.follow_links or (not os.path.islink(filepath)):
541 if self.bytes_expected is not None:
542 self.bytes_expected += os.path.getsize(filepath)
543 self._check_file(filepath,
544 os.path.join(root[len(prefixdir):], f))
546 filepath = os.path.abspath(path)
547 # Add its size to the total bytes count (if applicable)
548 if self.follow_links or (not os.path.islink(filepath)):
549 if self.bytes_expected is not None:
550 self.bytes_expected += os.path.getsize(filepath)
551 self._check_file(filepath,
552 self.filename or os.path.basename(path))
553 # If dry-mode is on, and got up to this point, then we should notify that
554 # there aren't any file to upload.
556 raise ArvPutUploadNotPending()
557 # Remove local_collection's files that don't exist locally anymore, so the
558 # bytes_written count is correct.
559 for f in self.collection_file_paths(self._local_collection,
561 if f != 'stdin' and f != self.filename and not f in self._file_paths:
562 self._local_collection.remove(f)
564 def start(self, save_collection):
566 Start supporting thread & file uploading
568 self._checkpointer.start()
570 # Update bytes_written from current local collection and
571 # report initial progress.
574 self._upload_started = True # Used by the update thread to start checkpointing
576 except (SystemExit, Exception) as e:
577 self._checkpoint_before_quit = False
578 # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
579 # Note: We're expecting SystemExit instead of
580 # KeyboardInterrupt because we have a custom signal
581 # handler in place that raises SystemExit with the catched
583 if isinstance(e, PathDoesNotExistError):
584 # We aren't interested in the traceback for this case
586 elif not isinstance(e, SystemExit) or e.code != -2:
587 self.logger.warning("Abnormal termination:\n{}".format(
588 traceback.format_exc()))
592 # Stop the thread before doing anything else
593 self._stop_checkpointer.set()
594 self._checkpointer.join()
595 if self._checkpoint_before_quit:
596 # Commit all pending blocks & one last _update()
597 self._local_collection.manifest_text()
598 self._update(final=True)
600 self.save_collection()
602 self._cache_file.close()
604 def save_collection(self):
606 # Check if files should be updated on the remote collection.
607 for fp in self._file_paths:
608 remote_file = self._remote_collection.find(fp)
610 # File don't exist on remote collection, copy it.
611 self._remote_collection.copy(fp, fp, self._local_collection)
612 elif remote_file != self._local_collection.find(fp):
613 # A different file exist on remote collection, overwrite it.
614 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
616 # The file already exist on remote collection, skip it.
618 self._remote_collection.save(num_retries=self.num_retries)
620 self._local_collection.save_new(
621 name=self.name, owner_uuid=self.owner_uuid,
622 ensure_unique_name=self.ensure_unique_name,
623 num_retries=self.num_retries)
625 def destroy_cache(self):
628 os.unlink(self._cache_filename)
629 except OSError as error:
630 # That's what we wanted anyway.
631 if error.errno != errno.ENOENT:
633 self._cache_file.close()
635 def _collection_size(self, collection):
637 Recursively get the total size of the collection
640 for item in listvalues(collection):
641 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
642 size += self._collection_size(item)
647 def _update_task(self):
649 Periodically called support task. File uploading is
650 asynchronous so we poll status from the collection.
652 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
655 def _update(self, final=False):
657 Update cached manifest text and report progress.
659 if self._upload_started:
660 with self._collection_lock:
661 self.bytes_written = self._collection_size(self._local_collection)
664 manifest = self._local_collection.manifest_text()
666 # Get the manifest text without comitting pending blocks
667 manifest = self._local_collection.manifest_text(strip=False,
671 with self._state_lock:
672 self._state['manifest'] = manifest
676 except Exception as e:
677 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
679 self.bytes_written = self.bytes_skipped
680 # Call the reporter, if any
681 self.report_progress()
683 def report_progress(self):
684 if self.reporter is not None:
685 self.reporter(self.bytes_written, self.bytes_expected)
687 def _write_stdin(self, filename):
688 output = self._local_collection.open(filename, 'wb')
689 self._write(sys.stdin, output)
692 def _check_file(self, source, filename):
694 Check if this file needs to be uploaded
696 # Ignore symlinks when requested
697 if (not self.follow_links) and os.path.islink(source):
700 should_upload = False
701 new_file_in_cache = False
702 # Record file path for updating the remote collection before exiting
703 self._file_paths.add(filename)
705 with self._state_lock:
706 # If no previous cached data on this file, store it for an eventual
708 if source not in self._state['files']:
709 self._state['files'][source] = {
710 'mtime': os.path.getmtime(source),
711 'size' : os.path.getsize(source)
713 new_file_in_cache = True
714 cached_file_data = self._state['files'][source]
716 # Check if file was already uploaded (at least partially)
717 file_in_local_collection = self._local_collection.find(filename)
719 # If not resuming, upload the full file.
722 # New file detected from last run, upload it.
723 elif new_file_in_cache:
725 # Local file didn't change from last run.
726 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
727 if not file_in_local_collection:
728 # File not uploaded yet, upload it completely
730 elif file_in_local_collection.permission_expired():
731 # Permission token expired, re-upload file. This will change whenever
732 # we have a API for refreshing tokens.
733 self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
735 self._local_collection.remove(filename)
736 elif cached_file_data['size'] == file_in_local_collection.size():
737 # File already there, skip it.
738 self.bytes_skipped += cached_file_data['size']
739 elif cached_file_data['size'] > file_in_local_collection.size():
740 # File partially uploaded, resume!
741 resume_offset = file_in_local_collection.size()
742 self.bytes_skipped += resume_offset
745 # Inconsistent cache, re-upload the file
747 self._local_collection.remove(filename)
748 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
749 # Local file differs from cached data, re-upload it.
751 if file_in_local_collection:
752 self._local_collection.remove(filename)
757 self._files_to_upload.append((source, resume_offset, filename))
758 except ArvPutUploadIsPending:
759 # This could happen when running on dry-mode, close cache file to
760 # avoid locking issues.
761 self._cache_file.close()
764 def _upload_files(self):
765 for source, resume_offset, filename in self._files_to_upload:
766 with open(source, 'rb') as source_fd:
767 with self._state_lock:
768 self._state['files'][source]['mtime'] = os.path.getmtime(source)
769 self._state['files'][source]['size'] = os.path.getsize(source)
770 if resume_offset > 0:
771 # Start upload where we left off
772 output = self._local_collection.open(filename, 'ab')
773 source_fd.seek(resume_offset)
776 output = self._local_collection.open(filename, 'wb')
777 self._write(source_fd, output)
778 output.close(flush=False)
780 def _write(self, source_fd, output):
782 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
787 def _my_collection(self):
788 return self._remote_collection if self.update else self._local_collection
790 def _setup_state(self, update_collection):
792 Create a new cache file or load a previously existing one.
794 # Load an already existing collection for update
795 if update_collection and re.match(arvados.util.collection_uuid_pattern,
798 self._remote_collection = arvados.collection.Collection(
799 update_collection, api_client=self._api_client)
800 except arvados.errors.ApiError as error:
801 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
804 elif update_collection:
805 # Collection locator provided, but unknown format
806 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
809 # Set up cache file name from input paths.
811 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
812 realpaths = sorted(os.path.realpath(path) for path in self.paths)
813 md5.update(b'\0'.join([p.encode() for p in realpaths]))
815 md5.update(self.filename.encode())
816 cache_filename = md5.hexdigest()
817 cache_filepath = os.path.join(
818 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
820 if self.resume and os.path.exists(cache_filepath):
821 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
822 self._cache_file = open(cache_filepath, 'a+')
824 # --no-resume means start with a empty cache file.
825 self.logger.info("Creating new cache file at {}".format(cache_filepath))
826 self._cache_file = open(cache_filepath, 'w+')
827 self._cache_filename = self._cache_file.name
828 self._lock_file(self._cache_file)
829 self._cache_file.seek(0)
831 with self._state_lock:
834 self._state = json.load(self._cache_file)
835 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
836 # Cache at least partially incomplete, set up new cache
837 self._state = copy.deepcopy(self.EMPTY_STATE)
839 # Cache file empty, set up new cache
840 self._state = copy.deepcopy(self.EMPTY_STATE)
842 self.logger.info("No cache usage requested for this run.")
843 # No cache file, set empty state
844 self._state = copy.deepcopy(self.EMPTY_STATE)
845 # Load the previous manifest so we can check if files were modified remotely.
846 self._local_collection = arvados.collection.Collection(
847 self._state['manifest'],
848 replication_desired=self.replication_desired,
849 put_threads=self.put_threads,
850 api_client=self._api_client)
852 def collection_file_paths(self, col, path_prefix='.'):
853 """Return a list of file paths by recursively go through the entire collection `col`"""
855 for name, item in listitems(col):
856 if isinstance(item, arvados.arvfile.ArvadosFile):
857 file_paths.append(os.path.join(path_prefix, name))
858 elif isinstance(item, arvados.collection.Subcollection):
859 new_prefix = os.path.join(path_prefix, name)
860 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
863 def _lock_file(self, fileobj):
865 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
867 raise ResumeCacheConflict("{} locked".format(fileobj.name))
869 def _save_state(self):
871 Atomically save current state into cache.
873 with self._state_lock:
874 # We're not using copy.deepcopy() here because it's a lot slower
875 # than json.dumps(), and we're already needing JSON format to be
877 state = json.dumps(self._state)
879 new_cache = tempfile.NamedTemporaryFile(
881 dir=os.path.dirname(self._cache_filename), delete=False)
882 self._lock_file(new_cache)
883 new_cache.write(state)
886 os.rename(new_cache.name, self._cache_filename)
887 except (IOError, OSError, ResumeCacheConflict) as error:
888 self.logger.error("There was a problem while saving the cache file: {}".format(error))
890 os.unlink(new_cache_name)
891 except NameError: # mkstemp failed.
894 self._cache_file.close()
895 self._cache_file = new_cache
897 def collection_name(self):
898 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
900 def manifest_locator(self):
901 return self._my_collection().manifest_locator()
903 def portable_data_hash(self):
904 pdh = self._my_collection().portable_data_hash()
905 m = self._my_collection().stripped_manifest().encode()
906 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
908 self.logger.warning("\n".join([
909 "arv-put: API server provided PDH differs from local manifest.",
910 " This should not happen; showing API server version."]))
913 def manifest_text(self, stream_name=".", strip=False, normalize=False):
914 return self._my_collection().manifest_text(stream_name, strip, normalize)
916 def _datablocks_on_item(self, item):
918 Return a list of datablock locators, recursively navigating
919 through subcollections
921 if isinstance(item, arvados.arvfile.ArvadosFile):
924 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
927 for segment in item.segments():
928 loc = segment.locator
931 elif isinstance(item, arvados.collection.Collection):
932 l = [self._datablocks_on_item(x) for x in listvalues(item)]
933 # Fast list flattener method taken from:
934 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
935 return [loc for sublist in l for loc in sublist]
939 def data_locators(self):
940 with self._collection_lock:
941 # Make sure all datablocks are flushed before getting the locators
942 self._my_collection().manifest_text()
943 datablocks = self._datablocks_on_item(self._my_collection())
946 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
949 # Simulate glob.glob() matching behavior without the need to scan the filesystem
950 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
951 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
952 # so instead we're using it on every path component.
953 def pathname_match(pathname, pattern):
954 name = pathname.split(os.sep)
955 # Fix patterns like 'some/subdir/' or 'some//subdir'
956 pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
957 if len(name) != len(pat):
959 for i in range(len(name)):
960 if not fnmatch.fnmatch(name[i], pat[i]):
964 def machine_progress(bytes_written, bytes_expected):
965 return _machine_format.format(
966 bytes_written, -1 if (bytes_expected is None) else bytes_expected)
968 def human_progress(bytes_written, bytes_expected):
970 return "\r{}M / {}M {:.1%} ".format(
971 bytes_written >> 20, bytes_expected >> 20,
972 float(bytes_written) / bytes_expected)
974 return "\r{} ".format(bytes_written)
976 def progress_writer(progress_func, outfile=sys.stderr):
977 def write_progress(bytes_written, bytes_expected):
978 outfile.write(progress_func(bytes_written, bytes_expected))
979 return write_progress
981 def exit_signal_handler(sigcode, frame):
982 logging.getLogger('arvados.arv_put').error("Caught signal {}, exiting.".format(sigcode))
985 def desired_project_uuid(api_client, project_uuid, num_retries):
987 query = api_client.users().current()
988 elif arvados.util.user_uuid_pattern.match(project_uuid):
989 query = api_client.users().get(uuid=project_uuid)
990 elif arvados.util.group_uuid_pattern.match(project_uuid):
991 query = api_client.groups().get(uuid=project_uuid)
993 raise ValueError("Not a valid project UUID: {}".format(project_uuid))
994 return query.execute(num_retries=num_retries)['uuid']
996 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
# Entry point for the arv-put CLI: parse arguments, configure logging,
# validate option combinations, build an ArvPutUploadJob, run the upload,
# and emit the resulting manifest / portable data hash / locator.
# NOTE(review): this extract is missing interleaved lines (several `try:`
# headers, `else:` branches and `sys.exit` calls are elided), so the
# comments below describe only the statements that are visible.
999 args = parse_arguments(arguments)
1000 logger = logging.getLogger('arvados.arv_put')
# Default verbosity is warnings-only; the INFO level below is presumably
# selected by a verbosity flag on an elided condition — TODO confirm.
1002 logger.setLevel(logging.WARNING)
1004 logger.setLevel(logging.INFO)
# Tag every API call from this invocation with a fresh request id, and
# install a formatter that includes it, so client-side log lines can be
# correlated with server-side request logs.
1007 request_id = arvados.util.new_request_id()
1009 formatter = ArvPutLogFormatter(request_id)
1010 logging.getLogger('arvados').handlers[0].setFormatter(formatter)
# Build an API client only if the caller did not inject one (api_client
# itself is bound on an elided line or parameter not visible here).
1012 if api_client is None:
1013 api_client = arvados.api('v1', request_id=request_id)
1015 # Install our signal handler for each code in CAUGHT_SIGNALS, and save
# the displaced handlers so they can be restored before returning.
1017 orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
1018 for sigcode in CAUGHT_SIGNALS}
1020 # Determine the name to use
# --name is rejected in modes that never save a Collection object, and
# when updating an existing collection (which already has a name).
1022 if args.stream or args.raw:
1023 logger.error("Cannot use --name with --stream or --raw")
1025 elif args.update_collection:
1026 logger.error("Cannot use --name with --update-collection")
1028 collection_name = args.name
# Default collection name: UTC timestamp plus user@host of the uploader.
1030 collection_name = "Saved at {} by {}@{}".format(
1031 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1032 pwd.getpwuid(os.getuid()).pw_name,
1033 socket.gethostname())
# --project-uuid only makes sense when a Collection record is created.
1035 if args.project_uuid and (args.stream or args.raw):
1036 logger.error("Cannot use --project-uuid with --stream or --raw")
1039 # Determine the parent project
# Resolve args.project_uuid (or the current user's home project) to an
# owner uuid; the enclosing `try:` and its error-exit path are elided.
1041 project_uuid = desired_project_uuid(api_client, args.project_uuid,
1043 except (apiclient_errors.Error, ValueError) as error:
# Choose a progress reporter: human-readable, machine-readable, or (on an
# elided branch) none at all.
1048 reporter = progress_writer(human_progress)
1049 elif args.batch_progress:
1050 reporter = progress_writer(machine_progress)
1054 # Setup exclude regex from all the --exclude arguments provided
1057 exclude_names = None
1058 if len(args.exclude) > 0:
1059 # We're supporting 2 kinds of exclusion patterns:
1060 # 1) --exclude '*.jpg' (file/dir name patterns, will only match
1061 # the name, wherever the file is on the tree)
1062 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the
1063 # entire path, and should be relative to
1064 # any input dir argument)
1065 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs
1066 # placed directly underneath the input dir)
1067 for p in args.exclude:
1068 # Only relative paths patterns allowed
1069 if p.startswith(os.sep):
1070 logger.error("Cannot use absolute paths with --exclude")
# A pattern containing a directory separator is a *path* pattern; '..'
# components are rejected (the check itself sits on an elided line).
1072 if os.path.dirname(p):
1073 # We don't support path patterns with '..'
1074 p_parts = p.split(os.sep)
1077 "Cannot use path patterns that include or '..'")
1079 # Path search pattern
1080 exclude_paths.append(p)
1082 # Name-only search pattern
1083 name_patterns.append(p)
1084 # For name only matching, we can combine all patterns into a single
1085 # regexp, for better performance.
1086 exclude_names = re.compile('|'.join(
1087 [fnmatch.translate(p) for p in name_patterns]
1088 )) if len(name_patterns) > 0 else None
1089 # Show the user the patterns to be used, just in case they weren't
1090 # specified inside quotes and got changed by the shell expansion.
1091 logger.info("Exclude patterns: {}".format(args.exclude))
1093 # If this is used by a human, and there's at least one directory to be
1094 # uploaded, the expected bytes calculation can take a moment.
1095 if args.progress and any([os.path.isdir(f) for f in args.paths]):
1096 logger.info("Calculating upload size, this could take some time...")
# Construct the upload job from the parsed options; the `try:` header for
# the except clauses further down is elided from this extract.
1098 writer = ArvPutUploadJob(paths = args.paths,
1099 resume = args.resume,
1100 use_cache = args.use_cache,
1101 filename = args.filename,
1102 reporter = reporter,
1103 api_client = api_client,
1104 num_retries = args.retries,
1105 replication_desired = args.replication,
1106 put_threads = args.threads,
1107 name = collection_name,
1108 owner_uuid = project_uuid,
1109 ensure_unique_name = True,
1110 update_collection = args.update_collection,
1112 dry_run=args.dry_run,
1113 follow_links=args.follow_links,
1114 exclude_paths=exclude_paths,
1115 exclude_names=exclude_names)
# Constructor failure modes; each of these presumably ends in an elided
# sys.exit with a distinct status code — TODO confirm against full source.
1116 except ResumeCacheConflict:
1117 logger.error("\n".join([
1118 "arv-put: Another process is already uploading this data.",
1119 " Use --no-cache if this is really what you want."]))
1121 except CollectionUpdateError as error:
1122 logger.error("\n".join([
1123 "arv-put: %s" % str(error)]))
1125 except ArvPutUploadIsPending:
1126 # Dry run check successful, return proper exit code.
1128 except ArvPutUploadNotPending:
1129 # No files pending for upload
1131 except PathDoesNotExistError as error:
1132 logger.error("\n".join([
1133 "arv-put: %s" % str(error)]))
# Warn the user when a previous interrupted upload is being resumed from
# its checkpoint, so --no-resume is discoverable.
1136 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1137 logger.warning("\n".join([
1138 "arv-put: Resuming previous upload from last checkpoint.",
1139 " Use the --no-resume option to start over."]))
1141 if not args.dry_run:
1142 writer.report_progress()
# Run the upload; the Collection object is saved unless the user asked
# for stream/raw output only.
1145 writer.start(save_collection=not(args.stream or args.raw))
1146 except arvados.errors.ApiError as error:
1147 logger.error("\n".join([
1148 "arv-put: %s" % str(error)]))
1151 if args.progress: # Print newline to split stderr from stdout for humans.
# Select what to print on stdout: normalized or plain manifest text for
# --stream, comma-joined block locators for --raw (branch guards elided).
1156 output = writer.manifest_text(normalize=True)
1158 output = writer.manifest_text()
1160 output = ','.join(writer.data_locators())
# Otherwise a Collection was saved/updated: report it, then print either
# its portable data hash or its manifest locator (uuid).
1163 if args.update_collection:
1164 logger.info("Collection updated: '{}'".format(writer.collection_name()))
1166 logger.info("Collection saved as '{}'".format(writer.collection_name()))
1167 if args.portable_data_hash:
1168 output = writer.portable_data_hash()
1170 output = writer.manifest_locator()
1171 except apiclient_errors.Error as error:
1173 "arv-put: Error creating Collection on project: {}.".format(
1177 # Print the locator (uuid) of the new collection.
1179 status = status or 1
1180 elif not args.silent:
1181 stdout.write(output)
# Keep stdout newline-terminated (the write itself is on an elided line).
1182 if not output.endswith('\n'):
# Restore the signal handlers we displaced at entry before returning.
1185 for sigcode, orig_handler in listitems(orig_signal_handlers):
1186 signal.signal(sigcode, orig_handler)
1195 if __name__ == '__main__':