#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import arvados.collection
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile

from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None
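
# CAUGHT_SIGNALS are trapped while the upload runs (main() installs
# exit_signal_handler for each one) so an interrupted arv-put exits cleanly
# and leaves its resume cache behind for a later --resume run.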

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
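
# The user-facing parser combines the upload and run-time option groups above
# with the shared retry options from arvados.commands._util (arv_cmd.retry_opt),
# which supply the args.retries value used throughout main().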

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.resume = False
        if not args.filename:
            args.filename = 'stdin'

    return args


class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
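    """Cache for resuming interrupted uploads.

    State is kept in a JSON file under CACHE_DIR.  The file is held with an
    exclusive flock(), so two arv-put processes can never share one cache
    (see ResumeCacheConflict).
    """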

    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name
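
    # make_path() maps a given command line to a cache file name: it hashes
    # the API host, the real paths of the inputs, and the options that change
    # the resulting manifest, so re-running the same upload finds its cache.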
    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
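    """ResumableCollectionWriter that tracks progress and checkpoints.

    On top of the parent class it counts bytes written, reports them through
    an optional reporter callback, and saves its serialized state to a
    ResumeCache roughly once per Keep block.
    """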
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(**kwargs)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0, replication=0):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries,
                                    replication=replication)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected,
                       num_retries=num_retries,
                       replication=replication)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)
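
    # flush_data() defers to the parent class for the actual Keep PUT, then
    # updates the byte counter, reports progress, and checkpoints the resume
    # cache whenever another full Keep block has gone out.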
    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as a percentage.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                             os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)
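
# desired_project_uuid() resolves the --project-uuid option: an empty value
# means the current user's Home project; otherwise it must match a user or
# group UUID, which is fetched so a bad value fails before any data is saved.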
def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)
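
    # With --resume (the default), look for a cache file keyed on this command
    # line.  Failure to open one simply disables resuming; finding it locked by
    # another arv-put process is a hard error.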
    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original handlers so they can be restored at the end of the run.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if path == '-':
            writer.start_new_stream()
            writer.start_new_file(args.filename)
            r = sys.stdin.read(64*1024)
            while r:
                # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
                # CollectionWriter.write().
                super(arvados.collection.ResumableCollectionWriter, writer).write(r)
                r = sys.stdin.read(64*1024)
        elif os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
        if args.normalize:
            output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            manifest_text = writer.manifest_text()
            if args.normalize:
                manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
            replication_attr = 'replication_desired'
            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
                # API called it 'redundancy' before #3410.
                replication_attr = 'redundancy'
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': manifest_text,
                    replication_attr: args.replication,
                },
                ensure_unique_name=True
            ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if status == 0:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output


if __name__ == '__main__':
    main()