# --md5sum - display md5 of each file as read from disk
import argparse
import arvados
import arvados.collection
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile

from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd
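
# Signals that should cleanly abort an in-progress upload.  main() installs
# exit_signal_handler() for each of these and restores the original handlers
# once the upload finishes, so an interrupted run exits promptly while the
# resume cache keeps its last checkpoint for a later --resume.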
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")

_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
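
# Illustrative invocations (file and directory names are hypothetical; the
# options themselves are the ones defined above):
#
#   arv-put file1.dat dir1/                         # store data, print Collection uuid
#   arv-put --stream dir1/                          # print manifest on stdout; no Collection object saved
#   tar cf - src/ | arv-put --filename src.tar -    # read data from stdin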

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.resume = False
        if not args.filename:
            args.filename = 'stdin'

    return args


class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name
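
    # make_path() derives a per-invocation cache file name: an MD5 of the API
    # host, the sorted real paths being uploaded, and (when relevant) the
    # --max-manifest-depth or --filename settings, placed under CACHE_DIR in
    # the user's home directory.  Re-running the same upload therefore finds
    # the same cache file.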
    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)
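
    # check_cache() sanity-checks a cached upload state before it is resumed:
    # it picks one block locator recorded in the cached state and asks Keep
    # for it (a HEAD request).  If anything goes wrong, the cached state is
    # treated as stale and discarded via restart().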
    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                locator = state["_finished_streams"][0][1][0]
            elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                locator = state["_current_stream_locators"][0]
            if locator is not None:
                kc = arvados.keep.KeepClient(api_client=api_client)
                kc.head(locator, num_retries=num_retries)
        except Exception as e:
            self.restart()

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
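

# Typical ResumeCache lifecycle, as driven from main() below (sketch only;
# the retry count is an arbitrary example):
#
#   cache = ResumeCache(ResumeCache.make_path(args))   # open + flock the file
#   cache.check_cache(api_client=api_client, num_retries=3)  # drop stale state
#   ...                                                 # writer saves checkpoints
#   cache.destroy()                                     # remove after success
#
# A second arv-put run on the same inputs hits the flock in _lock_file() and
# gets ResumeCacheConflict ("Another process is already uploading this data").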


class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(**kwargs)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0, replication=0):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries,
                                    replication=replication)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected,
                       num_retries=num_retries,
                       replication=replication)
        else:
            return writer
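
    # cache_state() snapshots dump_state() into the resume cache.  Two kinds of
    # values need massaging to survive the JSON round trip: the raw _data_buffer
    # is base64-encoded (from_cache() reverses this), and deque-like values are
    # converted to plain lists.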
    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)
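
    # flush_data() wraps the parent implementation so progress and checkpoints
    # track what was actually sent: bytes_written grows by however much the
    # buffer shrank, and the resume cache is saved whenever the running byte
    # count crosses another KEEP_BLOCK_SIZE boundary (i.e. a new block was
    # stored in Keep).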
    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as a percentage.  Return None if any path
    # is neither a regular file nor a directory, since the total is then
    # unknown.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
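
# Illustrative output of the two formatters (values are hypothetical; the
# machine format also embeds sys.argv[0] and the current pid):
#
#   machine_progress(524288, 1048576) -> "arv-put 12345: 524288 written 1048576 total\n"
#   human_progress(524288, 1048576)   -> "\r0M / 1M 50.0% "
#   human_progress(524288, None)      -> "\r524288 "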

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
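
# When no --project-uuid is given, the current user's own uuid is returned,
# which places the new collection in their Home project (matching the
# --project-uuid help text above).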

# Module-level API client handle; main() creates a real client lazily unless
# something (e.g. a test) has already assigned one.
api_client = None

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)

    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))
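
    # For example: with no --replication and a server default of 2, each data
    # block is written with 2 copies, but the collection's replication field
    # is still created with None so the server-side default keeps governing it.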

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
            resume_cache.check_cache(api_client=api_client, num_retries=args.retries)
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file. Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original signal handlers for later.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
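
    # Copy each input into the collection: "-" streams stdin into a single
    # named file, directories are walked recursively, and anything else is
    # written as a single file (honoring --filename when given).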
    for path in args.paths:  # Copy file data to Keep.
        if path == '-':
            writer.start_new_stream()
            writer.start_new_file(args.filename)
            r = sys.stdin.read(64*1024)
            while r:
                # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
                # CollectionWriter.write().
                super(arvados.collection.ResumableCollectionWriter, writer).write(r)
                r = sys.stdin.read(64*1024)
        elif os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
        if args.normalize:
            output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            manifest_text = writer.manifest_text()
            if args.normalize:
                manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
            replication_attr = 'replication_desired'
            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
                # API called it 'redundancy' before #3410.
                replication_attr = 'redundancy'
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': manifest_text,
                    replication_attr: args.replication,
                    },
                ensure_unique_name=True
                ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            sys.exit(1)

    # Print the locator (uuid) of the new collection.
    stdout.write(output)
    if not output.endswith('\n'):
        stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if resume_cache is not None:
        resume_cache.destroy()

    return output

if __name__ == '__main__':
    main()