#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import apiclient.errors
import argparse
import arvados
import arvados.commands._util as arv_cmd
import arvados.config
import arvados.errors
import arvados.util
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile

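# Signals to trap so an interrupted upload exits cleanly (see
# exit_signal_handler below); main() restores the original handlers
# before it returns.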
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N',
                         default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

upload_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

upload_opts.add_argument('--name', help="""
Save the collection with the specified name, rather than the default
generic name "Saved at {time} by {username}@{host}".
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

run_opts = argparse.ArgumentParser(add_help=False)
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts])

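# Illustrative invocations (see the option help above):
#   arv-put file1.txt dir2/          # store data, save a Collection, print its uuid
#   arv-put --stream - < data.bin    # print the manifest on stdout, save nothing
#   arv-put --raw big.iso            # print comma-separated data block locators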
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths += ['/dev/stdin']

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
--filename argument cannot be used when storing a directory or
multiple files.
""")

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args

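# For example, `arv-put -` is normalized above to paths=['/dev/stdin'] with
# filename='-', so the resulting manifest names the stream's one file "-".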
class ResumeCacheConflict(Exception):
    pass

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)

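# A minimal sketch of how main() drives this class (assuming `args` came
# from parse_arguments):
#
#     cache = ResumeCache(ResumeCache.make_path(args))
#     state = cache.load()    # may raise ValueError on an empty/garbled file
#     cache.save(state)       # atomic: write a locked temp file, then rename
#     cache.destroy()         # delete the file once the upload is finished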
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__()

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)

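# Checkpointing happens in flush_data() above: whenever the running total of
# bytes written crosses a KEEP_BLOCK_SIZE boundary, cache_state()
# JSON-serializes the writer state (with the residual data buffer
# base64-encoded) into the ResumeCache.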
def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

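# Note the None case: if any input is not a regular file or directory (for
# example a pipe such as /dev/stdin), the total size is unknown and
# human_progress() below falls back to reporting raw bytes written.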
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

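# For example: human_progress(512 << 20, 1 << 30) == "\r512M / 1024M 50.0% "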
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def check_project_exists(project_uuid):
    try:
        arvados.api('v1').groups().get(uuid=project_uuid).execute()
    except (apiclient.errors.Error, arvados.errors.NotFoundError) as error:
        raise ValueError("Project {} not found ({})".format(project_uuid,
                                                            error))
    else:
        return True

def prep_project_link(args, stderr, project_exists=check_project_exists):
    # Given the user's command line arguments, return a dictionary with data
    # to create the desired project link for this Collection, or None.
    # Raises ValueError if the arguments request something impossible.
    making_collection = not (args.raw or args.stream)
    if not making_collection:
        if args.name or args.project_uuid:
            raise ValueError("Requested a Link without creating a Collection")
        return None
    link = {'tail_uuid': args.project_uuid,
            'link_class': 'name',
            'name': args.name}
    if not link['tail_uuid']:
        link['tail_uuid'] = arvados.api('v1').users().current().execute()['uuid']
    elif not project_exists(link['tail_uuid']):
        raise ValueError("Project {} not found".format(args.project_uuid))
    if not link['name']:
        link['name'] = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())
        stderr.write(
            "arv-put: No --name specified. Saving as \"%s\"\n" % link['name'])
    link['owner_uuid'] = link['tail_uuid']
    return link

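# The returned dictionary is the body of a "name" Link; values illustrative:
#     {'tail_uuid': <project or user uuid>, 'link_class': 'name',
#      'name': 'Saved at ... by user@host', 'owner_uuid': <tail_uuid again>}
# main() supplies 'head_uuid' via create_project_link() once the new
# Collection's locator is known.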
def create_project_link(locator, link):
    link['head_uuid'] = locator
    return arvados.api('v1').links().create(body=link).execute()

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    status = 0

    args = parse_arguments(arguments)
    try:
        project_link = prep_project_link(args, stderr)
    except ValueError as error:
        print >>stderr, "arv-put: {}.".format(error)
        sys.exit(2)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file. Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original handlers so they can be restored later.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        # Register the resulting collection in Arvados.
        collection = arvados.api().collections().create(
            body={
                'uuid': writer.finish(),
                'manifest_text': writer.manifest_text(),
                },
            ).execute()

        # Print the locator (uuid) of the new collection.
        output = collection['uuid']
        if project_link is not None:
            try:
                create_project_link(output, project_link)
            except apiclient.errors.Error as error:
                print >>stderr, (
                    "arv-put: Error adding Collection to project: {}.".format(
                        error))
                status = 1

    stdout.write(output)
    if not output.endswith('\n'):
        stdout.write('\n')

    # Restore the original signal handlers.
    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output


if __name__ == '__main__':
    main()