2752: arv-put displays a notice when it resumes.
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import base64
9 import errno
10 import fcntl
11 import hashlib
12 import json
13 import os
14 import sys
15 import tempfile
16
17 def parse_arguments(arguments):
18     parser = argparse.ArgumentParser(
19         description='Copy data from the local filesystem to Keep.')
20
21     parser.add_argument('paths', metavar='path', type=str, nargs='*',
22                         help="""
23     Local file or directory. Default: read from standard input.
24     """)
25
26     parser.add_argument('--max-manifest-depth', type=int, metavar='N',
27                         default=-1, help="""
28     Maximum depth of directory tree to represent in the manifest
29     structure. A directory structure deeper than this will be represented
30     as a single stream in the manifest. If N=0, the manifest will contain
31     a single stream. Default: -1 (unlimited), i.e., exactly one manifest
32     stream per filesystem directory that contains files.
33     """)
34
35     group = parser.add_mutually_exclusive_group()
36
37     group.add_argument('--as-stream', action='store_true', dest='stream',
38                        help="""
39     Synonym for --stream.
40     """)
41
42     group.add_argument('--stream', action='store_true',
43                        help="""
44     Store the file content and display the resulting manifest on
45     stdout. Do not write the manifest to Keep or save a Collection object
46     in Arvados.
47     """)
48
49     group.add_argument('--as-manifest', action='store_true', dest='manifest',
50                        help="""
51     Synonym for --manifest.
52     """)
53
54     group.add_argument('--in-manifest', action='store_true', dest='manifest',
55                        help="""
56     Synonym for --manifest.
57     """)
58
59     group.add_argument('--manifest', action='store_true',
60                        help="""
61     Store the file data and resulting manifest in Keep, save a Collection
62     object in Arvados, and display the manifest locator (Collection uuid)
63     on stdout. This is the default behavior.
64     """)
65
66     group.add_argument('--as-raw', action='store_true', dest='raw',
67                        help="""
68     Synonym for --raw.
69     """)
70
71     group.add_argument('--raw', action='store_true',
72                        help="""
73     Store the file content and display the data block locators on stdout,
74     separated by commas, with a trailing newline. Do not store a
75     manifest.
76     """)
77
78     parser.add_argument('--use-filename', type=str, default=None,
79                         dest='filename', help="""
80     Synonym for --filename.
81     """)
82
83     parser.add_argument('--filename', type=str, default=None,
84                         help="""
85     Use the given filename in the manifest, instead of the name of the
86     local file. This is useful when "-" or "/dev/stdin" is given as an
87     input file. It can be used only if there is exactly one path given and
88     it is not a directory. Implies --manifest.
89     """)
90
91     group = parser.add_mutually_exclusive_group()
92     group.add_argument('--progress', action='store_true',
93                        help="""
94     Display human-readable progress on stderr (bytes and, if possible,
95     percentage of total data size). This is the default behavior when
96     stderr is a tty.
97     """)
98
99     group.add_argument('--no-progress', action='store_true',
100                        help="""
101     Do not display human-readable progress on stderr, even if stderr is a
102     tty.
103     """)
104
105     group.add_argument('--batch-progress', action='store_true',
106                        help="""
107     Display machine-readable progress on stderr (bytes and, if known,
108     total data size).
109     """)
110
111     group = parser.add_mutually_exclusive_group()
112     group.add_argument('--resume', action='store_true', default=True,
113                        help="""
114     Continue interrupted uploads from cached state (default).
115     """)
116     group.add_argument('--no-resume', action='store_false', dest='resume',
117                        help="""
118     Do not continue interrupted uploads from cached state.
119     """)
120
121     args = parser.parse_args(arguments)
122
123     if len(args.paths) == 0:
124         args.paths += ['/dev/stdin']
125
126     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
127         if args.filename:
128             parser.error("""
129     --filename argument cannot be used when storing a directory or
130     multiple files.
131     """)
132
133     # Turn on --progress by default if stderr is a tty.
134     if (not (args.batch_progress or args.no_progress)
135         and os.isatty(sys.stderr.fileno())):
136         args.progress = True
137
138     if args.paths == ['-']:
139         args.paths = ['/dev/stdin']
140         if not args.filename:
141             args.filename = '-'
142
143     return args
144
145 class ResumeCacheConflict(Exception):
146     pass
147
148
149 class ResumeCache(object):
150     CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
151
152     @classmethod
153     def setup_user_cache(cls):
154         try:
155             os.makedirs(cls.CACHE_DIR)
156         except OSError as error:
157             if error.errno != errno.EEXIST:
158                 raise
159         else:
160             os.chmod(cls.CACHE_DIR, 0o700)
161
162     def __init__(self, file_spec):
163         self.cache_file = open(file_spec, 'a+')
164         self._lock_file(self.cache_file)
165         self.filename = self.cache_file.name
166
167     @classmethod
168     def make_path(cls, args):
169         md5 = hashlib.md5()
170         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
171         realpaths = sorted(os.path.realpath(path) for path in args.paths)
172         md5.update(''.join(realpaths))
173         if any(os.path.isdir(path) for path in realpaths):
174             md5.update(str(max(args.max_manifest_depth, -1)))
175         elif args.filename:
176             md5.update(args.filename)
177         return os.path.join(cls.CACHE_DIR, md5.hexdigest())
178
179     def _lock_file(self, fileobj):
180         try:
181             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
182         except IOError:
183             raise ResumeCacheConflict("{} locked".format(fileobj.name))
184
185     def load(self):
186         self.cache_file.seek(0)
187         return json.load(self.cache_file)
188
189     def save(self, data):
190         try:
191             new_cache_fd, new_cache_name = tempfile.mkstemp(
192                 dir=os.path.dirname(self.filename))
193             self._lock_file(new_cache_fd)
194             new_cache = os.fdopen(new_cache_fd, 'r+')
195             json.dump(data, new_cache)
196             os.rename(new_cache_name, self.filename)
197         except (IOError, OSError, ResumeCacheConflict) as error:
198             try:
199                 os.unlink(new_cache_name)
200             except NameError:  # mkstemp failed.
201                 pass
202         else:
203             self.cache_file.close()
204             self.cache_file = new_cache
205
206     def close(self):
207         self.cache_file.close()
208
209     def destroy(self):
210         try:
211             os.unlink(self.filename)
212         except OSError as error:
213             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
214                 raise
215         self.close()
216
217     def restart(self):
218         self.destroy()
219         self.__init__(self.filename)
220
221
222 class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
223     STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
224                    ['bytes_written'])
225
226     def __init__(self, cache=None, reporter=None, bytes_expected=None):
227         self.bytes_written = 0
228         self.cache = cache
229         self.report_func = reporter
230         self.bytes_expected = bytes_expected
231         super(ArvPutCollectionWriter, self).__init__()
232
233     @classmethod
234     def from_cache(cls, cache, reporter=None, bytes_expected=None):
235         try:
236             state = cache.load()
237             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
238             writer = cls.from_state(state, cache, reporter, bytes_expected)
239         except (TypeError, ValueError,
240                 arvados.errors.StaleWriterStateError) as error:
241             return cls(cache, reporter, bytes_expected)
242         else:
243             return writer
244
245     def preresume_hook(self):
246         print >>sys.stderr, "arv-put: Resuming previous upload.  Bypass with the --no-resume option."
247
248     def checkpoint_state(self):
249         if self.cache is None:
250             return
251         state = self.dump_state()
252         # Transform attributes for serialization.
253         for attr, value in state.items():
254             if attr == '_data_buffer':
255                 state[attr] = base64.encodestring(''.join(value))
256             elif hasattr(value, 'popleft'):
257                 state[attr] = list(value)
258         self.cache.save(state)
259
260     def flush_data(self):
261         bytes_buffered = self._data_buffer_len
262         super(ArvPutCollectionWriter, self).flush_data()
263         self.bytes_written += (bytes_buffered - self._data_buffer_len)
264         if self.report_func is not None:
265             self.report_func(self.bytes_written, self.bytes_expected)
266
267
268 def expected_bytes_for(pathlist):
269     # Walk the given directory trees and stat files, adding up file sizes,
270     # so we can display progress as percent
271     bytesum = 0
272     for path in pathlist:
273         if os.path.isdir(path):
274             for filename in arvados.util.listdir_recursive(path):
275                 bytesum += os.path.getsize(os.path.join(path, filename))
276         elif not os.path.isfile(path):
277             return None
278         else:
279             bytesum += os.path.getsize(path)
280     return bytesum
281
282 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
283                                                             os.getpid())
284 def machine_progress(bytes_written, bytes_expected):
285     return _machine_format.format(
286         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
287
288 def human_progress(bytes_written, bytes_expected):
289     if bytes_expected:
290         return "\r{}M / {}M {:.1%} ".format(
291             bytes_written >> 20, bytes_expected >> 20,
292             float(bytes_written) / bytes_expected)
293     else:
294         return "\r{} ".format(bytes_written)
295
296 def progress_writer(progress_func, outfile=sys.stderr):
297     def write_progress(bytes_written, bytes_expected):
298         outfile.write(progress_func(bytes_written, bytes_expected))
299     return write_progress
300
301 def main(arguments=None):
302     ResumeCache.setup_user_cache()
303     args = parse_arguments(arguments)
304
305     if args.progress:
306         reporter = progress_writer(human_progress)
307     elif args.batch_progress:
308         reporter = progress_writer(machine_progress)
309     else:
310         reporter = None
311
312     try:
313         resume_cache = ResumeCache(ResumeCache.make_path(args))
314         if not args.resume:
315             resume_cache.restart()
316     except ResumeCacheConflict:
317         print "arv-put: Another process is already uploading this data."
318         sys.exit(1)
319
320     writer = ArvPutCollectionWriter.from_cache(
321         resume_cache, reporter, expected_bytes_for(args.paths))
322
323     # Copy file data to Keep.
324     for path in args.paths:
325         if os.path.isdir(path):
326             writer.write_directory_tree(
327                 path, max_manifest_depth=args.max_manifest_depth)
328         else:
329             writer.start_new_stream()
330             writer.write_file(path, args.filename or os.path.basename(path))
331
332     if args.stream:
333         print writer.manifest_text(),
334     elif args.raw:
335         writer.finish_current_stream()
336         print ','.join(writer.data_locators())
337     else:
338         # Register the resulting collection in Arvados.
339         arvados.api().collections().create(
340             body={
341                 'uuid': writer.finish(),
342                 'manifest_text': writer.manifest_text(),
343                 },
344             ).execute()
345
346         # Print the locator (uuid) of the new collection.
347         print writer.finish()
348     resume_cache.destroy()
349
350 if __name__ == '__main__':
351     main()