# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile

from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd
from arvados.collection import CollectionReader

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
all data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")

_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
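
# Typical invocations (illustrative only; the file names and project UUID
# below are hypothetical):
#
#   arv-put file1.dat dir1/                 # upload and print the Collection uuid
#   arv-put --stream file1.dat              # print the manifest, save nothing
#   cat file1.dat | arv-put --filename file1.dat
#   arv-put --project-uuid zzzzz-j7d0g-0123456789abcde --name backups dir1/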

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths += ['/dev/stdin']

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args

class ResumeCacheConflict(Exception):
    pass

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
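
# Illustrative ResumeCache usage (a sketch; main() below is the real caller):
#
#   cache = ResumeCache(ResumeCache.make_path(args))  # may raise ResumeCacheConflict
#   state = cache.load()                              # previously saved JSON state
#   cache.save(new_state)                             # atomically replace the cache file
#   cache.destroy()                                   # remove the cache when the upload finishes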

class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(**kwargs)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0, replication=0):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries,
                                    replication=replication)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected, num_retries=num_retries)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)

def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
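
# For example (illustrative values): human_progress(52428800, 104857600)
# returns "\r50M / 100M 50.0% ", human_progress(52428800, None) returns
# "\r52428800 ", and machine_progress(52428800, None) reports -1 as the
# total, using the "<argv[0]> <pid>:" prefix from _machine_format above.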

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
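
# For example (hypothetical identifiers): a user UUID such as
# 'zzzzz-tpzed-0123456789abcde' resolves via users().get(), a project (group)
# UUID such as 'zzzzz-j7d0g-0123456789abcde' via groups().get(), and an unset
# --project-uuid falls back to the calling user's own UUID (the Home project).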

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))
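    # Worked example (hypothetical numbers): with --replication unset
    # (args.replication is None) and a server defaultCollectionReplication of 3,
    # write_copies is 3 while collections().create() below still receives None,
    # letting the server apply its own default. With --replication 1, both are 1.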

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original signal handlers for later.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
        if args.normalize:
            output = CollectionReader(output).manifest_text(normalize=True)
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            manifest_text = writer.manifest_text()
            if args.normalize:
                manifest_text = CollectionReader(manifest_text).manifest_text(normalize=True)
            replication_attr = 'replication_desired'
            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
                # API called it 'redundancy' before #3410.
                replication_attr = 'redundancy'
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': manifest_text,
                    replication_attr: args.replication,
                    },
                ensure_unique_name=True
                ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    stdout.write(output)
    if not output.endswith('\n'):
        stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output


if __name__ == '__main__':
    main()