[arvados.git] / sdk / python / arvados / commands / put.py
#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import apiclient.errors
import argparse
import arvados
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile

import arvados.commands._util as arv_cmd

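# Signals trapped with exit_signal_handler so an interrupted upload exits
# promptly; the resume cache is left in place, so a later run can pick up
# where this one stopped.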
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                    help="""
Local file or directory. Default: read from standard input.
""")

upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

upload_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

upload_opts.add_argument('--name', help="""
Save the collection with the specified name, rather than the default
generic name "Saved at {time} by {username}@{host}".
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                   help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                   help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                   help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                   help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                   help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                   help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                   help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                    dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                    help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                    help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

run_opts = argparse.ArgumentParser(add_help=False)
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                   help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                   help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                   help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                   help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                   help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts])

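# A few illustrative invocations (assuming the usual ARVADOS_API_HOST /
# ARVADOS_API_TOKEN environment is configured):
#   arv-put file1.dat dir2/        # store data and save a Collection (default)
#   arv-put --stream dir2/         # print the manifest, do not save anything
#   arv-put --filename log.txt -   # read stdin, store it as "log.txt"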
def parse_arguments(arguments):
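    """Parse command-line arguments and apply arv-put's defaulting rules."""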
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths += ['/dev/stdin']

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args

class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
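    """Persist arv-put upload state in a JSON file so uploads can resume.

    The cache file lives under ~/.cache/arvados/arv-put and is held with an
    exclusive flock so two arv-put processes cannot clobber each other.
    """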
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
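        """Build a per-invocation cache path from the command-line arguments.

        The filename is the MD5 of the API host, the resolved input paths,
        and the options that affect manifest layout, so repeating the same
        command finds the same cache file.
        """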
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
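        """Take an exclusive, non-blocking flock on the cache file.

        Raises ResumeCacheConflict if another process already holds it.
        """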
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
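        """Write data as JSON to a locked temp file, then rename it into place.

        If anything goes wrong, the temp file is removed and the previous
        cache contents are left untouched.
        """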
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
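    """ResumableCollectionWriter that checkpoints its state to a ResumeCache.

    Tracks bytes written and which inputs have been queued, reports progress
    through an optional callback, and saves serialized state to the cache as
    data is flushed to Keep.
    """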
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None,
                 api_client=None):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(api_client)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None):
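        """Build a writer from cached state, or a fresh one if that fails.

        A missing, unreadable, or stale cache (e.g. the inputs changed since
        the last run) just means the upload starts from scratch.
        """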
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
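        """Flush buffered data to Keep, then update progress and checkpoints.

        The cache is only rewritten when the flush completed a new Keep
        block, to avoid re-serializing state on every small write.
        """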
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
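    # Returns None if any path is neither a regular file nor a directory
    # (for example, a pipe), since the total size is then unknown.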
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
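# e.g. "arv-put 12345: 1048576 written 4194304 total"; the total is reported
# as -1 when the expected size is unknown.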
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def check_project_exists(project_uuid):
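    """Return True if the project UUID can be fetched; raise ValueError otherwise."""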
    try:
        api_client.groups().get(uuid=project_uuid).execute()
    except (apiclient.errors.Error, arvados.errors.NotFoundError) as error:
        raise ValueError("Project {} not found ({})".format(project_uuid,
                                                            error))
    else:
        return True

def prep_project_link(args, stderr, project_exists=check_project_exists):
    # Given the user's command line arguments, return a dictionary with data
    # to create the desired project link for this Collection, or None.
    # Raises ValueError if the arguments request something impossible.
    making_collection = not (args.raw or args.stream)
    if not making_collection:
        if args.name or args.project_uuid:
            raise ValueError("Requested a Link without creating a Collection")
        return None
    link = {'tail_uuid': args.project_uuid,
            'link_class': 'name',
            'name': args.name}
    if not link['tail_uuid']:
        link['tail_uuid'] = api_client.users().current().execute()['uuid']
    elif not project_exists(link['tail_uuid']):
        raise ValueError("Project {} not found".format(args.project_uuid))
    if not link['name']:
        link['name'] = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())
        stderr.write(
            "arv-put: No --name specified. Saving as \"%s\"\n" % link['name'])
    link['owner_uuid'] = link['tail_uuid']
    return link

def create_project_link(locator, link):
    link['head_uuid'] = locator
    return api_client.links().create(body=link).execute()

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
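    """Entry point for arv-put: parse arguments, upload the data, emit the result.

    Returns the output string (collection UUID or portable data hash, manifest
    text, or block locators, depending on the options given).
    """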
    global api_client
    if api_client is None:
        api_client = arvados.api('v1')
    status = 0

    args = parse_arguments(arguments)
    try:
        project_link = prep_project_link(args, stderr)
    except ValueError as error:
        print >>stderr, "arv-put: {}.".format(error)
        sys.exit(2)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        # Register the resulting collection in Arvados.
        collection = api_client.collections().create(
            body={
                'manifest_text': writer.manifest_text(),
                'owner_uuid': project_link['tail_uuid']
                },
            ).execute()

        if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
            output = collection['portable_data_hash']
        else:
            output = collection['uuid']

        if project_link is not None:
            # Attach the collection to the project: update the collection's
            # name if the API returns one, otherwise create a name link.
            try:
                if 'name' in collection:
                    arvados.api().collections().update(uuid=collection['uuid'],
                                                       body={"name": project_link["name"]}).execute()
                else:
                    create_project_link(output, project_link)
            except apiclient.errors.Error as error:
                print >>stderr, (
                    "arv-put: Error adding Collection to project: {}.".format(
                        error))
                status = 1

    # Write the output: collection UUID or portable data hash, manifest text,
    # or block locators, depending on the options given.
    stdout.write(output)
    if not output.endswith('\n'):
        stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output

if __name__ == '__main__':
    main()