Merge remote-tracking branch 'refs/remotes/origin/2939-create-params' into origin...
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import base64
9 import errno
10 import fcntl
11 import hashlib
12 import json
13 import os
14 import signal
15 import sys
16 import tempfile
17
# Signals for which main() installs exit_signal_handler while an upload is
# in progress.
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
19
def parse_arguments(arguments):
    """Parse arv-put's command line and return the argparse namespace.

    `arguments` is passed unchanged to ArgumentParser.parse_args().  Once
    parsing succeeds a few defaults are filled in:

    * no paths at all, or the single path "-", means read /dev/stdin (and
      for "-" the manifest filename defaults to "-");
    * --progress is turned on when stderr is a tty, unless --no-progress
      or --batch-progress was requested.

    Combining --filename with a directory or with more than one path is
    rejected through parser.error().
    """
    parser = argparse.ArgumentParser(
        description='Copy data from the local filesystem to Keep.')

    parser.add_argument(
        'paths',
        metavar='path',
        type=str,
        nargs='*',
        help="""
    Local file or directory. Default: read from standard input.
    """)

    parser.add_argument(
        '--max-manifest-depth',
        type=int,
        metavar='N',
        default=-1,
        help="""
    Maximum depth of directory tree to represent in the manifest
    structure. A directory structure deeper than this will be represented
    as a single stream in the manifest. If N=0, the manifest will contain
    a single stream. Default: -1 (unlimited), i.e., exactly one manifest
    stream per filesystem directory that contains files.
    """)

    # Output format: at most one of stream / manifest / raw.
    output_mode = parser.add_mutually_exclusive_group()

    output_mode.add_argument(
        '--as-stream',
        action='store_true',
        dest='stream',
        help="""
    Synonym for --stream.
    """)

    output_mode.add_argument(
        '--stream',
        action='store_true',
        help="""
    Store the file content and display the resulting manifest on
    stdout. Do not write the manifest to Keep or save a Collection object
    in Arvados.
    """)

    output_mode.add_argument(
        '--as-manifest',
        action='store_true',
        dest='manifest',
        help="""
    Synonym for --manifest.
    """)

    output_mode.add_argument(
        '--in-manifest',
        action='store_true',
        dest='manifest',
        help="""
    Synonym for --manifest.
    """)

    output_mode.add_argument(
        '--manifest',
        action='store_true',
        help="""
    Store the file data and resulting manifest in Keep, save a Collection
    object in Arvados, and display the manifest locator (Collection uuid)
    on stdout. This is the default behavior.
    """)

    output_mode.add_argument(
        '--as-raw',
        action='store_true',
        dest='raw',
        help="""
    Synonym for --raw.
    """)

    output_mode.add_argument(
        '--raw',
        action='store_true',
        help="""
    Store the file content and display the data block locators on stdout,
    separated by commas, with a trailing newline. Do not store a
    manifest.
    """)

    parser.add_argument(
        '--use-filename',
        type=str,
        default=None,
        dest='filename',
        help="""
    Synonym for --filename.
    """)

    parser.add_argument(
        '--filename',
        type=str,
        default=None,
        help="""
    Use the given filename in the manifest, instead of the name of the
    local file. This is useful when "-" or "/dev/stdin" is given as an
    input file. It can be used only if there is exactly one path given and
    it is not a directory. Implies --manifest.
    """)

    # Progress reporting: at most one of the three flavours.
    progress_mode = parser.add_mutually_exclusive_group()

    progress_mode.add_argument(
        '--progress',
        action='store_true',
        help="""
    Display human-readable progress on stderr (bytes and, if possible,
    percentage of total data size). This is the default behavior when
    stderr is a tty.
    """)

    progress_mode.add_argument(
        '--no-progress',
        action='store_true',
        help="""
    Do not display human-readable progress on stderr, even if stderr is a
    tty.
    """)

    progress_mode.add_argument(
        '--batch-progress',
        action='store_true',
        help="""
    Display machine-readable progress on stderr (bytes and, if known,
    total data size).
    """)

    # Resuming: on unless explicitly disabled.
    resume_mode = parser.add_mutually_exclusive_group()

    resume_mode.add_argument(
        '--resume',
        action='store_true',
        default=True,
        help="""
    Continue interrupted uploads from cached state (default).
    """)

    resume_mode.add_argument(
        '--no-resume',
        action='store_false',
        dest='resume',
        help="""
    Do not continue interrupted uploads from cached state.
    """)

    args = parser.parse_args(arguments)

    if not args.paths:
        args.paths.append('/dev/stdin')

    several_or_directory = (len(args.paths) != 1
                            or os.path.isdir(args.paths[0]))
    if several_or_directory and args.filename:
        parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    progress_suppressed = args.batch_progress or args.no_progress
    if not progress_suppressed and os.isatty(sys.stderr.fileno()):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        args.filename = args.filename or '-'

    return args
147
class ResumeCacheConflict(Exception):
    """Raised when a resume cache file cannot be locked exclusively."""
150
151
class ResumeCache(object):
    """An exclusively locked JSON file that stores upload state.

    Opening the cache takes a non-blocking exclusive flock() on the file;
    if the lock cannot be obtained, ResumeCacheConflict is raised.  State
    is replaced by writing a new temporary file in the same directory and
    renaming it over the cache path.
    """
    CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')

    @classmethod
    def setup_user_cache(cls):
        """Create CACHE_DIR if it is missing.

        A directory created here is chmod'ed to 0700; one that already
        exists is left untouched.  Any OSError other than EEXIST is
        re-raised.
        """
        try:
            os.makedirs(cls.CACHE_DIR)
        except OSError as error:
            if error.errno != errno.EEXIST:
                raise
        else:
            os.chmod(cls.CACHE_DIR, 0o700)

    def __init__(self, file_spec):
        """Open (creating if necessary) and lock the cache at `file_spec`.

        Raises ResumeCacheConflict if the file is already locked.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except ResumeCacheConflict:
            # Don't keep a handle on a cache we are not allowed to use.
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Return the cache file path for the upload described by `args`.

        The file name is the MD5 hex digest of the configured API host,
        the sorted real paths of the inputs, and either the (clamped)
        --max-manifest-depth when any input is a directory or else the
        --filename value when one was given.
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(cls.CACHE_DIR, md5.hexdigest())

    def _lock_file(self, fileobj):
        """Take a non-blocking exclusive lock on `fileobj`.

        `fileobj` may be a file object or a raw file descriptor.  Raises
        ResumeCacheConflict if the lock cannot be acquired.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # A raw descriptor has no .name; fall back to the value itself.
            raise ResumeCacheConflict(
                "{} locked".format(getattr(fileobj, 'name', fileobj)))

    def load(self):
        """Return the JSON-decoded contents of the cache file.

        Raises ValueError (from json.load) if the file does not contain
        valid JSON, e.g. when it is empty.
        """
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
        """Replace the cache contents with `data`, serialized as JSON.

        The data is written to a locked temporary file next to the cache,
        flushed, and then renamed over the cache path, so the path never
        refers to a partially written file.  This is best-effort: if the
        temporary file cannot be created, locked, written or renamed, it
        is cleaned up and the previous cache is kept.
        """
        new_cache_fd = None
        new_cache_name = None
        new_cache = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            new_cache = os.fdopen(new_cache_fd, 'r+')
            self._lock_file(new_cache)
            json.dump(data, new_cache)
            # Push the data out of the userspace buffer before the rename
            # makes this file the cache; otherwise the checkpoint on disk
            # could be empty if the process dies abruptly.
            new_cache.flush()
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            # Release the descriptor, whichever form we hold it in.
            if new_cache is not None:
                try:
                    new_cache.close()
                except (IOError, OSError):
                    pass
            elif new_cache_fd is not None:
                os.close(new_cache_fd)
            if new_cache_name is not None:  # Otherwise mkstemp failed.
                os.unlink(new_cache_name)
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close the cache file handle."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file from disk and close the handle.

        A cache file that is already gone is not an error.
        """
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Discard the current cache and start a fresh one at the same path."""
        self.destroy()
        self.__init__(self.filename)
223
224
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    """Resumable collection writer that checkpoints into a resume cache.

    In addition to what the base writer does, this class keeps a running
    count of the bytes it has flushed (bytes_written), remembers every
    input it has been asked to write (_seen_inputs) so the same input is
    not written twice, calls an optional progress reporter, and saves its
    state to `cache`.  Both extra attributes are added to STATE_PROPS.
    """
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None):
        """Create a fresh writer.

        cache -- object with a save(state) method, or None to disable
                 checkpointing.
        reporter -- callable(bytes_written, bytes_expected), or None.
        bytes_expected -- total size passed through to the reporter; may
                          be None.
        """
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__()

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None):
        """Build a writer from the state stored in `cache`.

        If the cached state is missing, malformed or stale, a brand new
        writer attached to the same cache is returned instead.
        """
        try:
            state = cache.load()
            # cache_state() stores the buffer as one base64 string.
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected)
        except (KeyError, TypeError, ValueError,
                arvados.errors.StaleWriterStateError):
            # KeyError: valid JSON that lacks the keys we need is just as
            # unusable as invalid JSON, so start over rather than crash.
            return cls(cache, reporter, bytes_expected)
        else:
            return writer

    def cache_state(self):
        """Save the current writer state to the cache, if there is one."""
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                # Looks like a deque; JSON needs a plain list.
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        """Pass the current byte counts to the reporter, if there is one."""
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
        """Flush via the base class, then update counters and checkpoint.

        When the buffer shrank (data was actually sent), bytes_written is
        increased by that amount and progress is reported.  The state is
        cached only when the count of whole KEEP_BLOCK_SIZE blocks written
        has increased.
        """
        start_buffer_len = self._data_buffer_len
        # Floor division: we are counting whole blocks.
        # NOTE(review): assumes KEEP_BLOCK_SIZE is an int -- confirm.
        start_block_count = self.bytes_written // self.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written // self.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        """Remember an input; return False if it was already recorded."""
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        """Write `source` unless this exact input was recorded before."""
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write the tree at `path` unless this input was recorded before."""
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)
293
294
def expected_bytes_for(pathlist):
    """Return the total size in bytes of everything named in `pathlist`.

    Directories are walked recursively and regular files are stat'ed, so
    that progress can be shown as a percentage.  If any path is neither a
    directory nor a regular file the total cannot be known and None is
    returned.
    """
    total = 0
    for path in pathlist:
        if os.path.isdir(path):
            total += sum(
                os.path.getsize(os.path.join(path, entry))
                for entry in arvados.util.listdir_recursive(path))
        elif os.path.isfile(path):
            total += os.path.getsize(path)
        else:
            return None
    return total
308
# Template for machine-readable progress lines.  The program name and pid
# are filled in once at import time; the two remaining fields take the
# byte counts.
_machine_format = "{} {}: {{}} written {{}} total\n".format(
    sys.argv[0], os.getpid())


def machine_progress(bytes_written, bytes_expected):
    """Format a machine-readable progress line.

    An unknown total (bytes_expected is None) is reported as -1.
    """
    if bytes_expected is None:
        total = -1
    else:
        total = bytes_expected
    return _machine_format.format(bytes_written, total)
314
def human_progress(bytes_written, bytes_expected):
    """Format a human-readable progress string that begins with "\\r".

    When the expected total is known and non-zero, the result shows
    mebibytes written, mebibytes expected and the percentage done;
    otherwise it shows only the number of bytes written.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    fraction_done = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(
        bytes_written >> 20, bytes_expected >> 20, fraction_done)
322
def progress_writer(progress_func, outfile=sys.stderr):
    """Return a reporter that formats progress and writes it to `outfile`.

    The returned callable takes (bytes_written, bytes_expected), formats
    them with `progress_func` and writes the resulting text.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
327
def exit_signal_handler(sigcode, frame):
    """Signal handler: exit with the negated signal number as the status."""
    raise SystemExit(-sigcode)
330
def main(arguments=None):
    """Run arv-put: upload the given paths and print the result.

    `arguments` is the argument list handed to parse_arguments().

    The upload state is checkpointed in a ResumeCache so an interrupted
    run can be continued.  Depending on the output options, the manifest
    text, the comma-separated block locators, or the uuid of a newly
    created collection is written to stdout; progress and diagnostics go
    to stderr.  Exits with status 1 if the resume cache for this upload
    is locked by another process.
    """
    ResumeCache.setup_user_cache()
    args = parse_arguments(arguments)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    try:
        resume_cache = ResumeCache(ResumeCache.make_path(args))
        if not args.resume:
            resume_cache.restart()
    except ResumeCacheConflict:
        # stdout is reserved for the manifest/locator, so report the
        # conflict on stderr.
        sys.stderr.write(
            "arv-put: Another process is already uploading this data.\n")
        sys.exit(1)

    writer = ArvPutCollectionWriter.from_cache(
        resume_cache, reporter, expected_bytes_for(args.paths))

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    try:
        if writer.bytes_written > 0:  # We're resuming a previous upload.
            sys.stderr.write("\n".join([
                    "arv-put: Resuming previous upload from last checkpoint.",
                    "         Use the --no-resume option to start over."])
                             + "\n")
            writer.report_progress()

        writer.do_queued_work()  # Do work resumed from cache.
        for path in args.paths:  # Copy file data to Keep.
            if os.path.isdir(path):
                writer.write_directory_tree(
                    path, max_manifest_depth=args.max_manifest_depth)
            else:
                writer.start_new_stream()
                writer.write_file(path,
                                  args.filename or os.path.basename(path))
        writer.finish_current_stream()

        if args.progress:
            # Print newline to split stderr from stdout for humans.
            sys.stderr.write("\n")

        if args.stream:
            sys.stdout.write(writer.manifest_text())
        elif args.raw:
            sys.stdout.write(','.join(writer.data_locators()) + "\n")
        else:
            # Register the resulting collection in Arvados.
            collection = arvados.api().collections().create(
                body={
                    'uuid': writer.finish(),
                    'manifest_text': writer.manifest_text(),
                    },
                ).execute()

            # Print the locator (uuid) of the new collection.
            sys.stdout.write("{}\n".format(collection['uuid']))
    finally:
        # Put the caller's handlers back whether or not the upload
        # succeeded.
        for sigcode, orig_handler in orig_signal_handlers.items():
            signal.signal(sigcode, orig_handler)

    # Only a completed upload gets here; an interrupted one keeps its
    # cache so it can be resumed.
    resume_cache.destroy()
397
# Run the uploader when this file is executed as a script.
if __name__ == '__main__':
    main()