#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import arvados.collection
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile
from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd

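# Module-level state: the signals we trap so an interrupted upload exits via
# exit_signal_handler(), and a shared API client that main() creates on first
# use (a caller may set api_client beforehand).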
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

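# parse_arguments normalizes the command line: "-" and "/dev/stdin" both mean
# stdin, --filename is rejected for directories or multiple paths, --progress
# defaults on when stderr is a tty, and resume is disabled for stdin input.
# Typical invocations (illustrative only):
#   arv-put file1 file2           # save a Collection, print its uuid
#   arv-put --stream somedir/     # print the manifest, save nothing
#   tar c somedir | arv-put --filename somedir.tar -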
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.resume = False
        if not args.filename:
            args.filename = 'stdin'

    return args

class ResumeCacheConflict(Exception):
    pass


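# ResumeCache persists upload state as JSON in a file under
# ~/.cache/arvados/arv-put, named by an md5 of the API host, the input paths,
# and the relevant options, and holds an exclusive flock so two arv-put runs
# over the same inputs cannot clobber each other's checkpoints.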
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

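    # save() writes the new state to a locked temporary file in the same
    # directory and renames it over the old cache, so an interrupted save
    # never leaves a truncated or unlocked cache file behind.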
    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


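# ArvPutCollectionWriter extends ResumableCollectionWriter with progress
# reporting and periodic checkpointing of its serialized state to a
# ResumeCache, so an interrupted upload can pick up from the last checkpoint.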
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(**kwargs)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0, replication=0):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries,
                                    replication=replication)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected, num_retries=num_retries)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

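    # Checkpoint whenever a flush pushes the running total past another full
    # Keep block, so the cache tracks progress at block granularity.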
    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

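    # Track (type, source, dest) triples so that a resumed run does not queue
    # the same file or directory tree twice.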
    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as a percentage.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

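# Progress output comes in two flavors: machine_progress emits one parseable
# line per update ("<argv0> <pid>: <written> written <total> total"), while
# human_progress rewrites a single carriage-return line with MiB counts and a
# percentage when the total size is known.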
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

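# Resolve the --project-uuid argument: with no value, use the current user's
# home project; otherwise accept either a user UUID or a group (project) UUID
# and return the canonical uuid reported by the API server.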
def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

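    # Start fresh if there is no usable cache; otherwise rebuild the writer
    # (including its buffered data) from the checkpointed state.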
    if resume_cache is None:
        writer = ArvPutCollectionWriter(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if path == '-':
            writer.start_new_stream()
            writer.start_new_file(args.filename)
            r = sys.stdin.read(64*1024)
            while r:
                # Bypass the _queued_file check in ResumableCollectionWriter.write()
                # so this data goes straight to CollectionWriter.write().
                super(arvados.collection.ResumableCollectionWriter, writer).write(r)
                r = sys.stdin.read(64*1024)
        elif os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

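    # Emit output according to the chosen mode: --stream prints the manifest
    # text, --raw prints the comma-separated block locators, and the default
    # saves a Collection object and prints its uuid (or portable data hash).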
    if args.stream:
        output = writer.manifest_text()
        if args.normalize:
            output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            manifest_text = writer.manifest_text()
            if args.normalize:
                manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
            replication_attr = 'replication_desired'
            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
                # API called it 'redundancy' before #3410.
                replication_attr = 'redundancy'
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': manifest_text,
                    replication_attr: args.replication,
                    },
                ensure_unique_name=True
                ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    stdout.write(output)
    if not output.endswith('\n'):
        stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output

if __name__ == '__main__':
    main()