[arvados.git] sdk/python/arvados/commands/put.py

#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import arvados.collection
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import signal
import socket
import sys
import tempfile
from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

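# A few illustrative invocations of this tool (flag behavior is documented in
# the argparse help text above):
#
#   arv-put file1.txt dir1/          # store data and save a Collection (default)
#   arv-put --stream dir1/           # print the manifest; do not save a Collection
#   arv-put --raw file1.txt          # print data block locators only
#   arv-put --filename log.txt -     # read stdin, store it as "log.txt"
#   arv-put --project-uuid <uuid> --name "my data" dir1/
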
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.resume = False
        if not args.filename:
            args.filename = 'stdin'

    return args

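# For example (illustrative), parse_arguments(['/dev/stdin']) returns args with
# paths == ['-'], resume == False, and filename == 'stdin', per the
# normalization above.
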
class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


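# Typical ResumeCache lifecycle, mirroring its use in main() below (a sketch;
# 'args' comes from parse_arguments() and the retry count is arbitrary):
#
#     cache = ResumeCache(ResumeCache.make_path(args))
#     cache.check_cache(api_client=api_client, num_retries=3)
#     state = cache.load()    # raises ValueError if the cache file is empty
#     cache.save(state)       # writes to a temp file, then renames it into place
#     cache.destroy()         # delete the cache once the upload has succeeded
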
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(**kwargs)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0, replication=0):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries,
                                    replication=replication)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected,
                       num_retries=num_retries,
                       replication=replication)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


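# Resumption sketch (this mirrors how main() drives the writer; the numeric
# arguments are arbitrary): flush_data() checkpoints via cache_state() each
# time another full Keep block has been written, so an interrupted upload can
# later be picked up with:
#
#     writer = ArvPutCollectionWriter.from_cache(
#         resume_cache, reporter, bytes_expected,
#         num_retries=3, replication=2)
#     writer.do_queued_work()    # replay the queued inputs recorded in the cache
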
def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as a percentage.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

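# Note that expected_bytes_for() returns None as soon as a path is neither a
# regular file nor a directory (e.g. '-' for stdin); the progress reporters
# below then fall back to reporting bytes written without a total.
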
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

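# Sample formatter output (values chosen for illustration):
#
#   human_progress(2 << 20, 4 << 20)    -> "\r2M / 4M 50.0% "
#   machine_progress(2097152, 4194304)  -> "<argv[0]> <pid>: 2097152 written 4194304 total\n"
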
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))

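    # For example, with --replication left unset and a server default of 2,
    # write_copies is 2 here, while the collection record created below still
    # gets a replication attribute of None (i.e. "use the server default").
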
    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
            resume_cache.check_cache(api_client=api_client, num_retries=args.retries)
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if path == '-':
            writer.start_new_stream()
            writer.start_new_file(args.filename)
            r = sys.stdin.read(64*1024)
            while r:
                # Bypass the _queued_file check in ResumableCollectionWriter.write()
                # so this call reaches CollectionWriter.write() directly.
                super(arvados.collection.ResumableCollectionWriter, writer).write(r)
                r = sys.stdin.read(64*1024)
        elif os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        output = writer.manifest_text()
        if args.normalize:
            output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            manifest_text = writer.manifest_text()
            if args.normalize:
                manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
            replication_attr = 'replication_desired'
            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
                # API called it 'redundancy' before #3410.
                replication_attr = 'redundancy'
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': manifest_text,
                    replication_attr: args.replication,
                    },
                ensure_unique_name=True
                ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1
            output = None

    # Print the locator (uuid) of the new collection.
    if output is not None:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output

if __name__ == '__main__':
    main()
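
# The module can also be driven programmatically; a minimal sketch (the
# argument list and file name here are hypothetical):
#
#     import arvados.commands.put as arv_put
#     locator = arv_put.main(['--no-progress', 'data.tsv'])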