#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile

from apiclient import errors as apiclient_errors
from arvados.collection import CollectionReader

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None
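
# upload_opts holds the options that decide what gets stored and how the
# manifest is laid out; run_opts (further down) holds options about how the
# upload itself runs. Both use add_help=False so they can be composed as
# parents of the main arg_parser defined below.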
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")
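
# run_opts: options that control the upload run itself (destination project,
# collection name, progress reporting, and resume behavior).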
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()

_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")

_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")
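
# The user-facing parser combines upload_opts, run_opts, and the shared retry
# option (arv_cmd.retry_opt), which supplies the args.retries value used below.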
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths += ['/dev/stdin']

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args
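

# Resume support: the writer's checkpointed state is stored as JSON in a file
# under ~/.cache/arvados/arv-put, keyed on what is being uploaded, so an
# interrupted arv-put can pick up where it left off (see --resume/--no-resume).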
class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name
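
    # make_path derives a cache file name that uniquely identifies this
    # upload: an MD5 of the API host, the sorted real paths, and the options
    # that change the resulting manifest (--max-manifest-depth or --filename).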
    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
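

# ArvPutCollectionWriter extends the SDK's ResumableCollectionWriter with
# progress reporting, periodic checkpointing of its state into a ResumeCache,
# and tracking of inputs already written so a resumed upload does not
# duplicate them.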
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None,
                 api_client=None, num_retries=0):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)
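
    # from_cache tries to rebuild a writer from the state stored in a
    # ResumeCache; if the cached state is missing, malformed, or stale, it
    # falls back to a brand-new writer bound to the same cache.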
    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected, num_retries=num_retries)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)
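
    # flush_data wraps the parent's flush: whenever data actually went out to
    # Keep, it updates the byte counter, reports progress, and checkpoints the
    # cache roughly once per Keep block written.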
    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
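
# For example, human_progress(5 << 20, 20 << 20) returns "\r5M / 20M 25.0% ",
# while machine_progress(5242880, 20971520) returns
# "<argv[0]> <pid>: 5242880 written 20971520 total\n".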
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
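
# main: parse arguments, set up the writer (resuming from a cache when
# possible), copy the data into Keep, then either print the manifest or raw
# block locators, or save a Collection and print its UUID or portable data hash.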
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected,
                                        num_retries=args.retries)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected, num_retries=args.retries)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    output = None
    if args.stream:
        output = writer.manifest_text()
        if args.normalize:
            output = CollectionReader(output).manifest_text(normalize=True)
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            manifest_text = writer.manifest_text()
            if args.normalize:
                manifest_text = CollectionReader(manifest_text).manifest_text(normalize=True)
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': manifest_text
                    },
                ensure_unique_name=True
                ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is not None:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output


if __name__ == '__main__':
    main()
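
# Example invocations (illustrative only; the project UUID is a placeholder):
#   arv-put file1.txt file2.txt
#   arv-put --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx --name 'reference data' genome_dir/
#   cat data.tsv | arv-put --filename data.tsv -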