#!/usr/bin/env python

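"""arv-put: copy data from the local filesystem to Keep.

Reads the given files or directories (or standard input), stores the
data in Keep, and by default saves the resulting manifest as an
Arvados Collection. Interrupted uploads can be resumed from a local
cache (see ResumeCache below).
"""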
# TODO:
# --md5sum - display md5 of each file as read from disk

import apiclient.errors
import argparse
import arvados
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile

import arvados.commands._util as arv_cmd

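# Signals that are converted into a clean sys.exit() by
# exit_signal_handler, so an interrupted upload can be resumed later
# from the cached state.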
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                    help="""
Local file or directory. Default: read from standard input.
""")

upload_opts.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                   help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                   help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                   help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                   help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                   help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                   help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                   help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                    dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                    help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                    help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                   help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                   help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                   help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                   help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                   help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts])

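# A few representative invocations (illustrative, not exhaustive):
#   arv-put file1 dir1/          # upload and save a new Collection
#   arv-put --stream big.tar     # print the manifest; save nothing
#   tar cf - src | arv-put --filename src.tar -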
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths += ['/dev/stdin']

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args

class ResumeCacheConflict(Exception):
    pass


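# ResumeCache persists upload state under ~/.cache/arvados/arv-put so
# an interrupted arv-put can pick up where it left off. The cache file
# is flock()ed to keep two processes from uploading the same data
# concurrently, and its name is an MD5 digest of the API host and the
# sorted, resolved input paths (plus options that affect the output),
# so repeating the same invocation finds the same cache entry.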
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

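    # save() writes the new state to a temporary file in the same
    # directory, then rename()s it over the old cache, so a reader
    # never sees a partially written cache file.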
    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


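# ArvPutCollectionWriter adds checkpointing and progress reporting to
# ResumableCollectionWriter: its state (including the base64-encoded
# data buffer) is written to the ResumeCache as blocks are flushed,
# and inputs already recorded in a previous run are skipped on resume.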
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None,
                 api_client=None):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(api_client)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

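    # Checkpoint once per Keep block: comparing the block count before
    # and after the flush shows when a KEEP_BLOCK_SIZE boundary was
    # crossed, i.e. when a full block has actually been PUT to Keep.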
    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file
    # sizes, so progress can be reported as a percentage of the total.
    # Returns None if the total is unknowable, e.g. when reading from
    # a pipe or character device such as /dev/stdin.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

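# human_progress renders something like "\r12M / 100M 12.3% "; the
# leading carriage return makes each update overwrite the previous
# line on a tty.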
def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

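# Resolve the --project-uuid argument: user and group UUIDs are
# verified against the API server (which raises if they don't exist);
# anything else is rejected. Without --project-uuid, the upload goes
# to the current user's Home project.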
def desired_project_uuid(api_client, project_uuid):
    if project_uuid:
        if arvados.util.user_uuid_pattern.match(project_uuid):
            api_client.users().get(uuid=project_uuid).execute()
            return project_uuid
        elif arvados.util.group_uuid_pattern.match(project_uuid):
            api_client.groups().get(uuid=project_uuid).execute()
            return project_uuid
        else:
            raise ValueError("Not a valid project uuid: {}".format(project_uuid))
    else:
        return api_client.users().current().execute()['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client
    if api_client is None:
        api_client = arvados.api('v1')
    status = 0

    args = parse_arguments(arguments)

    # Determine the name to use.
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project.
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid)
    except (apiclient.errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

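    # Set up the resume cache unless --no-resume was given. Failure to
    # open the cache just disables resumption, but a cache locked by
    # another arv-put is a hard error.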
    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': writer.manifest_text()
                    },
                ensure_unique_name=True
                ).execute()

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and collection.get('portable_data_hash'):
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient.errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1
    # Print the output: the collection locator (or portable data hash),
    # manifest text, or block locators, depending on the options given.
    # Skip it if the collection could not be created, since output is
    # undefined in that case.
    if status == 0:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output

if __name__ == '__main__':
    main()