# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import argparse
import hashlib
import logging
import os
import re
import sys

import arvados
import arvados.commands._util as arv_cmd
import arvados.util as util

from arvados._version import __version__
logger = logging.getLogger('arvados.arv-get')

# Command-line interface for arv-get. Help strings below were truncated in
# this copy of the file (the help=""" openers and closing quotes were lost);
# reconstructed here so the module parses again.
parser = argparse.ArgumentParser(
    description='Copy data from Keep to a local file or pipe.',
    parents=[arv_cmd.retry_opt])
parser.add_argument('--version', action='version',
                    version="%s %s" % (sys.argv[0], __version__),
                    help='Print version and exit.')
parser.add_argument('locator', type=str,
                    help="""
Collection locator, optionally with a file path or prefix.
""")
parser.add_argument('destination', type=str, nargs='?', default='-',
                    help="""
Local file or directory where the data is to be written. Default: stdout.
""")
group = parser.add_mutually_exclusive_group()
group.add_argument('--progress', action='store_true',
                   help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when it
is not expected to interfere with the output: specifically, stderr is
a tty _and_ either stdout is not a tty, or output is being written to
named files rather than stdout.
""")
group.add_argument('--no-progress', action='store_true',
                   help="""
Do not display human-readable progress on stderr.
""")
group.add_argument('--batch-progress', action='store_true',
                   help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")
group = parser.add_mutually_exclusive_group()
group.add_argument('--hash',
                   help="""
Display the hash of each file as it is read from Keep, using the given
hash algorithm. Supported algorithms include md5, sha1, sha224,
sha256, sha384, and sha512.
""")
group.add_argument('--md5sum', action='store_const',
                   dest='hash', const='md5',
                   help="""
Display the MD5 hash of each file as it is read from Keep.
""")
parser.add_argument('-n', action='store_true',
                    help="""
Do not write any data -- just read from Keep, and report md5sums if
requested.
""")
parser.add_argument('-r', action='store_true',
                    help="""
Retrieve all files in the specified collection/prefix. This is the
default behavior if the "locator" argument ends with a forward slash.
""")
group = parser.add_mutually_exclusive_group()
group.add_argument('-f', action='store_true',
                   help="""
Overwrite existing files while writing. The default behavior is to
refuse to write *anything* if any of the output files already
exist. As a special case, -f is not needed to write to stdout.
""")
group.add_argument('-v', action='count', default=0,
                   help="""
Once for verbose mode, twice for debug mode.
""")
group.add_argument('--skip-existing', action='store_true',
                   help="""
Skip files that already exist. The default behavior is to refuse to
write *anything* if any files exist that would have to be
overwritten. This option causes even devices, sockets, and fifos to be
skipped.
""")
group.add_argument('--strip-manifest', action='store_true', default=False,
                   help="""
When getting a collection manifest, strip its access tokens before writing
it.
""")

parser.add_argument('--threads', type=int, metavar='N', default=4,
                    help="""
Set the number of download threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 4 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
def parse_arguments(arguments, stdout, stderr):
    """Parse arv-get command-line arguments and normalize them.

    Normalizations applied:
    - a trailing '/' on the locator implies -r (recursive);
    - when not recursive and the destination is a directory, the source
      file name is appended to the destination path;
    - '/dev/stdout' is canonicalized to '-', and writing to stdout
      implies -f (no overwrite check is possible on a pipe);
    - --progress defaults on when stderr is a tty and output is going
      to a named file or a non-tty stdout.

    Calls parser.error() (which exits) if -r is given without a valid
    destination directory.  Returns the argparse Namespace.
    """
    args = parser.parse_args(arguments)

    if args.locator[-1] == os.sep:
        args.r = True
    if (args.r and
        not args.n and
        not (args.destination and
             os.path.isdir(args.destination))):
        parser.error('Destination is not a directory.')
    if not args.r and (os.path.isdir(args.destination) or
                       args.destination[-1] == os.path.sep):
        args.destination = os.path.join(args.destination,
                                        os.path.basename(args.locator))
        logger.debug("Appended source file name to destination directory: %s",
                     args.destination)

    if args.destination == '/dev/stdout':
        args.destination = "-"

    if args.destination == '-':
        # Normally you have to use -f to write to a file (or device) that
        # already exists, but "-" and "/dev/stdout" are common enough to
        # merit a special exception.
        args.f = True
    else:
        args.destination = args.destination.rstrip(os.sep)

    # Turn on --progress by default if stderr is a tty and output is
    # either going to a named file, or going (via stdout) to something
    # that isn't a tty.
    if (not (args.batch_progress or args.no_progress)
        and stderr.isatty()
        and (args.destination != '-'
             or not stdout.isatty())):
        args.progress = True
    return args
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Entry point for arv-get: copy data from Keep to a local file or pipe.

    Returns 0 on success, 1 on error (suitable as a process exit code).
    Several interior lines of this function were lost in this copy of the
    file (branch openers, initializers, return statements); they are
    reconstructed here to match the surrounding logic — verify against the
    canonical arvados source.
    """
    if stdout is sys.stdout and hasattr(stdout, 'buffer'):
        # in Python 3, write to stdout as binary
        stdout = stdout.buffer

    args = parse_arguments(arguments, stdout, stderr)
    # -v raises verbosity: WARNING by default, INFO with -v, DEBUG with -vv.
    logger.setLevel(logging.WARNING - 10 * args.v)

    request_id = arvados.util.new_request_id()
    logger.info('X-Request-Id: '+request_id)

    api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)

    # Split the locator into collection part and optional file path/prefix.
    r = re.search(r'^(.*?)(/.*)?$', args.locator)
    col_loc = r.group(1)
    get_prefix = r.group(2)
    if args.r and not get_prefix:
        get_prefix = os.sep

    # User asked to download the collection's manifest
    if not get_prefix:
        if not args.n:
            open_flags = os.O_CREAT | os.O_WRONLY
            if not args.f:
                # Without -f, refuse to clobber an existing file.
                open_flags |= os.O_EXCL
            try:
                if args.destination == "-":
                    write_block_or_manifest(
                        dest=stdout, src=col_loc,
                        api_client=api_client, args=args)
                else:
                    out_fd = os.open(args.destination, open_flags)
                    with os.fdopen(out_fd, 'wb') as out_file:
                        write_block_or_manifest(
                            dest=out_file, src=col_loc,
                            api_client=api_client, args=args)
            except (IOError, OSError) as error:
                logger.error("can't write to '{}': {}".format(args.destination, error))
                return 1
            except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
                logger.error("failed to download '{}': {}".format(col_loc, error))
                return 1
            except arvados.errors.ArgumentError as error:
                if 'Argument to CollectionReader' in str(error):
                    logger.error("error reading collection: {}".format(error))
                    return 1
                else:
                    raise
        return 0

    try:
        # Block cache sized to (threads+1)*64 MiB so prefetch threads each
        # have headroom; more threads -> more RAM.
        reader = arvados.CollectionReader(
            col_loc, api_client=api_client, num_retries=args.retries,
            keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024), num_prefetch_threads=args.threads))
    except Exception as error:
        logger.error("failed to read collection: {}".format(error))
        return 1

    # Scan the collection. Make an array of (stream, file, local
    # destination filename) tuples, and add up total size to extract.
    todo = []
    todo_bytes = 0
    try:
        if get_prefix == os.sep:
            item = reader
        else:
            item = reader.find('.' + get_prefix)

        if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
            # If the user asked for a file and we got a subcollection, error out.
            if get_prefix[-1] != os.sep:
                logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
                return 1
            # If the user asked stdout as a destination, error out.
            elif args.destination == '-':
                logger.error("cannot use 'stdout' as destination when downloading multiple files.")
                return 1
            # User asked for a subcollection, and that's what was found. Add up total size
            # to download.
            for s, f in files_in_collection(item):
                dest_path = os.path.join(
                    args.destination,
                    os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
                if (not (args.n or args.f or args.skip_existing) and
                    os.path.exists(dest_path)):
                    logger.error('Local file %s already exists.' % (dest_path,))
                    return 1
                todo += [(s, f, dest_path)]
                todo_bytes += f.size()
        elif isinstance(item, arvados.arvfile.ArvadosFile):
            todo += [(item.parent, item, args.destination)]
            todo_bytes += item.size()
        else:
            logger.error("'{}' not found.".format('.' + get_prefix))
            return 1
    except (IOError, arvados.errors.NotFoundError) as e:
        logger.error(e)
        return 1

    out_bytes = 0
    for s, f, outfilename in todo:
        outfile = None
        digestor = None
        if not args.n:
            if outfilename == "-":
                outfile = stdout
            else:
                if args.skip_existing and os.path.exists(outfilename):
                    logger.debug('Local file %s exists. Skipping.', outfilename)
                    continue
                elif not args.f and (os.path.isfile(outfilename) or
                                     os.path.isdir(outfilename)):
                    # Good thing we looked again: apparently this file wasn't
                    # here yet when we checked earlier.
                    logger.error('Local file %s already exists.' % (outfilename,))
                    return 1
                if args.r:
                    arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
                try:
                    outfile = open(outfilename, 'wb')
                except Exception as error:
                    logger.error('Open(%s) failed: %s' % (outfilename, error))
                    return 1
        if args.hash:
            digestor = hashlib.new(args.hash)
        try:
            with s.open(f.name, 'rb') as file_reader:
                for data in file_reader.readall():
                    if outfile:
                        outfile.write(data)
                    if digestor:
                        digestor.update(data)
                    out_bytes += len(data)
                    if args.progress:
                        stderr.write('\r%d MiB / %d MiB %.1f%%' %
                                     (out_bytes >> 20,
                                      todo_bytes >> 20,
                                      (100
                                       if todo_bytes == 0
                                       else 100.0*out_bytes/todo_bytes)))
                    elif args.batch_progress:
                        stderr.write('%s %d read %d total %d\n' %
                                     (sys.argv[0], os.getpid(),
                                      out_bytes, todo_bytes))
            if digestor:
                stderr.write("%s %s/%s\n"
                             % (digestor.hexdigest(), s.stream_name(), f.name))
        except KeyboardInterrupt:
            # Remove the partial file, but never a pre-existing stream
            # (fd <= 2 means stdout/stderr).
            if outfile and (outfile.fileno() > 2) and not outfile.closed:
                os.unlink(outfile.name)
            break
        finally:
            if outfile != None and outfile != stdout:
                outfile.close()

    if args.progress:
        stderr.write('\n')
    return 0
def files_in_collection(c):
    """Recursively yield (stream, file) pairs for every file under c.

    The stream element is the containing (sub)collection, suitable for
    calling stream.open(file.name).
    """
    # Sort first by file type, then alphabetically by file path.
    # (files sort before subcollections because False < True)
    for i in sorted(list(c.keys()),
                    key=lambda k: (
                        isinstance(c[k], arvados.collection.Subcollection),
                        k.upper())):
        if isinstance(c[i], arvados.arvfile.ArvadosFile):
            yield (c, c[i])
        elif isinstance(c[i], arvados.collection.Subcollection):
            for s, f in files_in_collection(c[i]):
                yield (s, f)
def write_block_or_manifest(dest, src, api_client, args):
    """Write the data identified by src to the binary file object dest.

    src may be a single Keep block locator (detected by its '+A'
    permission hint) or a collection UUID / portable data hash, in which
    case the collection's manifest text is written (stripped of access
    tokens when --strip-manifest was given).
    """
    if '+A' in src:
        # block locator
        kc = arvados.keep.KeepClient(api_client=api_client)
        dest.write(kc.get(src, num_retries=args.retries))
    else:
        # collection UUID or portable data hash
        reader = arvados.CollectionReader(
            src, api_client=api_client, num_retries=args.retries)
        dest.write(reader.manifest_text(strip=args.strip_manifest).encode())