20933: Code cleanups
[arvados.git] / sdk / python / arvados / commands / get.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 import argparse
7 import hashlib
8 import os
9 import pathlib
10 import re
11 import string
12 import sys
13 import logging
14
15 import arvados
16 import arvados.commands._util as arv_cmd
17 import arvados.util as util
18
19 from arvados._version import __version__
20
21 logger = logging.getLogger('arvados.arv-get')
22
23 parser = argparse.ArgumentParser(
24     description='Copy data from Keep to a local file or pipe.',
25     parents=[arv_cmd.retry_opt])
26 parser.add_argument('--version', action='version',
27                     version="%s %s" % (sys.argv[0], __version__),
28                     help='Print version and exit.')
29 parser.add_argument('locator', type=str,
30                     help="""
31 Collection locator, optionally with a file path or prefix.
32 """)
33 parser.add_argument('destination', type=str, nargs='?', default='-',
34                     help="""
35 Local file or directory where the data is to be written. Default: stdout.
36 """)
37 group = parser.add_mutually_exclusive_group()
38 group.add_argument('--progress', action='store_true',
39                    help="""
40 Display human-readable progress on stderr (bytes and, if possible,
41 percentage of total data size). This is the default behavior when it
42 is not expected to interfere with the output: specifically, stderr is
43 a tty _and_ either stdout is not a tty, or output is being written to
44 named files rather than stdout.
45 """)
46 group.add_argument('--no-progress', action='store_true',
47                    help="""
48 Do not display human-readable progress on stderr.
49 """)
50 group.add_argument('--batch-progress', action='store_true',
51                    help="""
52 Display machine-readable progress on stderr (bytes and, if known,
53 total data size).
54 """)
55 group = parser.add_mutually_exclusive_group()
56 group.add_argument('--hash',
57                     help="""
58 Display the hash of each file as it is read from Keep, using the given
59 hash algorithm. Supported algorithms include md5, sha1, sha224,
60 sha256, sha384, and sha512.
61 """)
62 group.add_argument('--md5sum', action='store_const',
63                     dest='hash', const='md5',
64                     help="""
65 Display the MD5 hash of each file as it is read from Keep.
66 """)
67 parser.add_argument('-n', action='store_true',
68                     help="""
69 Do not write any data -- just read from Keep, and report md5sums if
70 requested.
71 """)
72 parser.add_argument('-r', action='store_true',
73                     help="""
74 Retrieve all files in the specified collection/prefix. This is the
75 default behavior if the "locator" argument ends with a forward slash.
76 """)
77 group = parser.add_mutually_exclusive_group()
78 group.add_argument('-f', action='store_true',
79                    help="""
80 Overwrite existing files while writing. The default behavior is to
81 refuse to write *anything* if any of the output files already
82 exist. As a special case, -f is not needed to write to stdout.
83 """)
84 group.add_argument('-v', action='count', default=0,
85                     help="""
86 Once for verbose mode, twice for debug mode.
87 """)
88 group.add_argument('--skip-existing', action='store_true',
89                    help="""
90 Skip files that already exist. The default behavior is to refuse to
91 write *anything* if any files exist that would have to be
92 overwritten. This option causes even devices, sockets, and fifos to be
93 skipped.
94 """)
95 group.add_argument('--strip-manifest', action='store_true', default=False,
96                    help="""
97 When getting a collection manifest, strip its access tokens before writing
98 it.
99 """)
100
101 parser.add_argument('--threads', type=int, metavar='N', default=4,
102                     help="""
103 Set the number of download threads to be used. Take into account that
104 using lots of threads will increase the RAM requirements. Default is
105 to use 4 threads.
106 On high latency installations, using a greater number will improve
107 overall throughput.
108 """)
109
110 def parse_arguments(arguments, stdout, stderr):
111     args = parser.parse_args(arguments)
112
113     if args.locator[-1] == os.sep:
114         args.r = True
115     if (args.r and
116         not args.n and
117         not (args.destination and
118              os.path.isdir(args.destination))):
119         parser.error('Destination is not a directory.')
120     if not args.r and (os.path.isdir(args.destination) or
121                        args.destination[-1] == os.path.sep):
122         args.destination = os.path.join(args.destination,
123                                         os.path.basename(args.locator))
124         logger.debug("Appended source file name to destination directory: %s",
125                      args.destination)
126
127     if args.destination == '/dev/stdout':
128         args.destination = "-"
129
130     if args.destination == '-':
131         # Normally you have to use -f to write to a file (or device) that
132         # already exists, but "-" and "/dev/stdout" are common enough to
133         # merit a special exception.
134         args.f = True
135     else:
136         args.destination = args.destination.rstrip(os.sep)
137
138     # Turn on --progress by default if stderr is a tty and output is
139     # either going to a named file, or going (via stdout) to something
140     # that isn't a tty.
141     if (not (args.batch_progress or args.no_progress)
142         and stderr.isatty()
143         and (args.destination != '-'
144              or not stdout.isatty())):
145         args.progress = True
146     return args
147
148 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
149     if stdout is sys.stdout and hasattr(stdout, 'buffer'):
150         # in Python 3, write to stdout as binary
151         stdout = stdout.buffer
152
153     args = parse_arguments(arguments, stdout, stderr)
154     logger.setLevel(logging.WARNING - 10 * args.v)
155
156     request_id = arvados.util.new_request_id()
157     logger.info('X-Request-Id: '+request_id)
158
159     api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
160
161     r = re.search(r'^(.*?)(/.*)?$', args.locator)
162     col_loc = r.group(1)
163     get_prefix = r.group(2)
164     if args.r and not get_prefix:
165         get_prefix = os.sep
166
167     # User asked to download the collection's manifest
168     if not get_prefix:
169         if not args.n:
170             open_flags = os.O_CREAT | os.O_WRONLY
171             if not args.f:
172                 open_flags |= os.O_EXCL
173             try:
174                 if args.destination == "-":
175                     write_block_or_manifest(
176                         dest=stdout, src=col_loc,
177                         api_client=api_client, args=args)
178                 else:
179                     out_fd = os.open(args.destination, open_flags)
180                     with os.fdopen(out_fd, 'wb') as out_file:
181                         write_block_or_manifest(
182                             dest=out_file, src=col_loc,
183                             api_client=api_client, args=args)
184             except (IOError, OSError) as error:
185                 logger.error("can't write to '{}': {}".format(args.destination, error))
186                 return 1
187             except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
188                 logger.error("failed to download '{}': {}".format(col_loc, error))
189                 return 1
190             except arvados.errors.ArgumentError as error:
191                 if 'Argument to CollectionReader' in str(error):
192                     logger.error("error reading collection: {}".format(error))
193                     return 1
194                 else:
195                     raise
196         return 0
197
198     try:
199         reader = arvados.CollectionReader(
200             col_loc, api_client=api_client, num_retries=args.retries,
201             keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024), num_prefetch_threads=args.threads))
202     except Exception as error:
203         logger.error("failed to read collection: {}".format(error))
204         return 1
205
206     # Scan the collection. Make an array of (stream, file, local
207     # destination filename) tuples, and add up total size to extract.
208     todo = []
209     todo_bytes = 0
210     try:
211         if get_prefix == os.sep:
212             item = reader
213         else:
214             item = reader.find('.' + get_prefix)
215
216         if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
217             # If the user asked for a file and we got a subcollection, error out.
218             if get_prefix[-1] != os.sep:
219                 logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
220                 return 1
221             # If the user asked stdout as a destination, error out.
222             elif args.destination == '-':
223                 logger.error("cannot use 'stdout' as destination when downloading multiple files.")
224                 return 1
225             # User asked for a subcollection, and that's what was found. Add up total size
226             # to download.
227             for s, f in files_in_collection(item):
228                 dest_path = os.path.join(
229                     args.destination,
230                     os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
231                 if (not (args.n or args.f or args.skip_existing) and
232                     os.path.exists(dest_path)):
233                     logger.error('Local file %s already exists.' % (dest_path,))
234                     return 1
235                 todo += [(s, f, dest_path)]
236                 todo_bytes += f.size()
237         elif isinstance(item, arvados.arvfile.ArvadosFile):
238             todo += [(item.parent, item, args.destination)]
239             todo_bytes += item.size()
240         else:
241             logger.error("'{}' not found.".format('.' + get_prefix))
242             return 1
243     except (IOError, arvados.errors.NotFoundError) as e:
244         logger.error(e)
245         return 1
246
247     out_bytes = 0
248     for s, f, outfilename in todo:
249         outfile = None
250         digestor = None
251         if not args.n:
252             if outfilename == "-":
253                 outfile = stdout
254             else:
255                 if args.skip_existing and os.path.exists(outfilename):
256                     logger.debug('Local file %s exists. Skipping.', outfilename)
257                     continue
258                 elif not args.f and (os.path.isfile(outfilename) or
259                                    os.path.isdir(outfilename)):
260                     # Good thing we looked again: apparently this file wasn't
261                     # here yet when we checked earlier.
262                     logger.error('Local file %s already exists.' % (outfilename,))
263                     return 1
264                 if args.r:
265                     pathlib.Path(outfilename).parent.mkdir(parents=True, exist_ok=True)
266                 try:
267                     outfile = open(outfilename, 'wb')
268                 except Exception as error:
269                     logger.error('Open(%s) failed: %s' % (outfilename, error))
270                     return 1
271         if args.hash:
272             digestor = hashlib.new(args.hash)
273         try:
274             with s.open(f.name, 'rb') as file_reader:
275                 for data in file_reader.readall():
276                     if outfile:
277                         outfile.write(data)
278                     if digestor:
279                         digestor.update(data)
280                     out_bytes += len(data)
281                     if args.progress:
282                         stderr.write('\r%d MiB / %d MiB %.1f%%' %
283                                      (out_bytes >> 20,
284                                       todo_bytes >> 20,
285                                       (100
286                                        if todo_bytes==0
287                                        else 100.0*out_bytes/todo_bytes)))
288                     elif args.batch_progress:
289                         stderr.write('%s %d read %d total %d\n' %
290                                      (sys.argv[0], os.getpid(),
291                                       out_bytes, todo_bytes))
292             if digestor:
293                 stderr.write("%s  %s/%s\n"
294                              % (digestor.hexdigest(), s.stream_name(), f.name))
295         except KeyboardInterrupt:
296             if outfile and (outfile.fileno() > 2) and not outfile.closed:
297                 os.unlink(outfile.name)
298             break
299         finally:
300             if outfile != None and outfile != stdout:
301                 outfile.close()
302
303     if args.progress:
304         stderr.write('\n')
305     return 0
306
307 def files_in_collection(c):
308     # Sort first by file type, then alphabetically by file path.
309     for i in sorted(list(c.keys()),
310                     key=lambda k: (
311                         isinstance(c[k], arvados.collection.Subcollection),
312                         k.upper())):
313         if isinstance(c[i], arvados.arvfile.ArvadosFile):
314             yield (c, c[i])
315         elif isinstance(c[i], arvados.collection.Subcollection):
316             for s, f in files_in_collection(c[i]):
317                 yield (s, f)
318
319 def write_block_or_manifest(dest, src, api_client, args):
320     if '+A' in src:
321         # block locator
322         kc = arvados.keep.KeepClient(api_client=api_client)
323         dest.write(kc.get(src, num_retries=args.retries))
324     else:
325         # collection UUID or portable data hash
326         reader = arvados.CollectionReader(
327             src, api_client=api_client, num_retries=args.retries)
328         dest.write(reader.manifest_text(strip=args.strip_manifest).encode())