# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import arvados.collection
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile

from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd
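
# Signals that should abort an upload cleanly (handlers are installed in
# main() and restored afterwards).  api_client is a module-level global so a
# caller (or test) can presumably inject a pre-built API client before
# calling main(); otherwise main() creates one with arvados.api('v1').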
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")
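
# run_opts holds the options that are not specific to the upload itself;
# keeping them in a separate parent parser presumably lets other commands
# that wrap arv-put reuse them via argparse's parents= mechanism.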
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
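
# A typical invocation combines options from all three parent parsers, e.g.
# (hypothetical example; the project UUID is made up):
#   arv-put --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx --name "my data" \
#           --replication 2 dir1/ file2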
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.resume = False
        if not args.filename:
            args.filename = 'stdin'

    return args

class ResumeCacheConflict(Exception):
    pass

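
# ResumeCache persists upload state in a JSON file under
# ~/.cache/arvados/arv-put, named after an MD5 digest of the API host and the
# input paths (see make_path), and holds an exclusive flock on it so that two
# arv-put processes cannot resume the same upload at once.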
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)

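
# ArvPutCollectionWriter adds progress reporting and resumability on top of
# arvados.ResumableCollectionWriter: it counts bytes written, reports them
# through an optional reporter callback, and checkpoints its serializable
# state into a ResumeCache (the raw data buffer is base64-encoded so the
# state can be stored as JSON).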
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(**kwargs)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0, replication=0):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries,
                                    replication=replication)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected, num_retries=num_retries)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)
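
    # flush_data tracks how many bytes actually reached Keep and checkpoints
    # the cached state whenever the upload crosses a Keep block boundary.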
    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)

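
# expected_bytes_for returns the total size of the named files and directory
# trees, or None if it cannot be determined (e.g. when reading from stdin);
# the progress reporters below handle a None total gracefully.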
def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

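
# Resolve the --project-uuid argument (or the current user's Home project
# when it is empty) to an owner UUID, dispatching on the Arvados UUID
# pattern: user UUIDs resolve through users().get(), group/project UUIDs
# through groups().get().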
def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

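
# main() drives the whole upload: parse arguments, pick a collection name and
# destination project, set up progress reporting and (optionally) a resume
# cache, copy the data to Keep, then either print the manifest/locators
# (--stream/--raw) or save a Collection object and print its UUID or portable
# data hash.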
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original handlers so they can be restored before returning.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if path == '-':
            writer.start_new_stream()
            writer.start_new_file(args.filename)
            r = sys.stdin.read(64*1024)
            while r:
                # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
                # CollectionWriter.write().
                super(arvados.collection.ResumableCollectionWriter, writer).write(r)
                r = sys.stdin.read(64*1024)
        elif os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
        if args.normalize:
            output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            manifest_text = writer.manifest_text()
            if args.normalize:
                manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
            replication_attr = 'replication_desired'
            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
                # API called it 'redundancy' before #3410.
                replication_attr = 'redundancy'
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': manifest_text,
                    replication_attr: args.replication,
                    },
                ensure_unique_name=True
                ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']
        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            sys.exit(1)

    # Print the locator (uuid) of the new collection.
    stdout.write(output)
    if not output.endswith('\n'):
        stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if resume_cache is not None:
        resume_cache.destroy()

    return output

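
# Note: main() can also be called programmatically (e.g. from tests or other
# tools).  A hypothetical example, assuming this module is installed as
# arvados.commands.put (the arv-put entry point) and that
# ARVADOS_API_HOST/ARVADOS_API_TOKEN are configured in the environment:
#
#     from arvados.commands.put import main
#     locator = main(['--portable-data-hash', '/tmp/example-file'])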
if __name__ == '__main__':
    main()