Merge branch 'master' into 4823-python-sdk-writable-collection-api
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import base64
9 import datetime
10 import errno
11 import fcntl
12 import hashlib
13 import json
14 import os
15 import pwd
16 import signal
17 import socket
18 import sys
19 import tempfile
20 from apiclient import errors as apiclient_errors
21
22 import arvados.commands._util as arv_cmd
23
# Signals for which main() installs exit_signal_handler while uploading.
CAUGHT_SIGNALS = [
    signal.SIGINT,
    signal.SIGQUIT,
    signal.SIGTERM,
]

# Shared API client; main() fills it in on first use if it is still None.
api_client = None
26
# Options that describe what to upload and how to report the result.
# Built with add_help=False so it can be used as a parent parser.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument(
    'paths',
    metavar='path',
    nargs='*',
    type=str,
    help="""
Local file or directory. Default: read from standard input.
""")

# Manifest layout: --max-manifest-depth and --normalize exclude each other.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--max-manifest-depth',
    metavar='N',
    type=int,
    default=-1,
    help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument(
    '--normalize',
    action='store_true',
    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

# Output mode: stream / manifest / raw (and their synonyms) exclude each other.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--as-stream',
    dest='stream',
    action='store_true',
    help="""
Synonym for --stream.
""")

_group.add_argument(
    '--stream',
    action='store_true',
    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument(
    '--as-manifest',
    dest='manifest',
    action='store_true',
    help="""
Synonym for --manifest.
""")

_group.add_argument(
    '--in-manifest',
    dest='manifest',
    action='store_true',
    help="""
Synonym for --manifest.
""")

_group.add_argument(
    '--manifest',
    action='store_true',
    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument(
    '--as-raw',
    dest='raw',
    action='store_true',
    help="""
Synonym for --raw.
""")

_group.add_argument(
    '--raw',
    action='store_true',
    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

# Options that are not part of any exclusive group.
upload_opts.add_argument(
    '--use-filename',
    dest='filename',
    type=str,
    default=None,
    help="""
Synonym for --filename.
""")

upload_opts.add_argument(
    '--filename',
    type=str,
    default=None,
    help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument(
    '--portable-data-hash',
    action='store_true',
    help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument(
    '--replication',
    metavar='N',
    type=int,
    default=None,
    help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")
119
# Options that control where the collection is saved and how the run behaves.
# Built with add_help=False so it can be used as a parent parser.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument(
    '--project-uuid',
    metavar='UUID',
    help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument(
    '--name',
    help="""
Save the collection with the specified name.
""")

# Progress reporting: at most one of these three flags may be given.
_group = run_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--progress',
    action='store_true',
    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument(
    '--no-progress',
    action='store_true',
    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument(
    '--batch-progress',
    action='store_true',
    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

# Resume behaviour: both flags write to args.resume, which defaults to True.
_group = run_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--resume',
    action='store_true',
    default=True,
    help="""
Continue interrupted uploads from cached state (default).
""")

_group.add_argument(
    '--no-resume',
    dest='resume',
    action='store_false',
    help="""
Do not continue interrupted uploads from cached state.
""")
160
# Complete command-line parser: upload options, run options and the shared
# retry option from arvados.commands._util.
arg_parser = argparse.ArgumentParser(
    parents=[upload_opts, run_opts, arv_cmd.retry_opt],
    description='Copy data from the local filesystem to Keep.')
164
def parse_arguments(arguments):
    """Parse the command line and apply arv-put's defaults.

    arguments is the argument list handed to arg_parser.parse_args().
    Returns the argparse namespace after these adjustments:

    * no paths given: read from /dev/stdin;
    * --filename with several paths or a directory: usage error
      (arg_parser.error exits the process);
    * neither --batch-progress nor --no-progress and stderr is a tty:
      progress is switched on;
    * the single path '-' becomes /dev/stdin, stored under the name '-'
      unless --filename was given.
    """
    args = arg_parser.parse_args(arguments)

    if not args.paths:
        args.paths.append('/dev/stdin')

    storing_many_or_dir = (len(args.paths) != 1 or
                           os.path.isdir(args.paths[0]))
    if storing_many_or_dir and args.filename:
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not args.batch_progress
            and not args.no_progress
            and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args
189
class ResumeCacheConflict(Exception):
    """Raised when a resume cache file cannot be locked exclusively."""
    pass


class ResumeCache(object):
    """On-disk, flock-protected store for the state of an interrupted upload.

    The cache is a single JSON file.  It stays open and exclusively locked
    for as long as this object uses it, so two processes cannot share the
    same cache file.
    """
    # Cache directory, passed to arv_cmd.make_home_conf_dir() by make_path().
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """Open (creating if necessary) and lock the cache file at file_spec.

        Raises ResumeCacheConflict if the file cannot be locked; errors from
        open() propagate unchanged.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except ResumeCacheConflict:
            # Don't leave the descriptor open if we can't use the cache.
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Return the cache file path for the upload described by args.

        The file name is an MD5 digest of the API host, the sorted real
        paths being uploaded, and either the manifest depth (when any path
        is a directory) or the --filename override (when one is set).
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """Take a non-blocking exclusive flock on fileobj.

        fileobj may be a file object or a raw integer descriptor.  Raises
        ResumeCacheConflict if flock() fails.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # save() passes a bare descriptor, which has no .name attribute.
            name = getattr(fileobj, 'name', fileobj)
            raise ResumeCacheConflict("{} locked".format(name))

    def load(self):
        """Return the JSON-decoded contents of the cache file.

        Raises ValueError if the file does not contain valid JSON (for
        example when it is empty).
        """
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
        """Replace the cache contents with data, serialized as JSON.

        The data is written to a locked temporary file in the same directory
        and then renamed over the cache file.  Saving is best-effort: on
        I/O, OS or locking errors the temporary file is discarded and the
        previous cache file stays in place.
        """
        new_cache = None
        new_cache_fd = None
        new_cache_name = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            # Push the JSON out of the userspace buffer before the rename,
            # so the file that replaces the cache is never empty/truncated.
            new_cache.flush()
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            # Release whatever was acquired before the failure.
            try:
                if new_cache is not None:
                    new_cache.close()
                elif new_cache_fd is not None:
                    os.close(new_cache_fd)
            except (IOError, OSError):
                pass
            if new_cache_name is not None:  # None means mkstemp failed.
                try:
                    os.unlink(new_cache_name)
                except OSError:
                    pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close the cache file."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file from disk and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Discard the current cache and start again with an empty one."""
        self.destroy()
        self.__init__(self.filename)
257
258
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    """Resumable collection writer with checkpointing and progress reports.

    In addition to the parent's state it tracks how many bytes have been
    written and which inputs have already been queued, so that a resumed
    upload does not queue the same file or directory twice.
    """
    # Extra attributes saved to / restored from the resume cache.
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
        """Create a writer.

        cache: ResumeCache-like object used for checkpoints, or None to
            disable checkpointing.
        reporter: callable(bytes_written, bytes_expected), or None.
        bytes_expected: total upload size if known, else None.
        kwargs: passed through to the parent constructor.
        """
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(**kwargs)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0, replication=0):
        """Build a writer from the state stored in cache.

        If the cached state cannot be loaded or is stale, a fresh writer is
        returned instead.  Either way the writer is built with the given
        num_retries and replication.
        """
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries,
                                    replication=replication)
        except (KeyError, TypeError, ValueError,
                arvados.errors.StaleWriterStateError):
            # Unusable cache (empty, corrupt, missing keys, or stale):
            # start over.  replication must be forwarded here too, otherwise
            # a first-time upload silently ignores the requested level.
            return cls(cache, reporter, bytes_expected,
                       num_retries=num_retries, replication=replication)
        else:
            return writer

    def cache_state(self):
        """Write the current writer state to the cache, if there is one."""
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                # deque-like values are not JSON-serializable; store a list.
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        """Call the reporter, if any, with the current byte counts."""
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
        """Flush buffered data, then update counters, progress and cache.

        The state is checkpointed whenever bytes_written crosses a multiple
        of arvados.config.KEEP_BLOCK_SIZE.
        """
        start_buffer_len = self._data_buffer_len
        # Floor division: we are counting whole blocks.
        start_block_count = self.bytes_written // arvados.config.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written // arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        """Remember an input; return False if it was already recorded."""
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        """Queue a file for upload unless the same input was already seen."""
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Queue a directory tree unless the same input was already seen."""
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)
330
331
def expected_bytes_for(pathlist):
    """Return the total size in bytes of everything under pathlist.

    Directories are walked recursively and regular files are stat'ed, so
    progress can be displayed as a percentage.  Returns None as soon as a
    path is neither a directory nor a regular file, because the total
    cannot be known then.
    """
    total = 0
    for path in pathlist:
        if os.path.isdir(path):
            total += sum(
                os.path.getsize(os.path.join(path, name))
                for name in arvados.util.listdir_recursive(path))
            continue
        if not os.path.isfile(path):
            return None
        total += os.path.getsize(path)
    return total
345
# Program name and pid are filled in once at import time; the two remaining
# {} slots are filled in by machine_progress() on every call.
_machine_format = "{} {}: {{}} written {{}} total\n".format(
    sys.argv[0], os.getpid())

def machine_progress(bytes_written, bytes_expected):
    """Return one machine-readable progress line.

    An unknown total (bytes_expected is None) is reported as -1.
    """
    if bytes_expected is None:
        total = -1
    else:
        total = bytes_expected
    return _machine_format.format(bytes_written, total)
351
def human_progress(bytes_written, bytes_expected):
    """Return a human-readable progress string.

    The string starts with a carriage return.  With a known, non-zero
    total it shows mebibytes written, mebibytes expected and a percentage;
    otherwise it shows just the byte count.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    fraction = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(
        bytes_written >> 20, bytes_expected >> 20, fraction)
359
def progress_writer(progress_func, outfile=sys.stderr):
    """Return a reporter that formats progress and writes it to outfile.

    progress_func(bytes_written, bytes_expected) must return the text to
    write; the returned callable takes the same two arguments.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
364
def exit_signal_handler(sigcode, frame):
    """Signal handler: exit with the negated signal number as the status."""
    raise SystemExit(-sigcode)
367
def desired_project_uuid(api_client, project_uuid, num_retries):
    """Return the UUID of the owner the new collection should get.

    With no project_uuid the current user is looked up.  Otherwise
    project_uuid must match the user or group UUID pattern and the
    corresponding record is fetched from the API.

    Raises ValueError if project_uuid matches neither pattern; errors from
    the API call propagate to the caller.
    """
    if project_uuid:
        if arvados.util.user_uuid_pattern.match(project_uuid):
            query = api_client.users().get(uuid=project_uuid)
        elif arvados.util.group_uuid_pattern.match(project_uuid):
            query = api_client.groups().get(uuid=project_uuid)
        else:
            raise ValueError(
                "Not a valid project UUID: {}".format(project_uuid))
    else:
        query = api_client.users().current()
    response = query.execute(num_retries=num_retries)
    return response['uuid']
378
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Run arv-put: upload the given paths and print the result.

    arguments: argument list for parse_arguments(); None is passed through
        to the argument parser unchanged.
    stdout: stream that receives the result (manifest text, block locators,
        or collection UUID / portable data hash).
    stderr: stream that receives diagnostics.

    Returns the string written to stdout.  Exits via sys.exit() on
    incompatible options, on failure to resolve the project, when another
    process holds the resume cache, or when the collection cannot be
    created.
    """
    global api_client

    args = parse_arguments(arguments)
    status = 0
    # Stays None when registering the collection fails, so that we never
    # try to print a result that was not produced.
    output = None
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
        if args.normalize:
            # CollectionReader is not imported by name in this module; reach
            # it through the arvados package, like ResumableCollectionWriter.
            output = arvados.CollectionReader(output).manifest_text(normalize=True)
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            manifest_text = writer.manifest_text()
            if args.normalize:
                manifest_text = arvados.CollectionReader(manifest_text).manifest_text(normalize=True)
            replication_attr = 'replication_desired'
            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
                # API called it 'redundancy' before #3410.
                replication_attr = 'redundancy'
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': manifest_text,
                    replication_attr: args.replication,
                    },
                ensure_unique_name=True
                ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.  There is nothing to
    # print if creating the collection failed.
    if output is not None:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # The upload succeeded, so the checkpoint is no longer needed.
    if resume_cache is not None:
        resume_cache.destroy()

    return output
533
# Script entry point: run the uploader with the process's own arguments.
if __name__ == "__main__":
    main()