Merge branch '19070-update-workflow-deps' refs #19070
[arvados.git] / sdk / python / arvados / commands / get.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 import argparse
7 import hashlib
8 import os
9 import re
10 import string
11 import sys
12 import logging
13
14 import arvados
15 import arvados.commands._util as arv_cmd
16 import arvados.util as util
17
18 from arvados._version import __version__
19
20 api_client = None
21 logger = logging.getLogger('arvados.arv-get')
22
23 parser = argparse.ArgumentParser(
24     description='Copy data from Keep to a local file or pipe.',
25     parents=[arv_cmd.retry_opt])
26 parser.add_argument('--version', action='version',
27                     version="%s %s" % (sys.argv[0], __version__),
28                     help='Print version and exit.')
29 parser.add_argument('locator', type=str,
30                     help="""
31 Collection locator, optionally with a file path or prefix.
32 """)
33 parser.add_argument('destination', type=str, nargs='?', default='-',
34                     help="""
35 Local file or directory where the data is to be written. Default: stdout.
36 """)
37 group = parser.add_mutually_exclusive_group()
38 group.add_argument('--progress', action='store_true',
39                    help="""
40 Display human-readable progress on stderr (bytes and, if possible,
41 percentage of total data size). This is the default behavior when it
42 is not expected to interfere with the output: specifically, stderr is
43 a tty _and_ either stdout is not a tty, or output is being written to
44 named files rather than stdout.
45 """)
46 group.add_argument('--no-progress', action='store_true',
47                    help="""
48 Do not display human-readable progress on stderr.
49 """)
50 group.add_argument('--batch-progress', action='store_true',
51                    help="""
52 Display machine-readable progress on stderr (bytes and, if known,
53 total data size).
54 """)
55 group = parser.add_mutually_exclusive_group()
56 group.add_argument('--hash',
57                     help="""
58 Display the hash of each file as it is read from Keep, using the given
59 hash algorithm. Supported algorithms include md5, sha1, sha224,
60 sha256, sha384, and sha512.
61 """)
62 group.add_argument('--md5sum', action='store_const',
63                     dest='hash', const='md5',
64                     help="""
65 Display the MD5 hash of each file as it is read from Keep.
66 """)
67 parser.add_argument('-n', action='store_true',
68                     help="""
69 Do not write any data -- just read from Keep, and report md5sums if
70 requested.
71 """)
72 parser.add_argument('-r', action='store_true',
73                     help="""
74 Retrieve all files in the specified collection/prefix. This is the
75 default behavior if the "locator" argument ends with a forward slash.
76 """)
77 group = parser.add_mutually_exclusive_group()
78 group.add_argument('-f', action='store_true',
79                    help="""
80 Overwrite existing files while writing. The default behavior is to
81 refuse to write *anything* if any of the output files already
82 exist. As a special case, -f is not needed to write to stdout.
83 """)
84 group.add_argument('-v', action='count', default=0,
85                     help="""
86 Once for verbose mode, twice for debug mode.
87 """)
88 group.add_argument('--skip-existing', action='store_true',
89                    help="""
90 Skip files that already exist. The default behavior is to refuse to
91 write *anything* if any files exist that would have to be
92 overwritten. This option causes even devices, sockets, and fifos to be
93 skipped.
94 """)
95 group.add_argument('--strip-manifest', action='store_true', default=False,
96                    help="""
97 When getting a collection manifest, strip its access tokens before writing
98 it.
99 """)
100
101 parser.add_argument('--threads', type=int, metavar='N', default=4,
102                     help="""
103 Set the number of download threads to be used. Take into account that
104 using lots of threads will increase the RAM requirements. Default is
105 to use 4 threads.
106 On high latency installations, using a greater number will improve
107 overall throughput.
108 """)
109
110 def parse_arguments(arguments, stdout, stderr):
111     args = parser.parse_args(arguments)
112
113     if args.locator[-1] == os.sep:
114         args.r = True
115     if (args.r and
116         not args.n and
117         not (args.destination and
118              os.path.isdir(args.destination))):
119         parser.error('Destination is not a directory.')
120     if not args.r and (os.path.isdir(args.destination) or
121                        args.destination[-1] == os.path.sep):
122         args.destination = os.path.join(args.destination,
123                                         os.path.basename(args.locator))
124         logger.debug("Appended source file name to destination directory: %s",
125                      args.destination)
126
127     if args.destination == '/dev/stdout':
128         args.destination = "-"
129
130     if args.destination == '-':
131         # Normally you have to use -f to write to a file (or device) that
132         # already exists, but "-" and "/dev/stdout" are common enough to
133         # merit a special exception.
134         args.f = True
135     else:
136         args.destination = args.destination.rstrip(os.sep)
137
138     # Turn on --progress by default if stderr is a tty and output is
139     # either going to a named file, or going (via stdout) to something
140     # that isn't a tty.
141     if (not (args.batch_progress or args.no_progress)
142         and stderr.isatty()
143         and (args.destination != '-'
144              or not stdout.isatty())):
145         args.progress = True
146     return args
147
148 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
149     global api_client
150
151     if stdout is sys.stdout and hasattr(stdout, 'buffer'):
152         # in Python 3, write to stdout as binary
153         stdout = stdout.buffer
154
155     args = parse_arguments(arguments, stdout, stderr)
156     logger.setLevel(logging.WARNING - 10 * args.v)
157
158     request_id = arvados.util.new_request_id()
159     logger.info('X-Request-Id: '+request_id)
160
161     if api_client is None:
162         api_client = arvados.api('v1', request_id=request_id)
163
164     r = re.search(r'^(.*?)(/.*)?$', args.locator)
165     col_loc = r.group(1)
166     get_prefix = r.group(2)
167     if args.r and not get_prefix:
168         get_prefix = os.sep
169
170     # User asked to download the collection's manifest
171     if not get_prefix:
172         if not args.n:
173             open_flags = os.O_CREAT | os.O_WRONLY
174             if not args.f:
175                 open_flags |= os.O_EXCL
176             try:
177                 if args.destination == "-":
178                     write_block_or_manifest(
179                         dest=stdout, src=col_loc,
180                         api_client=api_client, args=args)
181                 else:
182                     out_fd = os.open(args.destination, open_flags)
183                     with os.fdopen(out_fd, 'wb') as out_file:
184                         write_block_or_manifest(
185                             dest=out_file, src=col_loc,
186                             api_client=api_client, args=args)
187             except (IOError, OSError) as error:
188                 logger.error("can't write to '{}': {}".format(args.destination, error))
189                 return 1
190             except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
191                 logger.error("failed to download '{}': {}".format(col_loc, error))
192                 return 1
193             except arvados.errors.ArgumentError as error:
194                 if 'Argument to CollectionReader' in str(error):
195                     logger.error("error reading collection: {}".format(error))
196                     return 1
197                 else:
198                     raise
199         return 0
200
201     try:
202         reader = arvados.CollectionReader(
203             col_loc, api_client=api_client, num_retries=args.retries,
204             keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
205             get_threads=args.threads)
206     except Exception as error:
207         logger.error("failed to read collection: {}".format(error))
208         return 1
209
210     # Scan the collection. Make an array of (stream, file, local
211     # destination filename) tuples, and add up total size to extract.
212     todo = []
213     todo_bytes = 0
214     try:
215         if get_prefix == os.sep:
216             item = reader
217         else:
218             item = reader.find('.' + get_prefix)
219
220         if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
221             # If the user asked for a file and we got a subcollection, error out.
222             if get_prefix[-1] != os.sep:
223                 logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
224                 return 1
225             # If the user asked stdout as a destination, error out.
226             elif args.destination == '-':
227                 logger.error("cannot use 'stdout' as destination when downloading multiple files.")
228                 return 1
229             # User asked for a subcollection, and that's what was found. Add up total size
230             # to download.
231             for s, f in files_in_collection(item):
232                 dest_path = os.path.join(
233                     args.destination,
234                     os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
235                 if (not (args.n or args.f or args.skip_existing) and
236                     os.path.exists(dest_path)):
237                     logger.error('Local file %s already exists.' % (dest_path,))
238                     return 1
239                 todo += [(s, f, dest_path)]
240                 todo_bytes += f.size()
241         elif isinstance(item, arvados.arvfile.ArvadosFile):
242             todo += [(item.parent, item, args.destination)]
243             todo_bytes += item.size()
244         else:
245             logger.error("'{}' not found.".format('.' + get_prefix))
246             return 1
247     except (IOError, arvados.errors.NotFoundError) as e:
248         logger.error(e)
249         return 1
250
251     out_bytes = 0
252     for s, f, outfilename in todo:
253         outfile = None
254         digestor = None
255         if not args.n:
256             if outfilename == "-":
257                 outfile = stdout
258             else:
259                 if args.skip_existing and os.path.exists(outfilename):
260                     logger.debug('Local file %s exists. Skipping.', outfilename)
261                     continue
262                 elif not args.f and (os.path.isfile(outfilename) or
263                                    os.path.isdir(outfilename)):
264                     # Good thing we looked again: apparently this file wasn't
265                     # here yet when we checked earlier.
266                     logger.error('Local file %s already exists.' % (outfilename,))
267                     return 1
268                 if args.r:
269                     arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
270                 try:
271                     outfile = open(outfilename, 'wb')
272                 except Exception as error:
273                     logger.error('Open(%s) failed: %s' % (outfilename, error))
274                     return 1
275         if args.hash:
276             digestor = hashlib.new(args.hash)
277         try:
278             with s.open(f.name, 'rb') as file_reader:
279                 for data in file_reader.readall():
280                     if outfile:
281                         outfile.write(data)
282                     if digestor:
283                         digestor.update(data)
284                     out_bytes += len(data)
285                     if args.progress:
286                         stderr.write('\r%d MiB / %d MiB %.1f%%' %
287                                      (out_bytes >> 20,
288                                       todo_bytes >> 20,
289                                       (100
290                                        if todo_bytes==0
291                                        else 100.0*out_bytes/todo_bytes)))
292                     elif args.batch_progress:
293                         stderr.write('%s %d read %d total %d\n' %
294                                      (sys.argv[0], os.getpid(),
295                                       out_bytes, todo_bytes))
296             if digestor:
297                 stderr.write("%s  %s/%s\n"
298                              % (digestor.hexdigest(), s.stream_name(), f.name))
299         except KeyboardInterrupt:
300             if outfile and (outfile.fileno() > 2) and not outfile.closed:
301                 os.unlink(outfile.name)
302             break
303         finally:
304             if outfile != None and outfile != stdout:
305                 outfile.close()
306
307     if args.progress:
308         stderr.write('\n')
309     return 0
310
311 def files_in_collection(c):
312     # Sort first by file type, then alphabetically by file path.
313     for i in sorted(list(c.keys()),
314                     key=lambda k: (
315                         isinstance(c[k], arvados.collection.Subcollection),
316                         k.upper())):
317         if isinstance(c[i], arvados.arvfile.ArvadosFile):
318             yield (c, c[i])
319         elif isinstance(c[i], arvados.collection.Subcollection):
320             for s, f in files_in_collection(c[i]):
321                 yield (s, f)
322
323 def write_block_or_manifest(dest, src, api_client, args):
324     if '+A' in src:
325         # block locator
326         kc = arvados.keep.KeepClient(api_client=api_client)
327         dest.write(kc.get(src, num_retries=args.retries))
328     else:
329         # collection UUID or portable data hash
330         reader = arvados.CollectionReader(
331             src, api_client=api_client, num_retries=args.retries)
332         dest.write(reader.manifest_text(strip=args.strip_manifest).encode())