#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import apiclient.errors
import argparse
import arvados
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
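# main() temporarily installs exit_signal_handler for each of these signals,
# so an interrupted upload exits through sys.exit() rather than dying on the
# signal, leaving the resume cache on disk for a later --resume run.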

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                    help="""
Local file or directory. Default: read from standard input.
""")

upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

upload_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

upload_opts.add_argument('--name', help="""
Save the collection with the specified name, rather than the default
generic name "Saved at {time} by {username}@{host}".
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                   help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                   help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                   help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                   help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                   help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                   help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                   help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                    dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                    help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

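# Illustrative combinations of the options above (hypothetical command
# lines, not output from a real cluster):
#   arv-put big_dataset/          # store a tree, save a Collection, print its uuid
#   arv-put --stream file.dat     # print the manifest on stdout, save nothing
#   arv-put --raw < blob.bin      # print comma-separated block locators only
#   arv-put --filename log.txt -  # read stdin, store it under the name log.txt
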
run_opts = argparse.ArgumentParser(add_help=False)
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                   help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                   help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                   help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                   help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                   help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths += ['/dev/stdin']

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args

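# For example (behavior implied by the normalization above):
# parse_arguments([]) uploads /dev/stdin, and parse_arguments(['-'])
# also reads /dev/stdin but records the file in the manifest as "-"
# unless --filename overrides it.
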
class ResumeCacheConflict(Exception):
    pass


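# ResumeCache persists writer state as JSON in a per-upload file under
# ~/.cache/arvados/arv-put, named by an MD5 digest of the API host and
# the resolved input paths, and guarded against concurrent arv-put
# processes with an exclusive flock.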
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

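    # flock() locks follow the file descriptor: the lock is released
    # automatically when the descriptor is closed or the process exits.
    # LOCK_NB makes a second arv-put on the same inputs fail immediately
    # (surfaced as ResumeCacheConflict) instead of blocking.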
    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

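    # save() uses the classic atomic-replace pattern: write the new state
    # to a temporary file in the same directory, then rename() it over the
    # old cache, so a crash mid-save leaves the previous checkpoint intact.
    # Failures are swallowed because losing a checkpoint should not abort
    # the upload itself.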
    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__()

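    # The cached _data_buffer is stored base64-encoded (see cache_state
    # below), so it is decoded here before being handed to from_state().
    # If the cached state is missing, corrupt, or stale, we quietly fall
    # back to a fresh writer rather than failing the upload.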
    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

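    # Checkpoint cadence: state is written to the resume cache only when
    # flush_data() pushes bytes_written across a KEEP_BLOCK_SIZE boundary,
    # i.e. roughly once per Keep block, so a resumed upload repeats at
    # most about one block of work.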
    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes
    # so we can display progress as a percentage. Return None if any path
    # is neither a file nor a directory (e.g., a pipe), since the total
    # size is unknowable in that case.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

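# Shifting right by 20 bits converts bytes to whole mebibytes, so with
# bytes_expected known this renders e.g. "\r12M / 100M 12.0% "; the
# leading \r redraws the same terminal line on each update.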
def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

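# Translating the signal into sys.exit() (a SystemExit exception) instead
# of letting the process die on the signal means an interrupted run unwinds
# normally, so any resume cache written so far stays on disk for a
# subsequent --resume attempt.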
def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def check_project_exists(project_uuid):
    try:
        arvados.api('v1').groups().get(uuid=project_uuid).execute()
    except (apiclient.errors.Error, arvados.errors.NotFoundError) as error:
        raise ValueError("Project {} not found ({})".format(project_uuid,
                                                            error))
    else:
        return True

def prep_project_link(args, stderr, project_exists=check_project_exists):
    # Given the user's command line arguments, return a dictionary with data
    # to create the desired project link for this Collection, or None.
    # Raises ValueError if the arguments request something impossible.
    making_collection = not (args.raw or args.stream)
    if not making_collection:
        if args.name or args.project_uuid:
            raise ValueError("Requested a Link without creating a Collection")
        return None
    link = {'tail_uuid': args.project_uuid,
            'link_class': 'name',
            'name': args.name}
    if not link['tail_uuid']:
        link['tail_uuid'] = arvados.api('v1').users().current().execute()['uuid']
    elif not project_exists(link['tail_uuid']):
        raise ValueError("Project {} not found".format(args.project_uuid))
    if not link['name']:
        link['name'] = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())
        stderr.write(
            "arv-put: No --name specified. Saving as \"%s\"\n" % link['name'])
    link['owner_uuid'] = link['tail_uuid']
    return link

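# With no --name or --project-uuid, prep_project_link() yields something
# like the following (values here are made up for illustration):
#   {'tail_uuid': 'zzzzz-tpzed-xxxxxxxxxxxxxxx',   # current user's uuid
#    'owner_uuid': 'zzzzz-tpzed-xxxxxxxxxxxxxxx',
#    'link_class': 'name',
#    'name': 'Saved at 2014-01-01 00:00:00 UTC by alice@example.com'}
# create_project_link() then fills in head_uuid with the new collection's
# locator and POSTs the link.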
def create_project_link(locator, link):
    link['head_uuid'] = locator
    return arvados.api('v1').links().create(body=link).execute()

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    status = 0

    args = parse_arguments(arguments)
    try:
        project_link = prep_project_link(args, stderr)
    except ValueError as error:
        print >>stderr, "arv-put: {}.".format(error)
        sys.exit(2)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        # Register the resulting collection in Arvados.
        collection = arvados.api().collections().create(
            body={
                'uuid': writer.finish(),
                'manifest_text': writer.manifest_text(),
                },
            ).execute()

        # Print the locator (uuid) of the new collection.
        output = collection['uuid']
        if project_link is not None:
            try:
                create_project_link(output, project_link)
            except apiclient.errors.Error as error:
                print >>stderr, (
                    "arv-put: Error adding Collection to project: {}.".format(
                        error))
                status = 1

    stdout.write(output)
    if not output.endswith('\n'):
        stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output

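# A hypothetical session (paths, sizes, and the uuid are invented):
#   $ arv-put --name "July sequencing run" runs/july/
#   0M / 512M 0.0% ... 512M / 512M 100.0%
#   zzzzz-4zz18-xxxxxxxxxxxxxxx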
if __name__ == '__main__':
    main()