20637: Fix arv-get
[arvados.git] / sdk / python / arvados / commands / get.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 import argparse
7 import hashlib
8 import os
9 import re
10 import string
11 import sys
12 import logging
13
14 import arvados
15 import arvados.commands._util as arv_cmd
16 import arvados.util as util
17
18 from arvados._version import __version__
19
20 logger = logging.getLogger('arvados.arv-get')
21
22 parser = argparse.ArgumentParser(
23     description='Copy data from Keep to a local file or pipe.',
24     parents=[arv_cmd.retry_opt])
25 parser.add_argument('--version', action='version',
26                     version="%s %s" % (sys.argv[0], __version__),
27                     help='Print version and exit.')
28 parser.add_argument('locator', type=str,
29                     help="""
30 Collection locator, optionally with a file path or prefix.
31 """)
32 parser.add_argument('destination', type=str, nargs='?', default='-',
33                     help="""
34 Local file or directory where the data is to be written. Default: stdout.
35 """)
36 group = parser.add_mutually_exclusive_group()
37 group.add_argument('--progress', action='store_true',
38                    help="""
39 Display human-readable progress on stderr (bytes and, if possible,
40 percentage of total data size). This is the default behavior when it
41 is not expected to interfere with the output: specifically, stderr is
42 a tty _and_ either stdout is not a tty, or output is being written to
43 named files rather than stdout.
44 """)
45 group.add_argument('--no-progress', action='store_true',
46                    help="""
47 Do not display human-readable progress on stderr.
48 """)
49 group.add_argument('--batch-progress', action='store_true',
50                    help="""
51 Display machine-readable progress on stderr (bytes and, if known,
52 total data size).
53 """)
54 group = parser.add_mutually_exclusive_group()
55 group.add_argument('--hash',
56                     help="""
57 Display the hash of each file as it is read from Keep, using the given
58 hash algorithm. Supported algorithms include md5, sha1, sha224,
59 sha256, sha384, and sha512.
60 """)
61 group.add_argument('--md5sum', action='store_const',
62                     dest='hash', const='md5',
63                     help="""
64 Display the MD5 hash of each file as it is read from Keep.
65 """)
66 parser.add_argument('-n', action='store_true',
67                     help="""
68 Do not write any data -- just read from Keep, and report md5sums if
69 requested.
70 """)
71 parser.add_argument('-r', action='store_true',
72                     help="""
73 Retrieve all files in the specified collection/prefix. This is the
74 default behavior if the "locator" argument ends with a forward slash.
75 """)
76 group = parser.add_mutually_exclusive_group()
77 group.add_argument('-f', action='store_true',
78                    help="""
79 Overwrite existing files while writing. The default behavior is to
80 refuse to write *anything* if any of the output files already
81 exist. As a special case, -f is not needed to write to stdout.
82 """)
83 group.add_argument('-v', action='count', default=0,
84                     help="""
85 Once for verbose mode, twice for debug mode.
86 """)
87 group.add_argument('--skip-existing', action='store_true',
88                    help="""
89 Skip files that already exist. The default behavior is to refuse to
90 write *anything* if any files exist that would have to be
91 overwritten. This option causes even devices, sockets, and fifos to be
92 skipped.
93 """)
94 group.add_argument('--strip-manifest', action='store_true', default=False,
95                    help="""
96 When getting a collection manifest, strip its access tokens before writing
97 it.
98 """)
99
100 parser.add_argument('--threads', type=int, metavar='N', default=4,
101                     help="""
102 Set the number of download threads to be used. Take into account that
103 using lots of threads will increase the RAM requirements. Default is
104 to use 4 threads.
105 On high latency installations, using a greater number will improve
106 overall throughput.
107 """)
108
109 def parse_arguments(arguments, stdout, stderr):
110     args = parser.parse_args(arguments)
111
112     if args.locator[-1] == os.sep:
113         args.r = True
114     if (args.r and
115         not args.n and
116         not (args.destination and
117              os.path.isdir(args.destination))):
118         parser.error('Destination is not a directory.')
119     if not args.r and (os.path.isdir(args.destination) or
120                        args.destination[-1] == os.path.sep):
121         args.destination = os.path.join(args.destination,
122                                         os.path.basename(args.locator))
123         logger.debug("Appended source file name to destination directory: %s",
124                      args.destination)
125
126     if args.destination == '/dev/stdout':
127         args.destination = "-"
128
129     if args.destination == '-':
130         # Normally you have to use -f to write to a file (or device) that
131         # already exists, but "-" and "/dev/stdout" are common enough to
132         # merit a special exception.
133         args.f = True
134     else:
135         args.destination = args.destination.rstrip(os.sep)
136
137     # Turn on --progress by default if stderr is a tty and output is
138     # either going to a named file, or going (via stdout) to something
139     # that isn't a tty.
140     if (not (args.batch_progress or args.no_progress)
141         and stderr.isatty()
142         and (args.destination != '-'
143              or not stdout.isatty())):
144         args.progress = True
145     return args
146
147 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
148     if stdout is sys.stdout and hasattr(stdout, 'buffer'):
149         # in Python 3, write to stdout as binary
150         stdout = stdout.buffer
151
152     args = parse_arguments(arguments, stdout, stderr)
153     logger.setLevel(logging.WARNING - 10 * args.v)
154
155     request_id = arvados.util.new_request_id()
156     logger.info('X-Request-Id: '+request_id)
157
158     api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
159
160     r = re.search(r'^(.*?)(/.*)?$', args.locator)
161     col_loc = r.group(1)
162     get_prefix = r.group(2)
163     if args.r and not get_prefix:
164         get_prefix = os.sep
165
166     # User asked to download the collection's manifest
167     if not get_prefix:
168         if not args.n:
169             open_flags = os.O_CREAT | os.O_WRONLY
170             if not args.f:
171                 open_flags |= os.O_EXCL
172             try:
173                 if args.destination == "-":
174                     write_block_or_manifest(
175                         dest=stdout, src=col_loc,
176                         api_client=api_client, args=args)
177                 else:
178                     out_fd = os.open(args.destination, open_flags)
179                     with os.fdopen(out_fd, 'wb') as out_file:
180                         write_block_or_manifest(
181                             dest=out_file, src=col_loc,
182                             api_client=api_client, args=args)
183             except (IOError, OSError) as error:
184                 logger.error("can't write to '{}': {}".format(args.destination, error))
185                 return 1
186             except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
187                 logger.error("failed to download '{}': {}".format(col_loc, error))
188                 return 1
189             except arvados.errors.ArgumentError as error:
190                 if 'Argument to CollectionReader' in str(error):
191                     logger.error("error reading collection: {}".format(error))
192                     return 1
193                 else:
194                     raise
195         return 0
196
197     try:
198         reader = arvados.CollectionReader(
199             col_loc, api_client=api_client, num_retries=args.retries,
200             keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024), num_prefetch_threads=args.threads))
201     except Exception as error:
202         logger.error("failed to read collection: {}".format(error))
203         return 1
204
205     # Scan the collection. Make an array of (stream, file, local
206     # destination filename) tuples, and add up total size to extract.
207     todo = []
208     todo_bytes = 0
209     try:
210         if get_prefix == os.sep:
211             item = reader
212         else:
213             item = reader.find('.' + get_prefix)
214
215         if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
216             # If the user asked for a file and we got a subcollection, error out.
217             if get_prefix[-1] != os.sep:
218                 logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
219                 return 1
220             # If the user asked stdout as a destination, error out.
221             elif args.destination == '-':
222                 logger.error("cannot use 'stdout' as destination when downloading multiple files.")
223                 return 1
224             # User asked for a subcollection, and that's what was found. Add up total size
225             # to download.
226             for s, f in files_in_collection(item):
227                 dest_path = os.path.join(
228                     args.destination,
229                     os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
230                 if (not (args.n or args.f or args.skip_existing) and
231                     os.path.exists(dest_path)):
232                     logger.error('Local file %s already exists.' % (dest_path,))
233                     return 1
234                 todo += [(s, f, dest_path)]
235                 todo_bytes += f.size()
236         elif isinstance(item, arvados.arvfile.ArvadosFile):
237             todo += [(item.parent, item, args.destination)]
238             todo_bytes += item.size()
239         else:
240             logger.error("'{}' not found.".format('.' + get_prefix))
241             return 1
242     except (IOError, arvados.errors.NotFoundError) as e:
243         logger.error(e)
244         return 1
245
246     out_bytes = 0
247     for s, f, outfilename in todo:
248         outfile = None
249         digestor = None
250         if not args.n:
251             if outfilename == "-":
252                 outfile = stdout
253             else:
254                 if args.skip_existing and os.path.exists(outfilename):
255                     logger.debug('Local file %s exists. Skipping.', outfilename)
256                     continue
257                 elif not args.f and (os.path.isfile(outfilename) or
258                                    os.path.isdir(outfilename)):
259                     # Good thing we looked again: apparently this file wasn't
260                     # here yet when we checked earlier.
261                     logger.error('Local file %s already exists.' % (outfilename,))
262                     return 1
263                 if args.r:
264                     arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
265                 try:
266                     outfile = open(outfilename, 'wb')
267                 except Exception as error:
268                     logger.error('Open(%s) failed: %s' % (outfilename, error))
269                     return 1
270         if args.hash:
271             digestor = hashlib.new(args.hash)
272         try:
273             with s.open(f.name, 'rb') as file_reader:
274                 for data in file_reader.readall():
275                     if outfile:
276                         outfile.write(data)
277                     if digestor:
278                         digestor.update(data)
279                     out_bytes += len(data)
280                     if args.progress:
281                         stderr.write('\r%d MiB / %d MiB %.1f%%' %
282                                      (out_bytes >> 20,
283                                       todo_bytes >> 20,
284                                       (100
285                                        if todo_bytes==0
286                                        else 100.0*out_bytes/todo_bytes)))
287                     elif args.batch_progress:
288                         stderr.write('%s %d read %d total %d\n' %
289                                      (sys.argv[0], os.getpid(),
290                                       out_bytes, todo_bytes))
291             if digestor:
292                 stderr.write("%s  %s/%s\n"
293                              % (digestor.hexdigest(), s.stream_name(), f.name))
294         except KeyboardInterrupt:
295             if outfile and (outfile.fileno() > 2) and not outfile.closed:
296                 os.unlink(outfile.name)
297             break
298         finally:
299             if outfile != None and outfile != stdout:
300                 outfile.close()
301
302     if args.progress:
303         stderr.write('\n')
304     return 0
305
306 def files_in_collection(c):
307     # Sort first by file type, then alphabetically by file path.
308     for i in sorted(list(c.keys()),
309                     key=lambda k: (
310                         isinstance(c[k], arvados.collection.Subcollection),
311                         k.upper())):
312         if isinstance(c[i], arvados.arvfile.ArvadosFile):
313             yield (c, c[i])
314         elif isinstance(c[i], arvados.collection.Subcollection):
315             for s, f in files_in_collection(c[i]):
316                 yield (s, f)
317
318 def write_block_or_manifest(dest, src, api_client, args):
319     if '+A' in src:
320         # block locator
321         kc = arvados.keep.KeepClient(api_client=api_client)
322         dest.write(kc.get(src, num_retries=args.retries))
323     else:
324         # collection UUID or portable data hash
325         reader = arvados.CollectionReader(
326             src, api_client=api_client, num_retries=args.retries)
327         dest.write(reader.manifest_text(strip=args.strip_manifest).encode())