#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import arvados.collection
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import time
import signal
import socket
import sys
import tempfile
from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
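    """Parse and validate the command line, returning an argparse Namespace.

    Applies the defaults described in the option help texts: reads from
    stdin when no paths are given, enables --progress when stderr is a
    tty, and disables --resume when reading from stdin.
    """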
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.resume = False
        if not args.filename:
            args.filename = 'stdin'

    return args

class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
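    """On-disk cache of upload state, used to resume interrupted uploads.

    The cache file lives under ~/.cache/arvados/arv-put and is held under
    an exclusive flock() so that two arv-put processes cannot resume the
    same upload at once.
    """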
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
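        """Return the cache file path for this invocation.

        The name is an MD5 digest of the API host, the (real) input paths,
        and the options that affect the resulting manifest, so repeating
        the same command maps to the same cache file.
        """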
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
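        """Make sure the cached state still refers to data present in Keep.

        Picks one block locator recorded in the cached state and issues a
        HEAD request for it; if the block cannot be found, the cache is
        discarded and recreated so the upload starts over.
        """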
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutCollection(object):
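    """Collection-based uploader built on arvados.collection.Collection.

    Streams local files into a Collection and saves it periodically while
    writing. Note that main() below still drives uploads through
    ArvPutCollectionWriter; this class is not wired into the command yet.
    """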
    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
        self.collection_flush_time = 60
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        # super(ArvPutCollection, self).__init__(**kwargs)
        self.collection = arvados.collection.Collection()
        self.collection.save_new()

    def write_file(self, source, filename):
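        """Copy a local file into the collection as `filename`.

        Data is read and written in KEEP_BLOCK_SIZE chunks, and the
        collection record is saved whenever more than
        collection_flush_time seconds have passed since the last save.
        """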
        with self.collection as c:
            with open(source, 'r') as source_fd:
                output = c.open(filename, 'w')
                start_time = time.time()
                while True:
                    data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
                    if not data:
                        break
                    output.write(data)
                    output.flush()
                    # Is it time to update the collection?
                    if (time.time() - start_time) > self.collection_flush_time:
                        self.collection.save()
                        start_time = time.time()
                # File write finished
                output.close()
                self.collection.save() # TODO: Is this necessary?

    def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
        if os.path.isdir(path):
            for item in os.listdir(path):
                # os.listdir() returns names relative to path, so build the
                # full path before testing and recursing.
                item_path = os.path.join(path, item)
                if os.path.isdir(item_path):
                    self.write_directory_tree(item_path,
                                              os.path.join(stream_name, item))
                else:
                    self.write_file(item_path,
                                    os.path.join(stream_name, item))

    def manifest(self):
        print "BLOCK SIZE: %d" % arvados.config.KEEP_BLOCK_SIZE
        print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.collection.manifest_locator(), self.collection.manifest_text())
        return True

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)


class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
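    """ResumableCollectionWriter that reports progress and checkpoints to a ResumeCache.

    Tracks bytes_written and the inputs it has already queued so that an
    interrupted upload can pick up where it left off.
    """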
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(**kwargs)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0, replication=0):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries,
                                    replication=replication)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected,
                       num_retries=num_retries,
                       replication=replication)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
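        """Flush buffered data to Keep, then update progress and the cache.

        bytes_written only advances when the buffer actually shrank (i.e.
        a PUT happened), and the resume cache is checkpointed each time a
        whole new Keep block has been written.
        """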
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
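    """Return the UUID of the project given on the command line.

    With no --project-uuid, returns the current user's UUID (the Home
    project). Accepts either a user UUID or a group UUID; anything else
    raises ValueError.
    """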
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None
    bytes_expected = expected_bytes_for(args.paths)

    resume_cache = None
    if args.resume:
        try:
            resume_cache = ResumeCache(ResumeCache.make_path(args))
            resume_cache.check_cache(api_client=api_client, num_retries=args.retries)
        except (IOError, OSError, ValueError):
            pass  # Couldn't open cache directory/file.  Continue without it.
        except ResumeCacheConflict:
            print >>stderr, "\n".join([
                "arv-put: Another process is already uploading this data.",
                "         Use --no-resume if this is really what you want."])
            sys.exit(1)

    if resume_cache is None:
        writer = ArvPutCollectionWriter(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)
    else:
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected,
            num_retries=args.retries,
            replication=write_copies)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if path == '-':
            writer.start_new_stream()
            writer.start_new_file(args.filename)
            r = sys.stdin.read(64*1024)
            while r:
                # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
                # CollectionWriter.write().
                super(arvados.collection.ResumableCollectionWriter, writer).write(r)
                r = sys.stdin.read(64*1024)
        elif os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    output = None
    if args.stream:
        output = writer.manifest_text()
        if args.normalize:
            output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            manifest_text = writer.manifest_text()
            if args.normalize:
                manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
            replication_attr = 'replication_desired'
            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
                # API called it 'redundancy' before #3410.
                replication_attr = 'redundancy'
            # Register the resulting collection in Arvados.
            collection = api_client.collections().create(
                body={
                    'owner_uuid': project_uuid,
                    'name': collection_name,
                    'manifest_text': manifest_text,
                    replication_attr: args.replication,
                    },
                ensure_unique_name=True
                ).execute(num_retries=args.retries)

            print >>stderr, "Collection saved as '%s'" % collection['name']

            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
                output = collection['portable_data_hash']
            else:
                output = collection['uuid']

        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    if resume_cache is not None:
        resume_cache.destroy()

    return output

if __name__ == '__main__':
    main()