#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

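"""Copy data from the local filesystem to Keep (arv-put).

Example invocations (the paths, names, and data shown here are
illustrative only):

    arv-put file1.txt file2.txt        # store files; print collection uuid
    arv-put --name 'My data' mydir/    # store a directory tree under a name
    arv-put --stream - < data.bin      # read stdin; print manifest on stdout

Interrupted uploads are checkpointed in a per-invocation cache file
(see ResumeCache below); re-running the same command line resumes from
the last checkpoint unless --no-resume is given.
"""
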
import apiclient.errors
import argparse
import arvados
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                    help="""
Local file or directory. Default: read from standard input.
""")

upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

upload_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

upload_opts.add_argument('--name', help="""
Save the collection with the specified name, rather than the default
generic name "Saved at {time} by {username}@{host}".
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                   help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                   help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                   help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                   help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                   help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                   help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                   help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                    dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                    help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

run_opts = argparse.ArgumentParser(add_help=False)
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                   help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                   help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                   help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                   help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                   help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths += ['/dev/stdin']

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args

class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
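        """Return the resume cache path for this invocation.

        The file name is an MD5 digest of the API host, the sorted real
        paths of all inputs, and (when relevant) the manifest depth or
        output filename, so re-running the same command line finds the
        same cache while different uploads do not collide.
        """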
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # fileobj may be a file object or a raw file descriptor.
            raise ResumeCacheConflict(
                "{} locked".format(getattr(fileobj, 'name', fileobj)))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
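        """Atomically replace the cache file with new state.

        The state is written to a locked temporary file in the cache
        directory, flushed, and renamed over the old cache file. If
        anything fails, the temporary file is removed and the previous
        cache is left intact.
        """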
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            # Make sure the data hits disk before the rename makes it
            # the authoritative cache.
            new_cache.flush()
            os.fsync(new_cache.fileno())
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
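        # Throw away the current cache file and reopen a fresh, empty
        # one at the same path, re-acquiring the lock.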
        self.destroy()
        self.__init__(self.filename)


class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None,
                 api_client=None):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(api_client)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None):
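        """Restore a writer from cached state, or start a fresh one.

        The cached '_data_buffer' is stored base64-encoded as a single
        string (see cache_state); decode it before restoring. If the
        cached state is missing, malformed, or stale, fall back to a
        new writer rather than failing the upload.
        """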
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
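        """Flush buffered data to Keep, then checkpoint if warranted.

        Report progress whenever bytes were actually written, and save
        the resume cache each time the upload crosses a Keep block
        boundary, so an interrupted run loses at most about one block
        of work.
        """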
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as a percentage.
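    # Returns None when any path is neither a regular file nor a
    # directory (e.g. a pipe), in which case progress is reported
    # without a total.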
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
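# The doubled braces survive the .format() call above, leaving a
# per-process template such as "arv-put 1234: {} written {} total\n".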
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
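    # Right-shifting by 20 converts a byte count to binary megabytes (MiB).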
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
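    # sys.exit raises SystemExit rather than killing the process
    # outright, so cleanup still runs; the negative code records which
    # signal interrupted the upload.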
    sys.exit(-sigcode)

def check_project_exists(project_uuid):
    try:
        api_client.groups().get(uuid=project_uuid).execute()
    except (apiclient.errors.Error, arvados.errors.NotFoundError) as error:
        raise ValueError("Project {} not found ({})".format(project_uuid,
                                                            error))
    else:
        return True

def prep_project_link(args, stderr, project_exists=check_project_exists):
    # Given the user's command line arguments, return a dictionary with data
    # to create the desired project link for this Collection, or None.
    # Raises ValueError if the arguments request something impossible.
    making_collection = not (args.raw or args.stream)
    if not making_collection:
        if args.name or args.project_uuid:
            raise ValueError("Requested a Link without creating a Collection")
        return None
    link = {'tail_uuid': args.project_uuid,
            'link_class': 'name',
            'name': args.name}
    if not link['tail_uuid']:
        link['tail_uuid'] = api_client.users().current().execute()['uuid']
    elif not project_exists(link['tail_uuid']):
        raise ValueError("Project {} not found".format(args.project_uuid))
    if not link['name']:
        link['name'] = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())
        stderr.write(
            "arv-put: No --name specified. Saving as \"%s\"\n" % link['name'])
    link['owner_uuid'] = link['tail_uuid']
    return link

def create_project_link(locator, link):
    link['head_uuid'] = locator
    return api_client.links().create(body=link).execute()

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client
    if api_client is None:
        api_client = arvados.api('v1')
    status = 0

    args = parse_arguments(arguments)
    try:
        project_link = prep_project_link(args, stderr)
    except ValueError as error:
        print >>stderr, "arv-put: {}.".format(error)
        sys.exit(2)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        # Register the resulting collection in Arvados.
        collection = api_client.collections().create(
            body={
                'manifest_text': writer.manifest_text(),
                'owner_uuid': project_link['tail_uuid']
                },
            ).execute()

        if 'portable_data_hash' in collection and collection['portable_data_hash']:
            output = collection['portable_data_hash']
        else:
            output = collection['uuid']

        if project_link is not None:
            # If the API server's collection records include a 'name'
            # field, set the name directly on the collection; otherwise
            # record it with a name link.
            try:
                if 'name' in collection:
                    api_client.collections().update(
                        uuid=collection['uuid'],
                        body={"name": project_link["name"]}).execute()
                else:
                    create_project_link(output, project_link)
            except apiclient.errors.Error as error:
                print >>stderr, (
                    "arv-put: Error adding Collection to project: {}.".format(
                        error))
                status = 1

    # Write the output (collection locator by default; manifest text or
    # block locators with --stream/--raw), newline-terminated.
    stdout.write(output)
    if not output.endswith('\n'):
        stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output

if __name__ == '__main__':
    main()