Merge branch '3609-arv-ws' refs #3609
[arvados.git] / sdk / python / arvados / commands / put.py
#!/usr/bin/env python

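"""arv-put: copy data from the local filesystem to Keep.

Reads the named files and directories (or standard input), stores the
data in Keep, and by default saves a Collection object in Arvados and
prints its locator on stdout.  Upload state is checkpointed to a cache
file so an interrupted upload can be resumed.
"""
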
# TODO:
# --md5sum - display md5 of each file as read from disk

import apiclient.errors
import argparse
import arvados
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                    help="""
Local file or directory. Default: read from standard input.
""")

upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                   help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                   help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                   help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                   help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                   help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                   help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                   help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                    dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                    help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                    help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                   help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                   help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                   help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                   help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                   help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths += ['/dev/stdin']

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args

class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
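    """Lock-protected JSON state file for checkpointing an upload.

    Each invocation gets its own cache file under ~/.cache/arvados/arv-put,
    named after a hash of its arguments (see make_path), so a rerun with
    the same inputs finds its previous checkpoint state.
    """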
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
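        """Return the cache file path for this invocation's arguments.

        The filename is the hex md5 of the API host, the sorted real
        paths of the inputs, and whichever option affects the resulting
        manifest: --max-manifest-depth for directories, --filename
        otherwise.
        """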
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
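        """Atomically replace the cache file with the given state.

        The state is written to a locked temporary file in the same
        directory, then renamed over the old cache file; on failure the
        temporary file is removed and the old cache is left in place.
        """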
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
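    """ResumableCollectionWriter that checkpoints and reports progress.

    Adds to the base writer: a ResumeCache for persisting state across
    runs, an optional progress reporter callback, and a record of inputs
    already written so that resumed uploads skip completed work.
    """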
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None,
                 api_client=None, num_retries=0):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0):
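        """Build a writer from cached state, or a fresh one if that fails.

        Missing, malformed, or stale cached state (e.g. the source files
        changed since the checkpoint) falls through to a new writer that
        starts from scratch.
        """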
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected, num_retries=num_retries)
        else:
            return writer

    def cache_state(self):
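        """Serialize current state into the resume cache, if one is attached."""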
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
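        """Flush buffered data to Keep, then report and checkpoint progress.

        State is only re-cached when the number of full Keep blocks
        written has increased, to bound checkpointing overhead.
        """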
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
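    """Format one machine-readable progress line (-1 = total unknown)."""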
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
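    """Resolve --project-uuid to an owner uuid, defaulting to the current user.

    Accepts a user or group (project) uuid; raises ValueError for
    anything else, and lets API errors propagate to the caller.
    """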
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient.errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected,
                                        num_retries=args.retries)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected, num_retries=args.retries)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': writer.manifest_text()
                    },
                ensure_unique_name=True
                ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and collection.get('portable_data_hash'):
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient.errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection, unless creating it
    # failed (in which case output was never assigned).
    if status == 0:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output

if __name__ == '__main__':
    main()