705dcfd8f35b321c261eb3953b17ad4030271287
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import base64
9 import errno
10 import fcntl
11 import hashlib
12 import json
13 import os
14 import sys
15 import tempfile
16
17 def parse_arguments(arguments):
18     parser = argparse.ArgumentParser(
19         description='Copy data from the local filesystem to Keep.')
20
21     parser.add_argument('paths', metavar='path', type=str, nargs='*',
22                         help="""
23     Local file or directory. Default: read from standard input.
24     """)
25
26     parser.add_argument('--max-manifest-depth', type=int, metavar='N',
27                         default=-1, help="""
28     Maximum depth of directory tree to represent in the manifest
29     structure. A directory structure deeper than this will be represented
30     as a single stream in the manifest. If N=0, the manifest will contain
31     a single stream. Default: -1 (unlimited), i.e., exactly one manifest
32     stream per filesystem directory that contains files.
33     """)
34
35     group = parser.add_mutually_exclusive_group()
36
37     group.add_argument('--as-stream', action='store_true', dest='stream',
38                        help="""
39     Synonym for --stream.
40     """)
41
42     group.add_argument('--stream', action='store_true',
43                        help="""
44     Store the file content and display the resulting manifest on
45     stdout. Do not write the manifest to Keep or save a Collection object
46     in Arvados.
47     """)
48
49     group.add_argument('--as-manifest', action='store_true', dest='manifest',
50                        help="""
51     Synonym for --manifest.
52     """)
53
54     group.add_argument('--in-manifest', action='store_true', dest='manifest',
55                        help="""
56     Synonym for --manifest.
57     """)
58
59     group.add_argument('--manifest', action='store_true',
60                        help="""
61     Store the file data and resulting manifest in Keep, save a Collection
62     object in Arvados, and display the manifest locator (Collection uuid)
63     on stdout. This is the default behavior.
64     """)
65
66     group.add_argument('--as-raw', action='store_true', dest='raw',
67                        help="""
68     Synonym for --raw.
69     """)
70
71     group.add_argument('--raw', action='store_true',
72                        help="""
73     Store the file content and display the data block locators on stdout,
74     separated by commas, with a trailing newline. Do not store a
75     manifest.
76     """)
77
78     parser.add_argument('--use-filename', type=str, default=None,
79                         dest='filename', help="""
80     Synonym for --filename.
81     """)
82
83     parser.add_argument('--filename', type=str, default=None,
84                         help="""
85     Use the given filename in the manifest, instead of the name of the
86     local file. This is useful when "-" or "/dev/stdin" is given as an
87     input file. It can be used only if there is exactly one path given and
88     it is not a directory. Implies --manifest.
89     """)
90
91     group = parser.add_mutually_exclusive_group()
92     group.add_argument('--progress', action='store_true',
93                        help="""
94     Display human-readable progress on stderr (bytes and, if possible,
95     percentage of total data size). This is the default behavior when
96     stderr is a tty.
97     """)
98
99     group.add_argument('--no-progress', action='store_true',
100                        help="""
101     Do not display human-readable progress on stderr, even if stderr is a
102     tty.
103     """)
104
105     group.add_argument('--batch-progress', action='store_true',
106                        help="""
107     Display machine-readable progress on stderr (bytes and, if known,
108     total data size).
109     """)
110
111     args = parser.parse_args(arguments)
112
113     if len(args.paths) == 0:
114         args.paths += ['/dev/stdin']
115
116     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
117         if args.filename:
118             parser.error("""
119     --filename argument cannot be used when storing a directory or
120     multiple files.
121     """)
122
123     # Turn on --progress by default if stderr is a tty.
124     if (not (args.batch_progress or args.no_progress)
125         and os.isatty(sys.stderr.fileno())):
126         args.progress = True
127
128     if args.paths == ['-']:
129         args.paths = ['/dev/stdin']
130         if not args.filename:
131             args.filename = '-'
132
133     return args
134
135 class ResumeCacheConflict(Exception):
136     pass
137
138
139 class ResumeCache(object):
140     CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
141
142     def __init__(self, file_spec):
143         try:
144             self.cache_file = open(file_spec, 'a+')
145         except TypeError:
146             file_spec = self.make_path(file_spec)
147             self.cache_file = open(file_spec, 'a+')
148         self._lock_file(self.cache_file)
149         self.filename = self.cache_file.name
150
151     @classmethod
152     def make_path(cls, args):
153         md5 = hashlib.md5()
154         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
155         realpaths = sorted(os.path.realpath(path) for path in args.paths)
156         md5.update(''.join(realpaths))
157         if any(os.path.isdir(path) for path in realpaths):
158             md5.update(str(max(args.max_manifest_depth, -1)))
159         elif args.filename:
160             md5.update(args.filename)
161         return os.path.join(cls.CACHE_DIR, md5.hexdigest())
162
163     def _lock_file(self, fileobj):
164         try:
165             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
166         except IOError:
167             raise ResumeCacheConflict("{} locked".format(fileobj.name))
168
169     def load(self):
170         self.cache_file.seek(0)
171         return json.load(self.cache_file)
172
173     def save(self, data):
174         try:
175             new_cache_fd, new_cache_name = tempfile.mkstemp(
176                 dir=os.path.dirname(self.filename))
177             self._lock_file(new_cache_fd)
178             new_cache = os.fdopen(new_cache_fd, 'r+')
179             json.dump(data, new_cache)
180             os.rename(new_cache_name, self.filename)
181         except (IOError, OSError, ResumeCacheConflict) as error:
182             try:
183                 os.unlink(new_cache_name)
184             except NameError:  # mkstemp failed.
185                 pass
186         else:
187             self.cache_file.close()
188             self.cache_file = new_cache
189
190     def close(self):
191         self.cache_file.close()
192
193     def destroy(self):
194         try:
195             os.unlink(self.filename)
196         except OSError as error:
197             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
198                 raise
199         self.close()
200
201
202 class ResumeCacheCollectionWriter(arvados.ResumableCollectionWriter):
203     def __init__(self, cache=None):
204         self.cache = cache
205         super(ResumeCacheCollectionWriter, self).__init__()
206
207     @classmethod
208     def from_cache(cls, cache):
209         try:
210             state = cache.load()
211             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
212             writer = cls.from_state(state)
213         except (TypeError, ValueError,
214                 arvados.errors.StaleWriterStateError) as error:
215             return cls(cache)
216         else:
217             writer.cache = cache
218             return writer
219
220     def checkpoint_state(self):
221         if self.cache is None:
222             return
223         state = self.dump_state()
224         # Transform attributes for serialization.
225         for attr, value in state.items():
226             if attr == '_data_buffer':
227                 state[attr] = base64.encodestring(''.join(value))
228             elif hasattr(value, 'popleft'):
229                 state[attr] = list(value)
230         self.cache.save(state)
231
232
233 class CollectionWriterWithProgress(arvados.CollectionWriter):
234     def flush_data(self, *args, **kwargs):
235         if not getattr(self, 'display_type', None):
236             return
237         if not hasattr(self, 'bytes_flushed'):
238             self.bytes_flushed = 0
239         self.bytes_flushed += self._data_buffer_len
240         super(CollectionWriterWithProgress, self).flush_data(*args, **kwargs)
241         self.bytes_flushed -= self._data_buffer_len
242         if self.display_type == 'machine':
243             sys.stderr.write('%s %d: %d written %d total\n' %
244                              (sys.argv[0],
245                               os.getpid(),
246                               self.bytes_flushed,
247                               getattr(self, 'bytes_expected', -1)))
248         elif getattr(self, 'bytes_expected', 0) > 0:
249             pct = 100.0 * self.bytes_flushed / self.bytes_expected
250             sys.stderr.write('\r%dM / %dM %.1f%% ' %
251                              (self.bytes_flushed >> 20,
252                               self.bytes_expected >> 20, pct))
253         else:
254             sys.stderr.write('\r%d ' % self.bytes_flushed)
255
256     def manifest_text(self, *args, **kwargs):
257         manifest_text = (super(CollectionWriterWithProgress, self)
258                          .manifest_text(*args, **kwargs))
259         if getattr(self, 'display_type', None):
260             if self.display_type == 'human':
261                 sys.stderr.write('\n')
262             self.display_type = None
263         return manifest_text
264
265
266 def expected_bytes_for(pathlist):
267     bytesum = 0
268     for path in pathlist:
269         if os.path.isdir(path):
270             for filename in arvados.util.listdir_recursive(path):
271                 bytesum += os.path.getsize(os.path.join(path, filename))
272         elif not os.path.isfile(path):
273             return None
274         else:
275             bytesum += os.path.getsize(path)
276     return bytesum
277
278 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
279                                                             os.getpid())
280 def machine_progress(bytes_written, bytes_expected):
281     return _machine_format.format(
282         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
283
284 def human_progress(bytes_written, bytes_expected):
285     if bytes_expected:
286         return "\r{}M / {}M {:.1f}% ".format(
287             bytes_written >> 20, bytes_expected >> 20,
288             bytes_written / bytes_expected)
289     else:
290         return "\r{} ".format(bytes_written)
291
292 def main(arguments=None):
293     args = parse_arguments(arguments)
294
295     if args.progress:
296         writer = CollectionWriterWithProgress()
297         writer.display_type = 'human'
298     elif args.batch_progress:
299         writer = CollectionWriterWithProgress()
300         writer.display_type = 'machine'
301     else:
302         writer = arvados.CollectionWriter()
303
304     # Walk the given directory trees and stat files, adding up file sizes,
305     # so we can display progress as percent
306     writer.bytes_expected = expected_bytes_for(args.paths)
307     if writer.bytes_expected is None:
308         del writer.bytes_expected
309
310     # Copy file data to Keep.
311     for path in args.paths:
312         if os.path.isdir(path):
313             writer.write_directory_tree(
314                 path, max_manifest_depth=args.max_manifest_depth)
315         else:
316             writer.start_new_stream()
317             writer.write_file(path, args.filename or os.path.basename(path))
318
319     if args.stream:
320         print writer.manifest_text(),
321     elif args.raw:
322         writer.finish_current_stream()
323         print ','.join(writer.data_locators())
324     else:
325         # Register the resulting collection in Arvados.
326         arvados.api().collections().create(
327             body={
328                 'uuid': writer.finish(),
329                 'manifest_text': writer.manifest_text(),
330                 },
331             ).execute()
332
333         # Print the locator (uuid) of the new collection.
334         print writer.finish()
335
336 if __name__ == '__main__':
337     main()