# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import argparse
import hashlib
import logging
import os
import re
import sys

import arvados
import arvados.commands._util as arv_cmd
import arvados.util as util

from arvados._version import __version__
logger = logging.getLogger('arvados.arv-get')

# Command-line interface for arv-get.  The help="""…""" delimiters had been
# dropped from this file; they are restored here so the parser definition is
# syntactically valid again.
parser = argparse.ArgumentParser(
    description='Copy data from Keep to a local file or pipe.',
    parents=[arv_cmd.retry_opt])
parser.add_argument('--version', action='version',
                    version="%s %s" % (sys.argv[0], __version__),
                    help='Print version and exit.')
parser.add_argument('locator', type=str,
                    help="""
Collection locator, optionally with a file path or prefix.
""")
parser.add_argument('destination', type=str, nargs='?', default='-',
                    help="""
Local file or directory where the data is to be written. Default: stdout.
""")
group = parser.add_mutually_exclusive_group()
group.add_argument('--progress', action='store_true',
                   help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when it
is not expected to interfere with the output: specifically, stderr is
a tty _and_ either stdout is not a tty, or output is being written to
named files rather than stdout.
""")
group.add_argument('--no-progress', action='store_true',
                   help="""
Do not display human-readable progress on stderr.
""")
group.add_argument('--batch-progress', action='store_true',
                   help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")
group = parser.add_mutually_exclusive_group()
group.add_argument('--hash',
                   help="""
Display the hash of each file as it is read from Keep, using the given
hash algorithm. Supported algorithms include md5, sha1, sha224,
sha256, sha384, and sha512.
""")
group.add_argument('--md5sum', action='store_const',
                   dest='hash', const='md5',
                   help="""
Display the MD5 hash of each file as it is read from Keep.
""")
parser.add_argument('-n', action='store_true',
                    help="""
Do not write any data -- just read from Keep, and report md5sums if
requested.
""")
parser.add_argument('-r', action='store_true',
                    help="""
Retrieve all files in the specified collection/prefix. This is the
default behavior if the "locator" argument ends with a forward slash.
""")
group = parser.add_mutually_exclusive_group()
group.add_argument('-f', action='store_true',
                   help="""
Overwrite existing files while writing. The default behavior is to
refuse to write *anything* if any of the output files already
exist. As a special case, -f is not needed to write to stdout.
""")
group.add_argument('-v', action='count', default=0,
                   help="""
Once for verbose mode, twice for debug mode.
""")
group.add_argument('--skip-existing', action='store_true',
                   help="""
Skip files that already exist. The default behavior is to refuse to
write *anything* if any files exist that would have to be
overwritten. This option causes even devices, sockets, and fifos to be
skipped.
""")
group.add_argument('--strip-manifest', action='store_true', default=False,
                   help="""
When getting a collection manifest, strip its access tokens before writing
it.
""")

parser.add_argument('--threads', type=int, metavar='N', default=4,
                    help="""
Set the number of download threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 4 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
def parse_arguments(arguments, stdout, stderr):
    """Parse arv-get arguments and normalize the resulting namespace.

    Several lines of this function had been dropped from the file
    (`args.r = True`, the start of the destination-directory check,
    the `-`/`/dev/stdout` special case, `stderr.isatty()`, and the
    final `args.progress = True` / `return args`); they are restored
    here so the function is complete again.

    Side effects on the returned namespace:
    * a trailing slash on the locator implies recursive download (-r);
    * a directory destination gets the source file name appended;
    * writing to stdout implies -f (no "file exists" refusal);
    * --progress is turned on by default when it won't interfere with
      the output.
    """
    args = parser.parse_args(arguments)

    # A locator ending in a path separator means "download everything
    # under this prefix", i.e. recursive mode.
    if args.locator[-1] == os.sep:
        args.r = True
    if (args.r and
        not args.n and
        not (args.destination and
             os.path.isdir(args.destination))):
        parser.error('Destination is not a directory.')
    if not args.r and (os.path.isdir(args.destination) or
                       args.destination[-1] == os.path.sep):
        args.destination = os.path.join(args.destination,
                                        os.path.basename(args.locator))
        logger.debug("Appended source file name to destination directory: %s",
                     args.destination)

    if args.destination == '/dev/stdout':
        args.destination = "-"

    if args.destination == '-':
        # Normally you have to use -f to write to a file (or device) that
        # already exists, but "-" and "/dev/stdout" are common enough to
        # merit a special exception.
        args.f = True
    else:
        args.destination = args.destination.rstrip(os.sep)

    # Turn on --progress by default if stderr is a tty and output is
    # either going to a named file, or going (via stdout) to something
    # that isn't a tty.
    if (not (args.batch_progress or args.no_progress)
        and stderr.isatty()
        and (args.destination != '-'
             or not stdout.isatty())):
        args.progress = True

    return args
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Entry point for arv-get: copy data from Keep to a local file or pipe.

    Depending on the locator, downloads either a collection manifest /
    single data block (no path suffix), a single file, or a whole
    collection/prefix tree.  Returns 0 on success, 1 on error (the
    value is intended for sys.exit).

    NOTE(review): this body reconstructs structural lines (try/else/
    return statements, loop-state initialization, the finally clause and
    the function tail) that were missing from the corrupted file, and
    fixes one real bug: the progress-percentage expression divided by
    todo_bytes in *both* branches of its conditional, raising
    ZeroDivisionError for an empty download; the else branch now yields 0.
    """
    if stdout is sys.stdout and hasattr(stdout, 'buffer'):
        # in Python 3, write to stdout as binary
        stdout = stdout.buffer

    args = parse_arguments(arguments, stdout, stderr)
    # -v raises verbosity: WARNING by default, INFO with -v, DEBUG with -vv.
    logger.setLevel(logging.WARNING - 10 * args.v)

    request_id = arvados.util.new_request_id()
    logger.info('X-Request-Id: '+request_id)

    api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)

    # Split the locator into the collection part and the optional file
    # path/prefix part (everything from the first slash onward).
    r = re.search(r'^(.*?)(/.*)?$', args.locator)
    col_loc = r.group(1)
    get_prefix = r.group(2)
    if args.r and not get_prefix:
        get_prefix = os.sep

    if not get_prefix:
        # User asked to download the collection's manifest
        if not args.n:
            open_flags = os.O_CREAT | os.O_WRONLY
            if not args.f:
                # Without -f, refuse to overwrite an existing file.
                open_flags |= os.O_EXCL
            try:
                if args.destination == "-":
                    write_block_or_manifest(
                        dest=stdout, src=col_loc,
                        api_client=api_client, args=args)
                else:
                    out_fd = os.open(args.destination, open_flags)
                    with os.fdopen(out_fd, 'wb') as out_file:
                        write_block_or_manifest(
                            dest=out_file, src=col_loc,
                            api_client=api_client, args=args)
            except (IOError, OSError) as error:
                logger.error("can't write to '{}': {}".format(args.destination, error))
                return 1
            except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
                logger.error("failed to download '{}': {}".format(col_loc, error))
                return 1
            except arvados.errors.ArgumentError as error:
                if 'Argument to CollectionReader' in str(error):
                    logger.error("error reading collection: {}".format(error))
                    return 1
                else:
                    raise
        return 0

    try:
        # Size the block cache to the number of download threads so
        # parallel reads don't evict each other's blocks.
        reader = arvados.CollectionReader(
            col_loc, api_client=api_client, num_retries=args.retries,
            keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
            get_threads=args.threads)
    except Exception as error:
        logger.error("failed to read collection: {}".format(error))
        return 1

    # Scan the collection. Make an array of (stream, file, local
    # destination filename) tuples, and add up total size to extract.
    todo = []
    todo_bytes = 0
    try:
        if get_prefix == os.sep:
            item = reader
        else:
            item = reader.find('.' + get_prefix)

        if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
            # If the user asked for a file and we got a subcollection, error out.
            if get_prefix[-1] != os.sep:
                logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
                return 1
            # If the user asked stdout as a destination, error out.
            elif args.destination == '-':
                logger.error("cannot use 'stdout' as destination when downloading multiple files.")
                return 1
            # User asked for a subcollection, and that's what was found. Add up total size
            # to download.
            for s, f in files_in_collection(item):
                dest_path = os.path.join(
                    args.destination,
                    os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
                if (not (args.n or args.f or args.skip_existing) and
                    os.path.exists(dest_path)):
                    logger.error('Local file %s already exists.' % (dest_path,))
                    return 1
                todo += [(s, f, dest_path)]
                todo_bytes += f.size()
        elif isinstance(item, arvados.arvfile.ArvadosFile):
            todo += [(item.parent, item, args.destination)]
            todo_bytes += item.size()
        else:
            logger.error("'{}' not found.".format('.' + get_prefix))
            return 1
    except (IOError, arvados.errors.NotFoundError) as e:
        logger.error(e)
        return 1

    out_bytes = 0
    for s, f, outfilename in todo:
        outfile = None
        digestor = None
        if not args.n:
            if outfilename == "-":
                outfile = stdout
            else:
                if args.skip_existing and os.path.exists(outfilename):
                    logger.debug('Local file %s exists. Skipping.', outfilename)
                    continue
                elif not args.f and (os.path.isfile(outfilename) or
                                     os.path.isdir(outfilename)):
                    # Good thing we looked again: apparently this file wasn't
                    # here yet when we checked earlier.
                    logger.error('Local file %s already exists.' % (outfilename,))
                    return 1
                if args.r:
                    arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
                try:
                    outfile = open(outfilename, 'wb')
                except Exception as error:
                    logger.error('Open(%s) failed: %s' % (outfilename, error))
                    return 1
        if args.hash:
            digestor = hashlib.new(args.hash)
        try:
            with s.open(f.name, 'rb') as file_reader:
                for data in file_reader.readall():
                    if outfile:
                        outfile.write(data)
                    if digestor:
                        digestor.update(data)
                    out_bytes += len(data)
                    if args.progress:
                        # Bug fix: avoid ZeroDivisionError when
                        # todo_bytes == 0 (empty download).
                        stderr.write('\r%d MiB / %d MiB %.1f%%' %
                                     (out_bytes >> 20,
                                      todo_bytes >> 20,
                                      (100.0*out_bytes/todo_bytes
                                       if todo_bytes > 0
                                       else 0)))
                    elif args.batch_progress:
                        stderr.write('%s %d read %d total %d\n' %
                                     (sys.argv[0], os.getpid(),
                                      out_bytes, todo_bytes))
            if digestor:
                stderr.write("%s %s/%s\n"
                             % (digestor.hexdigest(), s.stream_name(), f.name))
        except KeyboardInterrupt:
            # Remove the partially-written file, unless it's stdout or
            # stderr (fileno <= 2).
            if outfile and (outfile.fileno() > 2) and not outfile.closed:
                os.unlink(outfile.name)
            break
        finally:
            if outfile != None and outfile != stdout:
                outfile.close()

    if args.progress:
        stderr.write('\n')
    return 0
def files_in_collection(c):
    """Recursively yield (stream, file) pairs for every file under *c*.

    *c* is a CollectionReader or Subcollection; the yielded stream is
    the collection object that directly contains the ArvadosFile.  The
    sort key lambda and the yield statements were missing from the
    corrupted file and are restored here.
    """
    # Sort first by file type, then alphabetically by file path.
    for i in sorted(list(c.keys()),
                    key=lambda k: (
                        isinstance(c[k], arvados.collection.Subcollection),
                        k.upper())):
        if isinstance(c[i], arvados.arvfile.ArvadosFile):
            yield (c, c[i])
        elif isinstance(c[i], arvados.collection.Subcollection):
            for s, f in files_in_collection(c[i]):
                yield (s, f)
def write_block_or_manifest(dest, src, api_client, args):
    """Write either a single Keep block or a collection manifest to *dest*.

    *dest* is a binary-mode file object; *src* is either a signed block
    locator (contains a '+A' permission hint) or a collection UUID /
    portable data hash.  The branch opener lines were missing from the
    corrupted file and are restored here.
    """
    if '+A' in src:
        # block locator: fetch the raw block directly from Keep
        kc = arvados.keep.KeepClient(api_client=api_client)
        dest.write(kc.get(src, num_retries=args.retries))
    else:
        # collection UUID or portable data hash
        reader = arvados.CollectionReader(
            src, api_client=api_client, num_retries=args.retries)
        dest.write(reader.manifest_text(strip=args.strip_manifest).encode())