7254: Do not forget -replication arg when failing to load resume state.
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import datetime
11 import errno
12 import fcntl
13 import hashlib
14 import json
15 import os
16 import pwd
17 import signal
18 import socket
19 import sys
20 import tempfile
21 from apiclient import errors as apiclient_errors
22
23 import arvados.commands._util as arv_cmd
24
25 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
26 api_client = None
27
28 upload_opts = argparse.ArgumentParser(add_help=False)
29
30 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
31                          help="""
32 Local file or directory. Default: read from standard input.
33 """)
34
35 _group = upload_opts.add_mutually_exclusive_group()
36
37 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
38                     default=-1, help="""
39 Maximum depth of directory tree to represent in the manifest
40 structure. A directory structure deeper than this will be represented
41 as a single stream in the manifest. If N=0, the manifest will contain
42 a single stream. Default: -1 (unlimited), i.e., exactly one manifest
43 stream per filesystem directory that contains files.
44 """)
45
46 _group.add_argument('--normalize', action='store_true',
47                     help="""
48 Normalize the manifest by re-ordering files and streams after writing
49 data.
50 """)
51
52 _group = upload_opts.add_mutually_exclusive_group()
53
54 _group.add_argument('--as-stream', action='store_true', dest='stream',
55                     help="""
56 Synonym for --stream.
57 """)
58
59 _group.add_argument('--stream', action='store_true',
60                     help="""
61 Store the file content and display the resulting manifest on
62 stdout. Do not write the manifest to Keep or save a Collection object
63 in Arvados.
64 """)
65
66 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
67                     help="""
68 Synonym for --manifest.
69 """)
70
71 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
72                     help="""
73 Synonym for --manifest.
74 """)
75
76 _group.add_argument('--manifest', action='store_true',
77                     help="""
78 Store the file data and resulting manifest in Keep, save a Collection
79 object in Arvados, and display the manifest locator (Collection uuid)
80 on stdout. This is the default behavior.
81 """)
82
83 _group.add_argument('--as-raw', action='store_true', dest='raw',
84                     help="""
85 Synonym for --raw.
86 """)
87
88 _group.add_argument('--raw', action='store_true',
89                     help="""
90 Store the file content and display the data block locators on stdout,
91 separated by commas, with a trailing newline. Do not store a
92 manifest.
93 """)
94
95 upload_opts.add_argument('--use-filename', type=str, default=None,
96                          dest='filename', help="""
97 Synonym for --filename.
98 """)
99
100 upload_opts.add_argument('--filename', type=str, default=None,
101                          help="""
102 Use the given filename in the manifest, instead of the name of the
103 local file. This is useful when "-" or "/dev/stdin" is given as an
104 input file. It can be used only if there is exactly one path given and
105 it is not a directory. Implies --manifest.
106 """)
107
108 upload_opts.add_argument('--portable-data-hash', action='store_true',
109                          help="""
110 Print the portable data hash instead of the Arvados UUID for the collection
111 created by the upload.
112 """)
113
114 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
115                          help="""
116 Set the replication level for the new collection: how many different
117 physical storage devices (e.g., disks) should have a copy of each data
118 block. Default is to use the server-provided default (if any) or 2.
119 """)
120
121 run_opts = argparse.ArgumentParser(add_help=False)
122
123 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
124 Store the collection in the specified project, instead of your Home
125 project.
126 """)
127
128 run_opts.add_argument('--name', help="""
129 Save the collection with the specified name.
130 """)
131
132 _group = run_opts.add_mutually_exclusive_group()
133 _group.add_argument('--progress', action='store_true',
134                     help="""
135 Display human-readable progress on stderr (bytes and, if possible,
136 percentage of total data size). This is the default behavior when
137 stderr is a tty.
138 """)
139
140 _group.add_argument('--no-progress', action='store_true',
141                     help="""
142 Do not display human-readable progress on stderr, even if stderr is a
143 tty.
144 """)
145
146 _group.add_argument('--batch-progress', action='store_true',
147                     help="""
148 Display machine-readable progress on stderr (bytes and, if known,
149 total data size).
150 """)
151
152 _group = run_opts.add_mutually_exclusive_group()
153 _group.add_argument('--resume', action='store_true', default=True,
154                     help="""
155 Continue interrupted uploads from cached state (default).
156 """)
157 _group.add_argument('--no-resume', action='store_false', dest='resume',
158                     help="""
159 Do not continue interrupted uploads from cached state.
160 """)
161
162 arg_parser = argparse.ArgumentParser(
163     description='Copy data from the local filesystem to Keep.',
164     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
165
166 def parse_arguments(arguments):
167     args = arg_parser.parse_args(arguments)
168
169     if len(args.paths) == 0:
170         args.paths = ['-']
171
172     args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
173
174     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
175         if args.filename:
176             arg_parser.error("""
177     --filename argument cannot be used when storing a directory or
178     multiple files.
179     """)
180
181     # Turn on --progress by default if stderr is a tty.
182     if (not (args.batch_progress or args.no_progress)
183         and os.isatty(sys.stderr.fileno())):
184         args.progress = True
185
186     if args.paths == ['-']:
187         args.resume = False
188         if not args.filename:
189             args.filename = 'stdin'
190
191     return args
192
193 class ResumeCacheConflict(Exception):
194     pass
195
196
197 class ResumeCache(object):
198     CACHE_DIR = '.cache/arvados/arv-put'
199
200     def __init__(self, file_spec):
201         self.cache_file = open(file_spec, 'a+')
202         self._lock_file(self.cache_file)
203         self.filename = self.cache_file.name
204
205     @classmethod
206     def make_path(cls, args):
207         md5 = hashlib.md5()
208         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
209         realpaths = sorted(os.path.realpath(path) for path in args.paths)
210         md5.update('\0'.join(realpaths))
211         if any(os.path.isdir(path) for path in realpaths):
212             md5.update(str(max(args.max_manifest_depth, -1)))
213         elif args.filename:
214             md5.update(args.filename)
215         return os.path.join(
216             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
217             md5.hexdigest())
218
219     def _lock_file(self, fileobj):
220         try:
221             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
222         except IOError:
223             raise ResumeCacheConflict("{} locked".format(fileobj.name))
224
225     def load(self):
226         self.cache_file.seek(0)
227         return json.load(self.cache_file)
228
229     def save(self, data):
230         try:
231             new_cache_fd, new_cache_name = tempfile.mkstemp(
232                 dir=os.path.dirname(self.filename))
233             self._lock_file(new_cache_fd)
234             new_cache = os.fdopen(new_cache_fd, 'r+')
235             json.dump(data, new_cache)
236             os.rename(new_cache_name, self.filename)
237         except (IOError, OSError, ResumeCacheConflict) as error:
238             try:
239                 os.unlink(new_cache_name)
240             except NameError:  # mkstemp failed.
241                 pass
242         else:
243             self.cache_file.close()
244             self.cache_file = new_cache
245
246     def close(self):
247         self.cache_file.close()
248
249     def destroy(self):
250         try:
251             os.unlink(self.filename)
252         except OSError as error:
253             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
254                 raise
255         self.close()
256
257     def restart(self):
258         self.destroy()
259         self.__init__(self.filename)
260
261
262 class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
263     STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
264                    ['bytes_written', '_seen_inputs'])
265
266     def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
267         self.bytes_written = 0
268         self._seen_inputs = []
269         self.cache = cache
270         self.reporter = reporter
271         self.bytes_expected = bytes_expected
272         super(ArvPutCollectionWriter, self).__init__(**kwargs)
273
274     @classmethod
275     def from_cache(cls, cache, reporter=None, bytes_expected=None,
276                    num_retries=0, replication=0):
277         try:
278             state = cache.load()
279             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
280             writer = cls.from_state(state, cache, reporter, bytes_expected,
281                                     num_retries=num_retries,
282                                     replication=replication)
283         except (TypeError, ValueError,
284                 arvados.errors.StaleWriterStateError) as error:
285             return cls(cache, reporter, bytes_expected,
286                        num_retries=num_retries,
287                        replication=replication)
288         else:
289             return writer
290
291     def cache_state(self):
292         if self.cache is None:
293             return
294         state = self.dump_state()
295         # Transform attributes for serialization.
296         for attr, value in state.items():
297             if attr == '_data_buffer':
298                 state[attr] = base64.encodestring(''.join(value))
299             elif hasattr(value, 'popleft'):
300                 state[attr] = list(value)
301         self.cache.save(state)
302
303     def report_progress(self):
304         if self.reporter is not None:
305             self.reporter(self.bytes_written, self.bytes_expected)
306
307     def flush_data(self):
308         start_buffer_len = self._data_buffer_len
309         start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
310         super(ArvPutCollectionWriter, self).flush_data()
311         if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
312             self.bytes_written += (start_buffer_len - self._data_buffer_len)
313             self.report_progress()
314             if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
315                 self.cache_state()
316
317     def _record_new_input(self, input_type, source_name, dest_name):
318         # The key needs to be a list because that's what we'll get back
319         # from JSON deserialization.
320         key = [input_type, source_name, dest_name]
321         if key in self._seen_inputs:
322             return False
323         self._seen_inputs.append(key)
324         return True
325
326     def write_file(self, source, filename=None):
327         if self._record_new_input('file', source, filename):
328             super(ArvPutCollectionWriter, self).write_file(source, filename)
329
330     def write_directory_tree(self,
331                              path, stream_name='.', max_manifest_depth=-1):
332         if self._record_new_input('directory', path, stream_name):
333             super(ArvPutCollectionWriter, self).write_directory_tree(
334                 path, stream_name, max_manifest_depth)
335
336
337 def expected_bytes_for(pathlist):
338     # Walk the given directory trees and stat files, adding up file sizes,
339     # so we can display progress as percent
340     bytesum = 0
341     for path in pathlist:
342         if os.path.isdir(path):
343             for filename in arvados.util.listdir_recursive(path):
344                 bytesum += os.path.getsize(os.path.join(path, filename))
345         elif not os.path.isfile(path):
346             return None
347         else:
348             bytesum += os.path.getsize(path)
349     return bytesum
350
351 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
352                                                             os.getpid())
353 def machine_progress(bytes_written, bytes_expected):
354     return _machine_format.format(
355         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
356
357 def human_progress(bytes_written, bytes_expected):
358     if bytes_expected:
359         return "\r{}M / {}M {:.1%} ".format(
360             bytes_written >> 20, bytes_expected >> 20,
361             float(bytes_written) / bytes_expected)
362     else:
363         return "\r{} ".format(bytes_written)
364
365 def progress_writer(progress_func, outfile=sys.stderr):
366     def write_progress(bytes_written, bytes_expected):
367         outfile.write(progress_func(bytes_written, bytes_expected))
368     return write_progress
369
370 def exit_signal_handler(sigcode, frame):
371     sys.exit(-sigcode)
372
373 def desired_project_uuid(api_client, project_uuid, num_retries):
374     if not project_uuid:
375         query = api_client.users().current()
376     elif arvados.util.user_uuid_pattern.match(project_uuid):
377         query = api_client.users().get(uuid=project_uuid)
378     elif arvados.util.group_uuid_pattern.match(project_uuid):
379         query = api_client.groups().get(uuid=project_uuid)
380     else:
381         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
382     return query.execute(num_retries=num_retries)['uuid']
383
384 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
385     global api_client
386
387     args = parse_arguments(arguments)
388     status = 0
389     if api_client is None:
390         api_client = arvados.api('v1')
391
392     # Determine the name to use
393     if args.name:
394         if args.stream or args.raw:
395             print >>stderr, "Cannot use --name with --stream or --raw"
396             sys.exit(1)
397         collection_name = args.name
398     else:
399         collection_name = "Saved at {} by {}@{}".format(
400             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
401             pwd.getpwuid(os.getuid()).pw_name,
402             socket.gethostname())
403
404     if args.project_uuid and (args.stream or args.raw):
405         print >>stderr, "Cannot use --project-uuid with --stream or --raw"
406         sys.exit(1)
407
408     # Determine the parent project
409     try:
410         project_uuid = desired_project_uuid(api_client, args.project_uuid,
411                                             args.retries)
412     except (apiclient_errors.Error, ValueError) as error:
413         print >>stderr, error
414         sys.exit(1)
415
416     # write_copies diverges from args.replication here.
417     # args.replication is how many copies we will instruct Arvados to
418     # maintain (by passing it in collections().create()) after all
419     # data is written -- and if None was given, we'll use None there.
420     # Meanwhile, write_copies is how many copies of each data block we
421     # write to Keep, which has to be a number.
422     #
423     # If we simply changed args.replication from None to a default
424     # here, we'd end up erroneously passing the default replication
425     # level (instead of None) to collections().create().
426     write_copies = (args.replication or
427                     api_client._rootDesc.get('defaultCollectionReplication', 2))
428
429     if args.progress:
430         reporter = progress_writer(human_progress)
431     elif args.batch_progress:
432         reporter = progress_writer(machine_progress)
433     else:
434         reporter = None
435     bytes_expected = expected_bytes_for(args.paths)
436
437     resume_cache = None
438     if args.resume:
439         try:
440             resume_cache = ResumeCache(ResumeCache.make_path(args))
441         except (IOError, OSError, ValueError):
442             pass  # Couldn't open cache directory/file.  Continue without it.
443         except ResumeCacheConflict:
444             print >>stderr, "\n".join([
445                 "arv-put: Another process is already uploading this data.",
446                 "         Use --no-resume if this is really what you want."])
447             sys.exit(1)
448
449     if resume_cache is None:
450         writer = ArvPutCollectionWriter(
451             resume_cache, reporter, bytes_expected,
452             num_retries=args.retries,
453             replication=write_copies)
454     else:
455         writer = ArvPutCollectionWriter.from_cache(
456             resume_cache, reporter, bytes_expected,
457             num_retries=args.retries,
458             replication=write_copies)
459
460     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
461     # the originals.
462     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
463                             for sigcode in CAUGHT_SIGNALS}
464
465     if writer.bytes_written > 0:  # We're resuming a previous upload.
466         print >>stderr, "\n".join([
467                 "arv-put: Resuming previous upload from last checkpoint.",
468                 "         Use the --no-resume option to start over."])
469
470     writer.report_progress()
471     writer.do_queued_work()  # Do work resumed from cache.
472     for path in args.paths:  # Copy file data to Keep.
473         if path == '-':
474             writer.start_new_stream()
475             writer.start_new_file(args.filename)
476             r = sys.stdin.read(64*1024)
477             while r:
478                 # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
479                 # CollectionWriter.write().
480                 super(arvados.collection.ResumableCollectionWriter, writer).write(r)
481                 r = sys.stdin.read(64*1024)
482         elif os.path.isdir(path):
483             writer.write_directory_tree(
484                 path, max_manifest_depth=args.max_manifest_depth)
485         else:
486             writer.start_new_stream()
487             writer.write_file(path, args.filename or os.path.basename(path))
488     writer.finish_current_stream()
489
490     if args.progress:  # Print newline to split stderr from stdout for humans.
491         print >>stderr
492
493     if args.stream:
494         output = writer.manifest_text()
495         if args.normalize:
496             output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
497     elif args.raw:
498         output = ','.join(writer.data_locators())
499     else:
500         try:
501             manifest_text = writer.manifest_text()
502             if args.normalize:
503                 manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
504             replication_attr = 'replication_desired'
505             if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
506                 # API called it 'redundancy' before #3410.
507                 replication_attr = 'redundancy'
508             # Register the resulting collection in Arvados.
509             collection = api_client.collections().create(
510                 body={
511                     'owner_uuid': project_uuid,
512                     'name': collection_name,
513                     'manifest_text': manifest_text,
514                     replication_attr: args.replication,
515                     },
516                 ensure_unique_name=True
517                 ).execute(num_retries=args.retries)
518
519             print >>stderr, "Collection saved as '%s'" % collection['name']
520
521             if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
522                 output = collection['portable_data_hash']
523             else:
524                 output = collection['uuid']
525
526         except apiclient_errors.Error as error:
527             print >>stderr, (
528                 "arv-put: Error creating Collection on project: {}.".format(
529                     error))
530             status = 1
531
532     # Print the locator (uuid) of the new collection.
533     stdout.write(output)
534     if not output.endswith('\n'):
535         stdout.write('\n')
536
537     for sigcode, orig_handler in orig_signal_handlers.items():
538         signal.signal(sigcode, orig_handler)
539
540     if status != 0:
541         sys.exit(status)
542
543     if resume_cache is not None:
544         resume_cache.destroy()
545
546     return output
547
548 if __name__ == '__main__':
549     main()