Merge branch '3351-keep-timeout' closes #3351
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import apiclient.errors
7 import argparse
8 import arvados
9 import base64
10 import datetime
11 import errno
12 import fcntl
13 import hashlib
14 import json
15 import os
16 import pwd
17 import signal
18 import socket
19 import sys
20 import tempfile
21
22 import arvados.commands._util as arv_cmd
23
24 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
25
26 upload_opts = argparse.ArgumentParser(add_help=False)
27
28 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
29                     help="""
30 Local file or directory. Default: read from standard input.
31 """)
32
33 upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N',
34                     default=-1, help="""
35 Maximum depth of directory tree to represent in the manifest
36 structure. A directory structure deeper than this will be represented
37 as a single stream in the manifest. If N=0, the manifest will contain
38 a single stream. Default: -1 (unlimited), i.e., exactly one manifest
39 stream per filesystem directory that contains files.
40 """)
41
42 upload_opts.add_argument('--project-uuid', metavar='UUID', help="""
43 When a Collection is made, make a Link to save it under the specified project.
44 """)
45
46 upload_opts.add_argument('--name', help="""
47 When a Collection is linked to a project, use the specified name.
48 """)
49
50 _group = upload_opts.add_mutually_exclusive_group()
51
52 _group.add_argument('--as-stream', action='store_true', dest='stream',
53                    help="""
54 Synonym for --stream.
55 """)
56
57 _group.add_argument('--stream', action='store_true',
58                    help="""
59 Store the file content and display the resulting manifest on
60 stdout. Do not write the manifest to Keep or save a Collection object
61 in Arvados.
62 """)
63
64 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
65                    help="""
66 Synonym for --manifest.
67 """)
68
69 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
70                    help="""
71 Synonym for --manifest.
72 """)
73
74 _group.add_argument('--manifest', action='store_true',
75                    help="""
76 Store the file data and resulting manifest in Keep, save a Collection
77 object in Arvados, and display the manifest locator (Collection uuid)
78 on stdout. This is the default behavior.
79 """)
80
81 _group.add_argument('--as-raw', action='store_true', dest='raw',
82                    help="""
83 Synonym for --raw.
84 """)
85
86 _group.add_argument('--raw', action='store_true',
87                    help="""
88 Store the file content and display the data block locators on stdout,
89 separated by commas, with a trailing newline. Do not store a
90 manifest.
91 """)
92
93 upload_opts.add_argument('--use-filename', type=str, default=None,
94                     dest='filename', help="""
95 Synonym for --filename.
96 """)
97
98 upload_opts.add_argument('--filename', type=str, default=None,
99                     help="""
100 Use the given filename in the manifest, instead of the name of the
101 local file. This is useful when "-" or "/dev/stdin" is given as an
102 input file. It can be used only if there is exactly one path given and
103 it is not a directory. Implies --manifest.
104 """)
105
106 run_opts = argparse.ArgumentParser(add_help=False)
107 _group = run_opts.add_mutually_exclusive_group()
108 _group.add_argument('--progress', action='store_true',
109                    help="""
110 Display human-readable progress on stderr (bytes and, if possible,
111 percentage of total data size). This is the default behavior when
112 stderr is a tty.
113 """)
114
115 _group.add_argument('--no-progress', action='store_true',
116                    help="""
117 Do not display human-readable progress on stderr, even if stderr is a
118 tty.
119 """)
120
121 _group.add_argument('--batch-progress', action='store_true',
122                    help="""
123 Display machine-readable progress on stderr (bytes and, if known,
124 total data size).
125 """)
126
127 _group = run_opts.add_mutually_exclusive_group()
128 _group.add_argument('--resume', action='store_true', default=True,
129                    help="""
130 Continue interrupted uploads from cached state (default).
131 """)
132 _group.add_argument('--no-resume', action='store_false', dest='resume',
133                    help="""
134 Do not continue interrupted uploads from cached state.
135 """)
136
137 arg_parser = argparse.ArgumentParser(
138     description='Copy data from the local filesystem to Keep.',
139     parents=[upload_opts, run_opts])
140
141 def parse_arguments(arguments):
142     args = arg_parser.parse_args(arguments)
143
144     if len(args.paths) == 0:
145         args.paths += ['/dev/stdin']
146
147     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
148         if args.filename:
149             arg_parser.error("""
150     --filename argument cannot be used when storing a directory or
151     multiple files.
152     """)
153
154     # Turn on --progress by default if stderr is a tty.
155     if (not (args.batch_progress or args.no_progress)
156         and os.isatty(sys.stderr.fileno())):
157         args.progress = True
158
159     if args.paths == ['-']:
160         args.paths = ['/dev/stdin']
161         if not args.filename:
162             args.filename = '-'
163
164     return args
165
166 class ResumeCacheConflict(Exception):
167     pass
168
169
170 class ResumeCache(object):
171     CACHE_DIR = '.cache/arvados/arv-put'
172
173     def __init__(self, file_spec):
174         self.cache_file = open(file_spec, 'a+')
175         self._lock_file(self.cache_file)
176         self.filename = self.cache_file.name
177
178     @classmethod
179     def make_path(cls, args):
180         md5 = hashlib.md5()
181         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
182         realpaths = sorted(os.path.realpath(path) for path in args.paths)
183         md5.update('\0'.join(realpaths))
184         if any(os.path.isdir(path) for path in realpaths):
185             md5.update(str(max(args.max_manifest_depth, -1)))
186         elif args.filename:
187             md5.update(args.filename)
188         return os.path.join(
189             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
190             md5.hexdigest())
191
192     def _lock_file(self, fileobj):
193         try:
194             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
195         except IOError:
196             raise ResumeCacheConflict("{} locked".format(fileobj.name))
197
198     def load(self):
199         self.cache_file.seek(0)
200         return json.load(self.cache_file)
201
202     def save(self, data):
203         try:
204             new_cache_fd, new_cache_name = tempfile.mkstemp(
205                 dir=os.path.dirname(self.filename))
206             self._lock_file(new_cache_fd)
207             new_cache = os.fdopen(new_cache_fd, 'r+')
208             json.dump(data, new_cache)
209             os.rename(new_cache_name, self.filename)
210         except (IOError, OSError, ResumeCacheConflict) as error:
211             try:
212                 os.unlink(new_cache_name)
213             except NameError:  # mkstemp failed.
214                 pass
215         else:
216             self.cache_file.close()
217             self.cache_file = new_cache
218
219     def close(self):
220         self.cache_file.close()
221
222     def destroy(self):
223         try:
224             os.unlink(self.filename)
225         except OSError as error:
226             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
227                 raise
228         self.close()
229
230     def restart(self):
231         self.destroy()
232         self.__init__(self.filename)
233
234
235 class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
236     STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
237                    ['bytes_written', '_seen_inputs'])
238
239     def __init__(self, cache=None, reporter=None, bytes_expected=None):
240         self.bytes_written = 0
241         self._seen_inputs = []
242         self.cache = cache
243         self.reporter = reporter
244         self.bytes_expected = bytes_expected
245         super(ArvPutCollectionWriter, self).__init__()
246
247     @classmethod
248     def from_cache(cls, cache, reporter=None, bytes_expected=None):
249         try:
250             state = cache.load()
251             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
252             writer = cls.from_state(state, cache, reporter, bytes_expected)
253         except (TypeError, ValueError,
254                 arvados.errors.StaleWriterStateError) as error:
255             return cls(cache, reporter, bytes_expected)
256         else:
257             return writer
258
259     def cache_state(self):
260         if self.cache is None:
261             return
262         state = self.dump_state()
263         # Transform attributes for serialization.
264         for attr, value in state.items():
265             if attr == '_data_buffer':
266                 state[attr] = base64.encodestring(''.join(value))
267             elif hasattr(value, 'popleft'):
268                 state[attr] = list(value)
269         self.cache.save(state)
270
271     def report_progress(self):
272         if self.reporter is not None:
273             self.reporter(self.bytes_written, self.bytes_expected)
274
275     def flush_data(self):
276         start_buffer_len = self._data_buffer_len
277         start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
278         super(ArvPutCollectionWriter, self).flush_data()
279         if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
280             self.bytes_written += (start_buffer_len - self._data_buffer_len)
281             self.report_progress()
282             if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
283                 self.cache_state()
284
285     def _record_new_input(self, input_type, source_name, dest_name):
286         # The key needs to be a list because that's what we'll get back
287         # from JSON deserialization.
288         key = [input_type, source_name, dest_name]
289         if key in self._seen_inputs:
290             return False
291         self._seen_inputs.append(key)
292         return True
293
294     def write_file(self, source, filename=None):
295         if self._record_new_input('file', source, filename):
296             super(ArvPutCollectionWriter, self).write_file(source, filename)
297
298     def write_directory_tree(self,
299                              path, stream_name='.', max_manifest_depth=-1):
300         if self._record_new_input('directory', path, stream_name):
301             super(ArvPutCollectionWriter, self).write_directory_tree(
302                 path, stream_name, max_manifest_depth)
303
304
305 def expected_bytes_for(pathlist):
306     # Walk the given directory trees and stat files, adding up file sizes,
307     # so we can display progress as percent
308     bytesum = 0
309     for path in pathlist:
310         if os.path.isdir(path):
311             for filename in arvados.util.listdir_recursive(path):
312                 bytesum += os.path.getsize(os.path.join(path, filename))
313         elif not os.path.isfile(path):
314             return None
315         else:
316             bytesum += os.path.getsize(path)
317     return bytesum
318
319 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
320                                                             os.getpid())
321 def machine_progress(bytes_written, bytes_expected):
322     return _machine_format.format(
323         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
324
325 def human_progress(bytes_written, bytes_expected):
326     if bytes_expected:
327         return "\r{}M / {}M {:.1%} ".format(
328             bytes_written >> 20, bytes_expected >> 20,
329             float(bytes_written) / bytes_expected)
330     else:
331         return "\r{} ".format(bytes_written)
332
333 def progress_writer(progress_func, outfile=sys.stderr):
334     def write_progress(bytes_written, bytes_expected):
335         outfile.write(progress_func(bytes_written, bytes_expected))
336     return write_progress
337
338 def exit_signal_handler(sigcode, frame):
339     sys.exit(-sigcode)
340
341 def check_project_exists(project_uuid):
342     try:
343         arvados.api('v1').groups().get(uuid=project_uuid).execute()
344     except (apiclient.errors.Error, arvados.errors.NotFoundError) as error:
345         raise ValueError("Project {} not found ({})".format(project_uuid,
346                                                             error))
347     else:
348         return True
349
350 def prep_project_link(args, stderr, project_exists=check_project_exists):
351     # Given the user's command line arguments, return a dictionary with data
352     # to create the desired project link for this Collection, or None.
353     # Raises ValueError if the arguments request something impossible.
354     making_collection = not (args.raw or args.stream)
355     any_link_spec = args.project_uuid or args.name
356     if not making_collection:
357         if any_link_spec:
358             raise ValueError("Requested a Link without creating a Collection")
359         return None
360     elif not any_link_spec:
361         stderr.write(
362             "arv-put: No --project-uuid or --name specified.  This data will be cached\n"
363             "in Keep.  You will need to find this upload by its locator(s) later.\n")
364         return None
365     elif not args.project_uuid:
366         raise ValueError("--name requires --project-uuid")
367     elif not project_exists(args.project_uuid):
368         raise ValueError("Project {} not found".format(args.project_uuid))
369     link = {'tail_uuid': args.project_uuid, 'link_class': 'name'}
370     if args.name:
371         link['name'] = args.name
372     return link
373
374 def create_project_link(locator, link):
375     link['head_uuid'] = locator
376     link.setdefault('name', "Collection saved by {}@{} at {}".format(
377             pwd.getpwuid(os.getuid()).pw_name,
378             socket.gethostname(),
379             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")))
380     return arvados.api('v1').links().create(body=link).execute()
381
382 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
383     status = 0
384
385     args = parse_arguments(arguments)
386     try:
387         project_link = prep_project_link(args, stderr)
388     except ValueError as error:
389         print >>stderr, "arv-put: {}.".format(error)
390         sys.exit(2)
391
392     if args.progress:
393         reporter = progress_writer(human_progress)
394     elif args.batch_progress:
395         reporter = progress_writer(machine_progress)
396     else:
397         reporter = None
398     bytes_expected = expected_bytes_for(args.paths)
399
400     resume_cache = None
401     if args.resume:
402         try:
403             resume_cache = ResumeCache(ResumeCache.make_path(args))
404         except (IOError, OSError, ValueError):
405             pass  # Couldn't open cache directory/file.  Continue without it.
406         except ResumeCacheConflict:
407             print >>stderr, "\n".join([
408                 "arv-put: Another process is already uploading this data.",
409                 "         Use --no-resume if this is really what you want."])
410             sys.exit(1)
411
412     if resume_cache is None:
413         writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
414     else:
415         resume_cache.restart()
416         writer = ArvPutCollectionWriter.from_cache(
417             resume_cache, reporter, bytes_expected)
418
419     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
420     # the originals.
421     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
422                             for sigcode in CAUGHT_SIGNALS}
423
424     if writer.bytes_written > 0:  # We're resuming a previous upload.
425         print >>stderr, "\n".join([
426                 "arv-put: Resuming previous upload from last checkpoint.",
427                 "         Use the --no-resume option to start over."])
428
429     writer.report_progress()
430     writer.do_queued_work()  # Do work resumed from cache.
431     for path in args.paths:  # Copy file data to Keep.
432         if os.path.isdir(path):
433             writer.write_directory_tree(
434                 path, max_manifest_depth=args.max_manifest_depth)
435         else:
436             writer.start_new_stream()
437             writer.write_file(path, args.filename or os.path.basename(path))
438     writer.finish_current_stream()
439
440     if args.progress:  # Print newline to split stderr from stdout for humans.
441         print >>stderr
442
443     if args.stream:
444         output = writer.manifest_text()
445     elif args.raw:
446         output = ','.join(writer.data_locators())
447     else:
448         # Register the resulting collection in Arvados.
449         collection = arvados.api().collections().create(
450             body={
451                 'uuid': writer.finish(),
452                 'manifest_text': writer.manifest_text(),
453                 },
454             ).execute()
455
456         # Print the locator (uuid) of the new collection.
457         output = collection['uuid']
458         if project_link is not None:
459             try:
460                 create_project_link(output, project_link)
461             except apiclient.errors.Error as error:
462                 print >>stderr, (
463                     "arv-put: Error adding Collection to project: {}.".format(
464                         error))
465                 status = 1
466
467     stdout.write(output)
468     if not output.endswith('\n'):
469         stdout.write('\n')
470
471     for sigcode, orig_handler in orig_signal_handlers.items():
472         signal.signal(sigcode, orig_handler)
473
474     if status != 0:
475         sys.exit(status)
476
477     if resume_cache is not None:
478         resume_cache.destroy()
479
480     return output
481
482 if __name__ == '__main__':
483     main()