#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile
from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd

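# Signals that interrupt an upload (handled in main() so the resume cache is
# left on disk for a later run), and the module-level API client, which is
# created lazily in main() unless a caller has already set one.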
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

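# Command-line options are split across two parsers: upload_opts covers how
# the data is written and represented, run_opts covers the behavior of a
# single run (naming, progress, resuming). Both are combined as parents of
# arg_parser below, presumably so other tools can reuse them separately.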
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

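# Options that control a single run of the command rather than how the data
# is stored.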
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

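# The user-facing parser: upload and run options, plus the retry option
# (args.retries) supplied by arvados.commands._util.retry_opt.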
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

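# Example invocations (assuming this module is installed as the `arv-put`
# command, as the messages below suggest):
#   arv-put my_data.tar            # store one file, save a Collection
#   arv-put --name "My data" dir/  # store a directory under a custom name
#   arv-put --stream my_data.tar   # write the data, print the manifest,
#                                  # but do not save a Collection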
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths += ['/dev/stdin']

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args

class ResumeCacheConflict(Exception):
    pass


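# ResumeCache persists upload state under the per-user cache directory
# (CACHE_DIR, resolved via arv_cmd.make_home_conf_dir). The cache file name
# is an md5 digest of the API host and the input paths (plus the manifest
# depth or --filename when relevant), and the file is flock()ed so two
# processes cannot resume the same upload at once.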
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


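# ArvPutCollectionWriter extends ResumableCollectionWriter with progress
# reporting and checkpointing: each time enough data has been flushed to
# complete another Keep block, the writer's serialized state is saved to the
# ResumeCache so an interrupted upload can pick up where it left off.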
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None,
                 api_client=None, num_retries=0):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected, num_retries=num_retries)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as a percentage.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

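# Progress reporters. machine_progress emits one parseable line per update
# ("<argv0> <pid>: <bytes written> written <bytes expected> total", with -1
# when the total is unknown); human_progress rewrites a single status line
# using carriage returns.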
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

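# Resolve the --project-uuid argument: default to the current user's home
# project, accept either a user or group UUID, and verify that the object
# exists by fetching it from the API before returning its uuid.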
def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

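# main() flow: parse arguments, pick a collection name and owner project, set
# up progress reporting and (optionally) a resume cache, write each path with
# ArvPutCollectionWriter, then emit a manifest, raw block locators, or a saved
# Collection's identifier depending on the options given.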
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected,
                                        num_retries=args.retries)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected, num_retries=args.retries)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
        if args.normalize:
            output = arvados.CollectionReader(output).manifest_text(normalize=True)
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            manifest_text = writer.manifest_text()
            if args.normalize:
                manifest_text = arvados.CollectionReader(manifest_text).manifest_text(normalize=True)
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': manifest_text
                    },
                ensure_unique_name=True
                ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the output (manifest, block locators, or collection identifier),
    # unless creating the collection failed and there is nothing to print.
    if status == 0:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output

if __name__ == '__main__':
    main()