# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import arvados.collection
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading

from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")
_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")
upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
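
# Example invocations (paths and UUIDs below are hypothetical):
#
#   arv-put file1.txt mydir/                  # upload, save a Collection, print its uuid
#   arv-put --stream - < data.bin             # print the manifest, save nothing
#   arv-put --update-collection zzzzz-4zz18-zzzzzzzzzzzzzzz mydir/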

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
--filename argument cannot be used when storing a directory or
multiple files.
""")

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
--update-collection cannot be used when reading from stdin.
""")
        args.resume = False
        if not args.filename:
            args.filename = 'stdin'

    return args

class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        # Pick one cached block locator (if any) and make sure Keep still
        # has it; if not, the cache is stale, so throw it away and restart.
        try:
            state = self.load()
        except ValueError:
            # Empty or unparseable cache file; nothing to check.
            return
        locator = None
        if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
            locator = state["_finished_streams"][0][1][0]
        elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
            locator = state["_current_stream_locators"][0]
        try:
            if locator is not None:
                kc = arvados.keep.KeepClient(api_client=api_client)
                kc.head(locator, num_retries=num_retries)
        except Exception as e:
            self.restart()

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
            raise
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
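
# Usage sketch for ResumeCache (hypothetical; assumes `args` comes from
# parse_arguments() above):
#
#   cache = ResumeCache(ResumeCache.make_path(args))  # may raise ResumeCacheConflict
#   cache.check_cache()          # discard the cache if Keep no longer has its blocks
#   state = cache.load()
#   cache.save({'example': 'checkpoint'})
#   cache.close()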

class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
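
    # The cached state serializes to JSON roughly like this (illustrative
    # example, not real data):
    #   {"manifest": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n",
    #    "files": {"/tmp/empty.txt": {"mtime": 1450000000.0, "size": 0}}}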

    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                 name=None, owner_uuid=None, ensure_unique_name=False,
                 num_retries=None, replication_desired=None,
                 filename=None, update_time=1.0, update_collection=None):
        self.paths = paths
        self.resume = resume
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = [] # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._update_task_time = update_time # How many seconds to wait between update runs
        self.logger = logging.getLogger('arvados.arv_put')

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.daemon = True
        self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    if path == '.' or path == './' or os.path.dirname(path) == '':
                        dirname = ''
                    else:
                        dirname = os.path.dirname(path) + '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._write_file(os.path.join(root, f),
                                             os.path.join(root[len(dirname):], f))
                else:
                    self._write_file(path, self.filename or os.path.basename(path))
        finally:
            # Stop the thread before doing anything else
            self._stop_checkpointer.set()
            self._checkpointer.join()
        # Commit all & one last _update()
        self._local_collection.manifest_text()
        self._update()
        if save_collection:
            self.save_collection()
        self._cache_file.close()
        # Correct the final written bytes count
        self.bytes_written -= self.bytes_skipped

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        try:
            os.unlink(self._cache_filename)
        except OSError as error:
            # That's what we wanted anyway.
            if error.errno != errno.ENOENT:
                raise
        self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(self._update_task_time):
            self._update()

    def _update(self):
        """
        Update cached manifest text and report progress.
        """
        with self._collection_lock:
            self.bytes_written = self._collection_size(self._local_collection)
            # Update cache, if resume enabled
            with self._state_lock:
                # Get the manifest text without committing pending blocks
                self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
            self._save_state()
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)
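
    # Any callable with this two-argument signature can serve as a reporter;
    # a hypothetical custom one could be:
    #   def my_reporter(bytes_written, bytes_expected):
    #       logging.info("%s / %s bytes", bytes_written, bytes_expected)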

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close(flush=False)

    def _write_file(self, source, filename):
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False

        # Record file path for updating the remote collection before exiting
        self._file_paths.append(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an
            # eventual repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            should_upload = True

        if should_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                    self.bytes_skipped += resume_offset
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        first_read = True
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            # Allow an empty file to be written
            if not data and not first_read:
                break
            first_read = False
            output.write(data)
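
    # Reads are sized to arvados.config.KEEP_BLOCK_SIZE (2**26 bytes, i.e.
    # 64 MiB, in the Python SDK) so that each full read maps naturally onto
    # one Keep data block.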

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        # Set up cache file name from input paths.
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in self.paths)
        md5.update('\0'.join(realpaths))
        if self.filename:
            md5.update(self.filename)
        cache_filename = md5.hexdigest()
        self._cache_file = open(os.path.join(
            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
            cache_filename), 'a+')
        self._cache_filename = self._cache_file.name
        self._lock_file(self._cache_file)
        self._cache_file.seek(0)
        with self._state_lock:
            try:
                self._state = json.load(self._cache_file)
                if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                    # Cache at least partially incomplete, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            except ValueError:
                # Cache file empty, set up new cache
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
        # Load how many bytes were uploaded on previous run
        with self._collection_lock:
            self.bytes_written = self._collection_size(self._local_collection)

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        try:
            with self._state_lock:
                # Snapshot the state under the lock so json.dump() works on
                # a consistent copy.
                state = copy.deepcopy(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(state, new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache
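
    # Writing to a temp file and then os.rename()-ing it over the old cache
    # keeps the save atomic on POSIX: a concurrent reader always sees either
    # the previous complete JSON document or the new one, never a partial
    # write.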

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        return self._my_collection().portable_data_hash()

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator (MD5 of zero bytes, +0 size hint)
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks
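

# End-to-end usage sketch for ArvPutUploadJob (hypothetical path; assumes
# ARVADOS_API_HOST and ARVADOS_API_TOKEN are configured):
#
#   job = ArvPutUploadJob(['/tmp/data'],
#                         reporter=progress_writer(human_progress),
#                         bytes_expected=expected_bytes_for(['/tmp/data']))
#   job.start(save_collection=True)
#   print job.manifest_locator()      # the saved Collection's uuid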


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum
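
# Note that expected_bytes_for(['-']) returns None (stdin has no known size),
# which makes human_progress() below fall back to a plain byte count.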

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
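
# Illustrative outputs: with sys.argv[0] == 'arv-put' and pid 1234,
# machine_progress(512, 1024) returns "arv-put 1234: 512 written 1024 total\n";
# human_progress() instead rewrites a single tty line via the leading "\r".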

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
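
# For example, desired_project_uuid(api_client, None, 3) resolves to the
# current user's own UUID, which the API server treats as that user's Home
# project.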

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection)
    except ResumeCacheConflict:
        print >>stderr, "\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-resume if this is really what you want."])
        sys.exit(1)
    except CollectionUpdateError as error:
        print >>stderr, "\n".join([
            "arv-put: %s" % str(error)])
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original handlers.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.update_collection and args.resume and writer.bytes_written > 0:
        print >>stderr, "\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."])

    writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        print >>stderr, "\n".join([
            "arv-put: %s" % str(error)])
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
            else:
                print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    return output


if __name__ == '__main__':
    main()