[arvados.git] sdk/python/arvados/commands/put.py
#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

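# arv-put copies data from the local filesystem into Keep and, by default,
# registers the result as an Arvados Collection.
#
# Illustrative invocations (a sketch based on the options defined below,
# not an exhaustive reference):
#
#   arv-put file1.dat subdir/        # save a Collection, print its UUID
#   arv-put --stream subdir/         # store data, print the manifest only
#   arv-put --raw file1.dat          # store data, print block locators only
#   arv-put --no-resume big.tar      # ignore any cached upload state
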
import argparse
import arvados
import arvados.collection
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile
from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
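    """Parse command-line arguments and apply arv-put's defaults.

    Reads from /dev/stdin when no paths are given, rejects --filename for
    directories or multiple paths, and turns --progress on by default when
    stderr is a tty.
    """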
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths += ['/dev/stdin']

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args

class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
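    """File-based cache of upload state, used to resume interrupted uploads.

    Each cache file lives under ~/.cache/arvados/arv-put, is named after a
    hash of the upload arguments (see make_path), and is held under an
    exclusive flock so two arv-put processes cannot resume the same upload
    at the same time.
    """
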
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
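        """Return the cache file path for this invocation's arguments.

        The filename is an MD5 digest of the API host, the real paths being
        uploaded, and (when relevant) --max-manifest-depth or --filename, so
        repeated runs of the same upload map to the same cache file.
        """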
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
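        """Take a non-blocking exclusive flock, or raise ResumeCacheConflict."""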
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
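        """Atomically replace the cache file with a JSON dump of `data`.

        The new state is written to a locked temporary file in the same
        directory and renamed over the old cache, so an interrupted write
        cannot leave a truncated cache behind.
        """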
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
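    """ResumableCollectionWriter that reports progress and checkpoints state.

    Tracks bytes_written and the inputs already queued so that a resumed
    run can skip work it has done before.  Progress is reported through the
    supplied reporter callback, and state is saved to the ResumeCache
    roughly once per Keep block written (see flush_data).
    """
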
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(**kwargs)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0, replication=0):
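        """Build a writer from cached state, or start fresh if that fails.

        Cached state that is missing, malformed, or stale falls through to
        a brand-new writer with the same settings.
        """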
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries,
                                    replication=replication)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            # Cached state is missing or unusable: fall back to a fresh
            # writer, keeping the caller's retry and replication settings.
            return cls(cache, reporter, bytes_expected, num_retries=num_retries,
                       replication=replication)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
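        """Write buffered data to Keep, then update progress and checkpoints.

        The resume cache is only rewritten when the flush crosses a Keep
        block boundary, to keep checkpointing overhead low.
        """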
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
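    """Wrap a formatter into a reporter(bytes_written, bytes_expected) callable.

    The returned function formats the two byte counts with progress_func
    and writes the result to outfile (stderr by default); it is passed to
    ArvPutCollectionWriter as its reporter.
    """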
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
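    """Resolve the --project-uuid argument to a project (or user) UUID.

    An empty value means the current user's home project; otherwise the
    value must look like a user or group UUID, and is verified with an
    API call.
    """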
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
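    """Run arv-put: upload the given paths and print the result.

    Depending on the options, the output is a manifest (--stream), raw
    block locators (--raw), or the UUID or portable data hash of a newly
    created Collection (the default).  The same output string is returned.
    """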
    global api_client

    args = parse_arguments(arguments)
    status = 0
    output = None  # Set below unless collection creation fails.
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
        if args.normalize:
            output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            manifest_text = writer.manifest_text()
            if args.normalize:
                manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
            replication_attr = 'replication_desired'
            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
                # API called it 'redundancy' before #3410.
                replication_attr = 'redundancy'
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': manifest_text,
                    replication_attr: args.replication,
                    },
                ensure_unique_name=True
                ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection, unless creating it failed.
    if output is not None:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output

if __name__ == '__main__':
    main()