17004: Fix lingering resource error
[arvados.git] / sdk / python / arvados / commands / get.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 import argparse
7 import hashlib
8 import os
9 import re
10 import string
11 import sys
12 import logging
13
14 import arvados
15 import arvados.commands._util as arv_cmd
16 import arvados.util as util
17
18 from arvados._version import __version__
19
20 logger = logging.getLogger('arvados.arv-get')
21
22 parser = argparse.ArgumentParser(
23     description='Copy data from Keep to a local file or pipe.',
24     parents=[arv_cmd.retry_opt])
25 parser.add_argument('--version', action='version',
26                     version="%s %s" % (sys.argv[0], __version__),
27                     help='Print version and exit.')
28 parser.add_argument('locator', type=str,
29                     help="""
30 Collection locator, optionally with a file path or prefix.
31 """)
32 parser.add_argument('destination', type=str, nargs='?', default='-',
33                     help="""
34 Local file or directory where the data is to be written. Default: stdout.
35 """)
36 group = parser.add_mutually_exclusive_group()
37 group.add_argument('--progress', action='store_true',
38                    help="""
39 Display human-readable progress on stderr (bytes and, if possible,
40 percentage of total data size). This is the default behavior when it
41 is not expected to interfere with the output: specifically, stderr is
42 a tty _and_ either stdout is not a tty, or output is being written to
43 named files rather than stdout.
44 """)
45 group.add_argument('--no-progress', action='store_true',
46                    help="""
47 Do not display human-readable progress on stderr.
48 """)
49 group.add_argument('--batch-progress', action='store_true',
50                    help="""
51 Display machine-readable progress on stderr (bytes and, if known,
52 total data size).
53 """)
54 group = parser.add_mutually_exclusive_group()
55 group.add_argument('--hash',
56                     help="""
57 Display the hash of each file as it is read from Keep, using the given
58 hash algorithm. Supported algorithms include md5, sha1, sha224,
59 sha256, sha384, and sha512.
60 """)
61 group.add_argument('--md5sum', action='store_const',
62                     dest='hash', const='md5',
63                     help="""
64 Display the MD5 hash of each file as it is read from Keep.
65 """)
66 parser.add_argument('-n', action='store_true',
67                     help="""
68 Do not write any data -- just read from Keep, and report md5sums if
69 requested.
70 """)
71 parser.add_argument('-r', action='store_true',
72                     help="""
73 Retrieve all files in the specified collection/prefix. This is the
74 default behavior if the "locator" argument ends with a forward slash.
75 """)
76 group = parser.add_mutually_exclusive_group()
77 group.add_argument('-f', action='store_true',
78                    help="""
79 Overwrite existing files while writing. The default behavior is to
80 refuse to write *anything* if any of the output files already
81 exist. As a special case, -f is not needed to write to stdout.
82 """)
83 group.add_argument('-v', action='count', default=0,
84                     help="""
85 Once for verbose mode, twice for debug mode.
86 """)
87 group.add_argument('--skip-existing', action='store_true',
88                    help="""
89 Skip files that already exist. The default behavior is to refuse to
90 write *anything* if any files exist that would have to be
91 overwritten. This option causes even devices, sockets, and fifos to be
92 skipped.
93 """)
94 group.add_argument('--strip-manifest', action='store_true', default=False,
95                    help="""
96 When getting a collection manifest, strip its access tokens before writing
97 it.
98 """)
99
100 parser.add_argument('--threads', type=int, metavar='N', default=4,
101                     help="""
102 Set the number of download threads to be used. Take into account that
103 using lots of threads will increase the RAM requirements. Default is
104 to use 4 threads.
105 On high latency installations, using a greater number will improve
106 overall throughput.
107 """)
108
109 def parse_arguments(arguments, stdout, stderr):
110     args = parser.parse_args(arguments)
111
112     if args.locator[-1] == os.sep:
113         args.r = True
114     if (args.r and
115         not args.n and
116         not (args.destination and
117              os.path.isdir(args.destination))):
118         parser.error('Destination is not a directory.')
119     if not args.r and (os.path.isdir(args.destination) or
120                        args.destination[-1] == os.path.sep):
121         args.destination = os.path.join(args.destination,
122                                         os.path.basename(args.locator))
123         logger.debug("Appended source file name to destination directory: %s",
124                      args.destination)
125
126     if args.destination == '/dev/stdout':
127         args.destination = "-"
128
129     if args.destination == '-':
130         # Normally you have to use -f to write to a file (or device) that
131         # already exists, but "-" and "/dev/stdout" are common enough to
132         # merit a special exception.
133         args.f = True
134     else:
135         args.destination = args.destination.rstrip(os.sep)
136
137     # Turn on --progress by default if stderr is a tty and output is
138     # either going to a named file, or going (via stdout) to something
139     # that isn't a tty.
140     if (not (args.batch_progress or args.no_progress)
141         and stderr.isatty()
142         and (args.destination != '-'
143              or not stdout.isatty())):
144         args.progress = True
145     return args
146
147 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
148     if stdout is sys.stdout and hasattr(stdout, 'buffer'):
149         # in Python 3, write to stdout as binary
150         stdout = stdout.buffer
151
152     args = parse_arguments(arguments, stdout, stderr)
153     logger.setLevel(logging.WARNING - 10 * args.v)
154
155     request_id = arvados.util.new_request_id()
156     logger.info('X-Request-Id: '+request_id)
157
158     api_client = arvados.api('v1', request_id=request_id)
159
160     r = re.search(r'^(.*?)(/.*)?$', args.locator)
161     col_loc = r.group(1)
162     get_prefix = r.group(2)
163     if args.r and not get_prefix:
164         get_prefix = os.sep
165
166     # User asked to download the collection's manifest
167     if not get_prefix:
168         if not args.n:
169             open_flags = os.O_CREAT | os.O_WRONLY
170             if not args.f:
171                 open_flags |= os.O_EXCL
172             try:
173                 if args.destination == "-":
174                     write_block_or_manifest(
175                         dest=stdout, src=col_loc,
176                         api_client=api_client, args=args)
177                 else:
178                     out_fd = os.open(args.destination, open_flags)
179                     with os.fdopen(out_fd, 'wb') as out_file:
180                         write_block_or_manifest(
181                             dest=out_file, src=col_loc,
182                             api_client=api_client, args=args)
183             except (IOError, OSError) as error:
184                 logger.error("can't write to '{}': {}".format(args.destination, error))
185                 return 1
186             except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
187                 logger.error("failed to download '{}': {}".format(col_loc, error))
188                 return 1
189             except arvados.errors.ArgumentError as error:
190                 if 'Argument to CollectionReader' in str(error):
191                     logger.error("error reading collection: {}".format(error))
192                     return 1
193                 else:
194                     raise
195         return 0
196
197     try:
198         reader = arvados.CollectionReader(
199             col_loc, api_client=api_client, num_retries=args.retries,
200             keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
201             get_threads=args.threads)
202     except Exception as error:
203         logger.error("failed to read collection: {}".format(error))
204         return 1
205
206     # Scan the collection. Make an array of (stream, file, local
207     # destination filename) tuples, and add up total size to extract.
208     todo = []
209     todo_bytes = 0
210     try:
211         if get_prefix == os.sep:
212             item = reader
213         else:
214             item = reader.find('.' + get_prefix)
215
216         if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
217             # If the user asked for a file and we got a subcollection, error out.
218             if get_prefix[-1] != os.sep:
219                 logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
220                 return 1
221             # If the user asked stdout as a destination, error out.
222             elif args.destination == '-':
223                 logger.error("cannot use 'stdout' as destination when downloading multiple files.")
224                 return 1
225             # User asked for a subcollection, and that's what was found. Add up total size
226             # to download.
227             for s, f in files_in_collection(item):
228                 dest_path = os.path.join(
229                     args.destination,
230                     os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
231                 if (not (args.n or args.f or args.skip_existing) and
232                     os.path.exists(dest_path)):
233                     logger.error('Local file %s already exists.' % (dest_path,))
234                     return 1
235                 todo += [(s, f, dest_path)]
236                 todo_bytes += f.size()
237         elif isinstance(item, arvados.arvfile.ArvadosFile):
238             todo += [(item.parent, item, args.destination)]
239             todo_bytes += item.size()
240         else:
241             logger.error("'{}' not found.".format('.' + get_prefix))
242             return 1
243     except (IOError, arvados.errors.NotFoundError) as e:
244         logger.error(e)
245         return 1
246
247     out_bytes = 0
248     for s, f, outfilename in todo:
249         outfile = None
250         digestor = None
251         if not args.n:
252             if outfilename == "-":
253                 outfile = stdout
254             else:
255                 if args.skip_existing and os.path.exists(outfilename):
256                     logger.debug('Local file %s exists. Skipping.', outfilename)
257                     continue
258                 elif not args.f and (os.path.isfile(outfilename) or
259                                    os.path.isdir(outfilename)):
260                     # Good thing we looked again: apparently this file wasn't
261                     # here yet when we checked earlier.
262                     logger.error('Local file %s already exists.' % (outfilename,))
263                     return 1
264                 if args.r:
265                     arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
266                 try:
267                     outfile = open(outfilename, 'wb')
268                 except Exception as error:
269                     logger.error('Open(%s) failed: %s' % (outfilename, error))
270                     return 1
271         if args.hash:
272             digestor = hashlib.new(args.hash)
273         try:
274             with s.open(f.name, 'rb') as file_reader:
275                 for data in file_reader.readall():
276                     if outfile:
277                         outfile.write(data)
278                     if digestor:
279                         digestor.update(data)
280                     out_bytes += len(data)
281                     if args.progress:
282                         stderr.write('\r%d MiB / %d MiB %.1f%%' %
283                                      (out_bytes >> 20,
284                                       todo_bytes >> 20,
285                                       (100
286                                        if todo_bytes==0
287                                        else 100.0*out_bytes/todo_bytes)))
288                     elif args.batch_progress:
289                         stderr.write('%s %d read %d total %d\n' %
290                                      (sys.argv[0], os.getpid(),
291                                       out_bytes, todo_bytes))
292             if digestor:
293                 stderr.write("%s  %s/%s\n"
294                              % (digestor.hexdigest(), s.stream_name(), f.name))
295         except KeyboardInterrupt:
296             if outfile and (outfile.fileno() > 2) and not outfile.closed:
297                 os.unlink(outfile.name)
298             break
299         finally:
300             if outfile != None and outfile != stdout:
301                 outfile.close()
302
303     if args.progress:
304         stderr.write('\n')
305     return 0
306
307 def files_in_collection(c):
308     # Sort first by file type, then alphabetically by file path.
309     for i in sorted(list(c.keys()),
310                     key=lambda k: (
311                         isinstance(c[k], arvados.collection.Subcollection),
312                         k.upper())):
313         if isinstance(c[i], arvados.arvfile.ArvadosFile):
314             yield (c, c[i])
315         elif isinstance(c[i], arvados.collection.Subcollection):
316             for s, f in files_in_collection(c[i]):
317                 yield (s, f)
318
319 def write_block_or_manifest(dest, src, api_client, args):
320     if '+A' in src:
321         # block locator
322         kc = arvados.keep.KeepClient(api_client=api_client)
323         dest.write(kc.get(src, num_retries=args.retries))
324     else:
325         # collection UUID or portable data hash
326         reader = arvados.CollectionReader(
327             src, api_client=api_client, num_retries=args.retries)
328         dest.write(reader.manifest_text(strip=args.strip_manifest).encode())