# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import argparse
import hashlib
import logging
import os
import pathlib
import re
import sys

import arvados
import arvados.commands._util as arv_cmd
import arvados.util as util

from arvados._version import __version__
# Module-level logger for this command; its level is raised in main()
# according to the -v count.
logger = logging.getLogger('arvados.arv-get')

parser = argparse.ArgumentParser(
    description='Copy data from Keep to a local file or pipe.',
    parents=[arv_cmd.retry_opt])
parser.add_argument('--version', action='version',
                    version="%s %s" % (sys.argv[0], __version__),
                    help='Print version and exit.')
parser.add_argument('locator', type=str,
                    help="""
Collection locator, optionally with a file path or prefix.
""")
parser.add_argument('destination', type=str, nargs='?', default='-',
                    help="""
Local file or directory where the data is to be written. Default: stdout.
""")
group = parser.add_mutually_exclusive_group()
group.add_argument('--progress', action='store_true',
                   help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when it
is not expected to interfere with the output: specifically, stderr is
a tty _and_ either stdout is not a tty, or output is being written to
named files rather than stdout.
""")
group.add_argument('--no-progress', action='store_true',
                   help="""
Do not display human-readable progress on stderr.
""")
group.add_argument('--batch-progress', action='store_true',
                   help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")
group = parser.add_mutually_exclusive_group()
group.add_argument('--hash',
                   help="""
Display the hash of each file as it is read from Keep, using the given
hash algorithm. Supported algorithms include md5, sha1, sha224,
sha256, sha384, and sha512.
""")
group.add_argument('--md5sum', action='store_const',
                   dest='hash', const='md5',
                   help="""
Display the MD5 hash of each file as it is read from Keep.
""")
parser.add_argument('-n', action='store_true',
                    help="""
Do not write any data -- just read from Keep, and report md5sums if
requested.
""")
parser.add_argument('-r', action='store_true',
                    help="""
Retrieve all files in the specified collection/prefix. This is the
default behavior if the "locator" argument ends with a forward slash.
""")
group = parser.add_mutually_exclusive_group()
group.add_argument('-f', action='store_true',
                   help="""
Overwrite existing files while writing. The default behavior is to
refuse to write *anything* if any of the output files already
exist. As a special case, -f is not needed to write to stdout.
""")
group.add_argument('-v', action='count', default=0,
                   help="""
Once for verbose mode, twice for debug mode.
""")
group.add_argument('--skip-existing', action='store_true',
                   help="""
Skip files that already exist. The default behavior is to refuse to
write *anything* if any files exist that would have to be
overwritten. This option causes even devices, sockets, and fifos to be
skipped.
""")
group.add_argument('--strip-manifest', action='store_true', default=False,
                   help="""
When getting a collection manifest, strip its access tokens before writing
it.
""")

parser.add_argument('--threads', type=int, metavar='N', default=4,
                    help="""
Set the number of download threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 4 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
def parse_arguments(arguments, stdout, stderr):
    """Parse command-line arguments and normalize the resulting namespace.

    Mutates the parsed args in place: infers -r from a trailing slash on
    the locator, validates/normalizes the destination path, enables -f
    implicitly for stdout, and decides whether --progress should default
    to on.  Calls parser.error() (which exits) on a bad destination.
    """
    args = parser.parse_args(arguments)

    if args.locator[-1] == os.sep:
        # A trailing slash on the locator means "download everything
        # under this prefix".
        args.r = True
    if (args.r and
        not args.n and
        not (args.destination and
             os.path.isdir(args.destination))):
        parser.error('Destination is not a directory.')
    if not args.r and (os.path.isdir(args.destination) or
                       args.destination[-1] == os.path.sep):
        args.destination = os.path.join(args.destination,
                                        os.path.basename(args.locator))
        logger.debug("Appended source file name to destination directory: %s",
                     args.destination)

    if args.destination == '/dev/stdout':
        args.destination = "-"

    if args.destination == '-':
        # Normally you have to use -f to write to a file (or device) that
        # already exists, but "-" and "/dev/stdout" are common enough to
        # merit a special exception.
        args.f = True
    else:
        args.destination = args.destination.rstrip(os.sep)

    # Turn on --progress by default if stderr is a tty and output is
    # either going to a named file, or going (via stdout) to something
    # that isn't a tty.
    if (not (args.batch_progress or args.no_progress)
        and stderr.isatty()
        and (args.destination != '-'
             or not stdout.isatty())):
        args.progress = True
    return args
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Entry point for arv-get.

    Downloads a block, a collection manifest, a single file, or a file
    tree from Keep, depending on the locator and flags.  Returns 0 on
    success and 1 on error (used as the process exit status).
    """
    if stdout is sys.stdout and hasattr(stdout, 'buffer'):
        # in Python 3, write to stdout as binary
        stdout = stdout.buffer

    args = parse_arguments(arguments, stdout, stderr)
    logger.setLevel(logging.WARNING - 10 * args.v)

    request_id = arvados.util.new_request_id()
    logger.info('X-Request-Id: '+request_id)

    api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)

    # Split the locator into the collection part and the optional file
    # path/prefix part that follows the first slash.
    r = re.search(r'^(.*?)(/.*)?$', args.locator)
    col_loc = r.group(1)
    get_prefix = r.group(2)
    if args.r and not get_prefix:
        get_prefix = os.sep

    # User asked to download the collection's manifest
    if not get_prefix:
        if not args.n:
            open_flags = os.O_CREAT | os.O_WRONLY
            if not args.f:
                # Without -f, refuse to clobber an existing file.
                open_flags |= os.O_EXCL
            try:
                if args.destination == "-":
                    write_block_or_manifest(
                        dest=stdout, src=col_loc,
                        api_client=api_client, args=args)
                else:
                    out_fd = os.open(args.destination, open_flags)
                    with os.fdopen(out_fd, 'wb') as out_file:
                        write_block_or_manifest(
                            dest=out_file, src=col_loc,
                            api_client=api_client, args=args)
            except (IOError, OSError) as error:
                logger.error("can't write to '{}': {}".format(args.destination, error))
                return 1
            except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
                logger.error("failed to download '{}': {}".format(col_loc, error))
                return 1
            except arvados.errors.ArgumentError as error:
                if 'Argument to CollectionReader' in str(error):
                    logger.error("error reading collection: {}".format(error))
                    return 1
                else:
                    raise
        return 0

    try:
        reader = arvados.CollectionReader(
            col_loc, api_client=api_client, num_retries=args.retries,
            keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024), num_prefetch_threads=args.threads))
    except Exception as error:
        logger.error("failed to read collection: {}".format(error))
        return 1

    # Scan the collection. Make an array of (stream, file, local
    # destination filename) tuples, and add up total size to extract.
    todo = []
    todo_bytes = 0
    try:
        if get_prefix == os.sep:
            item = reader
        else:
            item = reader.find('.' + get_prefix)

        if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
            # If the user asked for a file and we got a subcollection, error out.
            if get_prefix[-1] != os.sep:
                logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
                return 1
            # If the user asked stdout as a destination, error out.
            elif args.destination == '-':
                logger.error("cannot use 'stdout' as destination when downloading multiple files.")
                return 1
            # User asked for a subcollection, and that's what was found. Add up total size
            # to download.
            for s, f in files_in_collection(item):
                dest_path = os.path.join(
                    args.destination,
                    os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
                if (not (args.n or args.f or args.skip_existing) and
                    os.path.exists(dest_path)):
                    logger.error('Local file %s already exists.' % (dest_path,))
                    return 1
                todo += [(s, f, dest_path)]
                todo_bytes += f.size()
        elif isinstance(item, arvados.arvfile.ArvadosFile):
            todo += [(item.parent, item, args.destination)]
            todo_bytes += item.size()
        else:
            logger.error("'{}' not found.".format('.' + get_prefix))
            return 1
    except (IOError, arvados.errors.NotFoundError) as e:
        logger.error(e)
        return 1

    out_bytes = 0
    for s, f, outfilename in todo:
        outfile = None
        digestor = None
        if not args.n:
            if outfilename == "-":
                outfile = stdout
            else:
                if args.skip_existing and os.path.exists(outfilename):
                    logger.debug('Local file %s exists. Skipping.', outfilename)
                    continue
                elif not args.f and (os.path.isfile(outfilename) or
                                     os.path.isdir(outfilename)):
                    # Good thing we looked again: apparently this file wasn't
                    # here yet when we checked earlier.
                    logger.error('Local file %s already exists.' % (outfilename,))
                    return 1
                if args.r:
                    pathlib.Path(outfilename).parent.mkdir(parents=True, exist_ok=True)
                try:
                    outfile = open(outfilename, 'wb')
                except Exception as error:
                    logger.error('Open(%s) failed: %s' % (outfilename, error))
                    return 1
        if args.hash:
            digestor = hashlib.new(args.hash)
        try:
            with s.open(f.name, 'rb') as file_reader:
                for data in file_reader.readall():
                    if outfile:
                        outfile.write(data)
                    if digestor:
                        digestor.update(data)
                    out_bytes += len(data)
                    if args.progress:
                        # Report 100% instead of dividing by zero when
                        # the total size to download is zero bytes.
                        stderr.write('\r%d MiB / %d MiB %.1f%%' %
                                     (out_bytes >> 20,
                                      todo_bytes >> 20,
                                      (100.0 if todo_bytes == 0
                                       else 100.0*out_bytes/todo_bytes)))
                    elif args.batch_progress:
                        stderr.write('%s %d read %d total %d\n' %
                                     (sys.argv[0], os.getpid(),
                                      out_bytes, todo_bytes))
            if digestor:
                stderr.write("%s %s/%s\n"
                             % (digestor.hexdigest(), s.stream_name(), f.name))
        except KeyboardInterrupt:
            # Remove the partially-written file, but never stdin/stdout/stderr.
            if outfile and (outfile.fileno() > 2) and not outfile.closed:
                os.unlink(outfile.name)
            break
        finally:
            if outfile != None and outfile != stdout:
                outfile.close()

    if args.progress:
        stderr.write('\n')
    return 0
def files_in_collection(c):
    """Recursively yield (parent collection/stream, file) pairs for every
    file under collection c, files before subcollections, each level in
    case-insensitive alphabetical order."""
    # Sort first by file type, then alphabetically by file path.
    for i in sorted(list(c.keys()),
                    key=lambda k: (
                        isinstance(c[k], arvados.collection.Subcollection),
                        k.upper())):
        if isinstance(c[i], arvados.arvfile.ArvadosFile):
            yield (c, c[i])
        elif isinstance(c[i], arvados.collection.Subcollection):
            for s, f in files_in_collection(c[i]):
                yield (s, f)
def write_block_or_manifest(dest, src, api_client, args):
    """Write the data named by locator src to the binary file object dest.

    A signed block locator (contains a '+A' permission hint) is fetched
    directly from Keep; anything else is treated as a collection UUID or
    portable data hash whose manifest text is written instead.
    """
    if '+A' in src:
        # block locator
        kc = arvados.keep.KeepClient(api_client=api_client)
        dest.write(kc.get(src, num_retries=args.retries))
    else:
        # collection UUID or portable data hash
        reader = arvados.CollectionReader(
            src, api_client=api_client, num_retries=args.retries)
        dest.write(reader.manifest_text(strip=args.strip_manifest).encode())