# --md5sum - display md5 of each file as read from disk

import apiclient.errors
import argparse
import arvados
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N',
                         default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")
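
# Illustrative example (not in the original help text): given a tree like
# a/b/c/file.txt, the default of -1 produces one manifest stream per directory
# that contains files (./a/b/c), while --max-manifest-depth 0 collapses the
# whole tree into a single stream.
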
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")
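
# Hypothetical invocations illustrating how the output modes defined above
# differ (paths are made up; the flags are the ones registered on upload_opts):
#   arv-put mydir/                        # default: save a Collection, print its UUID
#   arv-put --stream mydir/               # print the manifest; save nothing in Arvados
#   arv-put --raw file.dat                # print comma-separated data block locators
#   arv-put --portable-data-hash mydir/   # save, but print the portable data hash
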
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths += ['/dev/stdin']

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args

class ResumeCacheConflict(Exception):
    pass

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
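
# A minimal usage sketch for ResumeCache (hypothetical caller; the real call
# sites are in main() below). make_path() derives the cache file name from the
# API host, the real paths of the inputs, and the relevant options, so a
# re-run of the same upload finds the same cache file:
#   cache = ResumeCache(ResumeCache.make_path(args))
#   state = cache.load()     # previously checkpointed writer state (JSON)
#   cache.save(new_state)    # atomic replace via tempfile.mkstemp + os.rename
#   cache.destroy()          # remove the cache once the upload has finished
# A second process opening the same cache file raises ResumeCacheConflict,
# thanks to the non-blocking flock() in _lock_file().
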
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None,
                 api_client=None, num_retries=0):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected, num_retries=num_retries)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)
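
# Sketch of how the writer is typically driven (hypothetical caller; main()
# below does the equivalent). from_cache() falls back to a brand-new writer
# whenever the cached state is missing, malformed, or stale:
#   writer = ArvPutCollectionWriter.from_cache(cache, reporter, bytes_expected)
#   writer.do_queued_work()        # replay inputs recorded in the cache
#   writer.write_file('data.bin')  # a repeat of an already-seen input is
#                                  # skipped via _record_new_input()
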
def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as a percentage.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum
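
# Example (hypothetical paths): expected_bytes_for(['./docs', 'notes.txt'])
# sums the sizes of every file under ./docs plus notes.txt. If any path is
# neither a regular file nor a directory, the function returns None and the
# progress reporters below fall back to plain byte counts with no percentage.
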
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                             os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
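
# Example, assuming 5 MiB of an expected 20 MiB have been written (the prefix
# added by _machine_format above varies with the invoking program and pid):
#   machine_progress(5 << 20, 20 << 20)  ends with "5242880 written 20971520 total\n"
#   human_progress(5 << 20, 20 << 20)    returns "\r5M / 20M 25.0% "
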
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
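
# For example (made-up UUIDs): a user UUID such as 'zzzzz-tpzed-0123456789abcde'
# resolves through users().get(), a project (group) UUID such as
# 'zzzzz-j7d0g-0123456789abcde' through groups().get(), and an empty value
# falls back to the current user's Home project; anything else raises ValueError.
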
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient.errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file. Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected,
                                        num_retries=args.retries)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected, num_retries=args.retries)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original signal handlers for later.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': writer.manifest_text()
                    },
                ensure_unique_name=True
                ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and collection.get('portable_data_hash'):
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']
        except apiclient.errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    stdout.write(output)
    if not output.endswith('\n'):
        stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output


if __name__ == '__main__':
    main()