2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
15 import arvados.commands._util as arv_cmd
16 import arvados.util as util
18 from arvados._version import __version__
# Module-level logger for this command, registered under the name
# 'arvados.arv-get'.
21 logger = logging.getLogger('arvados.arv-get')
# Command-line parser for the tool. It inherits the shared retry option
# from arv_cmd.retry_opt via `parents`.
# NOTE(review): this listing omits some lines of the original file (the
# embedded line numbers skip), so the delimiters around the multi-line
# help texts below are not visible here.
23 parser = argparse.ArgumentParser(
24 description='Copy data from Keep to a local file or pipe.',
25 parents=[arv_cmd.retry_opt])
26 parser.add_argument('--version', action='version',
27 version="%s %s" % (sys.argv[0], __version__),
28 help='Print version and exit.')
29 parser.add_argument('locator', type=str,
31 Collection locator, optionally with a file path or prefix.
33 parser.add_argument('destination', type=str, nargs='?', default='-',
35 Local file or directory where the data is to be written. Default: stdout.
37 group = parser.add_mutually_exclusive_group()
# Progress-reporting flags: --progress, --no-progress and --batch-progress
# are registered on the same mutually exclusive group, so at most one of
# them can be given.
38 group.add_argument('--progress', action='store_true',
40 Display human-readable progress on stderr (bytes and, if possible,
41 percentage of total data size). This is the default behavior when it
42 is not expected to interfere with the output: specifically, stderr is
43 a tty _and_ either stdout is not a tty, or output is being written to
44 named files rather than stdout.
46 group.add_argument('--no-progress', action='store_true',
48 Do not display human-readable progress on stderr.
50 group.add_argument('--batch-progress', action='store_true',
52 Display machine-readable progress on stderr (bytes and, if known,
55 group = parser.add_mutually_exclusive_group()
# Hash selection: --md5sum stores the constant 'md5' into the same 'hash'
# destination that --hash fills, so the two are alternatives.
56 group.add_argument('--hash',
58 Display the hash of each file as it is read from Keep, using the given
59 hash algorithm. Supported algorithms include md5, sha1, sha224,
60 sha256, sha384, and sha512.
62 group.add_argument('--md5sum', action='store_const',
63 dest='hash', const='md5',
65 Display the MD5 hash of each file as it is read from Keep.
67 parser.add_argument('-n', action='store_true',
69 Do not write any data -- just read from Keep, and report md5sums if
72 parser.add_argument('-r', action='store_true',
74 Retrieve all files in the specified collection/prefix. This is the
75 default behavior if the "locator" argument ends with a forward slash.
77 group = parser.add_mutually_exclusive_group()
# NOTE(review): in the visible lines, -f, -v, --skip-existing and
# --strip-manifest are all added to this one mutually exclusive group.
# Confirm that -v and --strip-manifest are really meant to exclude -f and
# --skip-existing.
78 group.add_argument('-f', action='store_true',
80 Overwrite existing files while writing. The default behavior is to
81 refuse to write *anything* if any of the output files already
82 exist. As a special case, -f is not needed to write to stdout.
84 group.add_argument('-v', action='count', default=0,
86 Once for verbose mode, twice for debug mode.
88 group.add_argument('--skip-existing', action='store_true',
90 Skip files that already exist. The default behavior is to refuse to
91 write *anything* if any files exist that would have to be
92 overwritten. This option causes even devices, sockets, and fifos to be
95 group.add_argument('--strip-manifest', action='store_true', default=False,
97 When getting a collection manifest, strip its access tokens before writing
101 parser.add_argument('--threads', type=int, metavar='N', default=4,
103 Set the number of download threads to be used. Take into account that
104 using lots of threads will increase the RAM requirements. Default is
106 On high latency installations, using a greater number will improve
# parse_arguments: parse `arguments` with the module-level `parser`, then
# normalize args.destination and decide the progress default.
#   arguments -- argument list handed to parser.parse_args()
#   stdout    -- output stream; only its isatty() is consulted in the
#                visible lines
#   stderr    -- error stream (its use is on lines not shown here)
# NOTE(review): this listing omits several lines of the function (the
# embedded line numbers skip), including some condition and body lines
# and the return statement.
110 def parse_arguments(arguments, stdout, stderr):
111 args = parser.parse_args(arguments)
# A locator that ends with os.sep gets special handling. The body line is
# not shown here; presumably it enables -r, as the -r help text
# describes -- confirm.
113 if args.locator[-1] == os.sep:
117 not (args.destination and
118 os.path.isdir(args.destination))):
119 parser.error('Destination is not a directory.')
# Without -r, a destination that is an existing directory (or that ends
# with a path separator) gets the basename of the locator appended, so
# the file is written inside that directory.
120 if not args.r and (os.path.isdir(args.destination) or
121 args.destination[-1] == os.path.sep):
122 args.destination = os.path.join(args.destination,
123 os.path.basename(args.locator))
124 logger.debug("Appended source file name to destination directory: %s",
# '/dev/stdout' is treated exactly like '-'.
127 if args.destination == '/dev/stdout':
128 args.destination = "-"
130 if args.destination == '-':
131 # Normally you have to use -f to write to a file (or device) that
132 # already exists, but "-" and "/dev/stdout" are common enough to
133 # merit a special exception.
136 args.destination = args.destination.rstrip(os.sep)
138 # Turn on --progress by default if stderr is a tty and output is
139 # either going to a named file, or going (via stdout) to something
141 if (not (args.batch_progress or args.no_progress)
143 and (args.destination != '-'
144 or not stdout.isatty())):
# main: entry point of the command. In the visible lines it parses the
# arguments, sets the log level, builds an API client if needed, and then
# either writes a block/manifest to the destination or scans the
# collection into a `todo` list of (stream, file, destination) tuples and
# copies each file while updating an optional digest and progress output.
#   arguments -- passed through to parse_arguments()
#   stdout    -- output stream; swapped for its binary `buffer` when it
#                is sys.stdout and has one
#   stderr    -- stream that receives progress and hash lines
# NOTE(review): many lines of this function are missing from this listing
# (the embedded line numbers skip), including try:/else: lines and the
# return statements, so exit codes are not documented here.
148 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
151 if stdout is sys.stdout and hasattr(stdout, 'buffer'):
152 # in Python 3, write to stdout as binary
153 stdout = stdout.buffer
155 args = parse_arguments(arguments, stdout, stderr)
# Each -v lowers the threshold by 10, i.e. one standard logging level:
# WARNING with no -v, INFO with one, DEBUG with two.
156 logger.setLevel(logging.WARNING - 10 * args.v)
158 request_id = arvados.util.new_request_id()
159 logger.info('X-Request-Id: '+request_id)
# Build an API client only when none is set yet. api_client is not a
# parameter of main(); presumably it is declared global on a line not
# shown here -- confirm.
161 if api_client is None:
162 api_client = arvados.api('v1', request_id=request_id)
# Split the locator at its first '/': group(2), when present, is the
# remainder starting with that '/', and becomes get_prefix.
164 r = re.search(r'^(.*?)(/.*)?$', args.locator)
166 get_prefix = r.group(2)
167 if args.r and not get_prefix:
170 # User asked to download the collection's manifest
# With O_CREAT, adding O_EXCL makes os.open() fail if the file already
# exists. The condition guarding the O_EXCL line is not visible here.
173 open_flags = os.O_CREAT | os.O_WRONLY
175 open_flags |= os.O_EXCL
177 if args.destination == "-":
178 write_block_or_manifest(
179 dest=stdout, src=col_loc,
180 api_client=api_client, args=args)
182 out_fd = os.open(args.destination, open_flags)
183 with os.fdopen(out_fd, 'wb') as out_file:
184 write_block_or_manifest(
185 dest=out_file, src=col_loc,
186 api_client=api_client, args=args)
187 except (IOError, OSError) as error:
188 logger.error("can't write to '{}': {}".format(args.destination, error))
190 except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
191 logger.error("failed to download '{}': {}".format(col_loc, error))
193 except arvados.errors.ArgumentError as error:
194 if 'Argument to CollectionReader' in str(error):
195 logger.error("error reading collection: {}".format(error))
# The Keep block cache is sized from the thread count:
# (threads + 1) * 64 * 1024 * 1024 bytes, i.e. 64 MiB per download thread
# plus one extra 64 MiB.
202 reader = arvados.CollectionReader(
203 col_loc, api_client=api_client, num_retries=args.retries,
204 keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
205 get_threads=args.threads)
206 except Exception as error:
207 logger.error("failed to read collection: {}".format(error))
210 # Scan the collection. Make an array of (stream, file, local
211 # destination filename) tuples, and add up total size to extract.
215 if get_prefix == os.sep:
218 item = reader.find('.' + get_prefix)
220 if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
221 # If the user asked for a file and we got a subcollection, error out.
222 if get_prefix[-1] != os.sep:
223 logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
225 # If the user asked stdout as a destination, error out.
226 elif args.destination == '-':
227 logger.error("cannot use 'stdout' as destination when downloading multiple files.")
229 # User asked for a subcollection, and that's what was found. Add up total size
231 for s, f in files_in_collection(item):
232 dest_path = os.path.join(
234 os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
# Unless -n, -f or --skip-existing was given, an already existing target
# path is reported as an error during this scan, before the copy loop.
235 if (not (args.n or args.f or args.skip_existing) and
236 os.path.exists(dest_path)):
237 logger.error('Local file %s already exists.' % (dest_path,))
239 todo += [(s, f, dest_path)]
240 todo_bytes += f.size()
241 elif isinstance(item, arvados.arvfile.ArvadosFile):
242 todo += [(item.parent, item, args.destination)]
243 todo_bytes += item.size()
245 logger.error("'{}' not found.".format('.' + get_prefix))
247 except (IOError, arvados.errors.NotFoundError) as e:
252 for s, f, outfilename in todo:
256 if outfilename == "-":
259 if args.skip_existing and os.path.exists(outfilename):
260 logger.debug('Local file %s exists. Skipping.', outfilename)
262 elif not args.f and (os.path.isfile(outfilename) or
263 os.path.isdir(outfilename)):
264 # Good thing we looked again: apparently this file wasn't
265 # here yet when we checked earlier.
266 logger.error('Local file %s already exists.' % (outfilename,))
269 arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
271 outfile = open(outfilename, 'wb')
272 except Exception as error:
273 logger.error('Open(%s) failed: %s' % (outfilename, error))
276 digestor = hashlib.new(args.hash)
278 with s.open(f.name, 'rb') as file_reader:
279 for data in file_reader.readall():
283 digestor.update(data)
284 out_bytes += len(data)
286 stderr.write('\r%d MiB / %d MiB %.1f%%' %
291 else 100.0*out_bytes/todo_bytes)))
292 elif args.batch_progress:
293 stderr.write('%s %d read %d total\n' %
294 (sys.argv[0], os.getpid(),
295 out_bytes, todo_bytes))
297 stderr.write("%s %s/%s\n"
298 % (digestor.hexdigest(), s.stream_name(), f.name))
299 except KeyboardInterrupt:
# On Ctrl-C, delete the partially written output file. The fileno() > 2
# test excludes the standard stdin/stdout/stderr descriptors (0, 1, 2).
300 if outfile and (outfile.fileno() > 2) and not outfile.closed:
301 os.unlink(outfile.name)
# NOTE(review): `outfile != None` would be more idiomatic as
# `outfile is not None`; left unchanged here.
304 if outfile != None and outfile != stdout:
# files_in_collection: walk collection `c` in sorted key order, handling
# ArvadosFile entries directly and recursing into Subcollection entries.
# main() consumes the result as (s, f) pairs, calling s.stream_name() on
# the first element and reading f.name on the second.
# NOTE(review): the lines under each branch and part of the sort key are
# missing from this listing (the embedded line numbers skip); presumably
# each branch yields a (collection, file) pair -- confirm against the
# full file.
311 def files_in_collection(c):
312 # Sort first by file type, then alphabetically by file path.
# The visible part of the sort key is a boolean that is True for
# Subcollection entries. False sorts before True, so non-subcollection
# entries come first.
313 for i in sorted(list(c.keys()),
315 isinstance(c[k], arvados.collection.Subcollection),
317 if isinstance(c[i], arvados.arvfile.ArvadosFile):
319 elif isinstance(c[i], arvados.collection.Subcollection):
320 for s, f in files_in_collection(c[i]):
# write_block_or_manifest: write to `dest` either the data returned by
# KeepClient.get(src) or the manifest text of the collection named by
# `src`, encoded to bytes.
#   dest       -- writable binary stream (main() passes stdout or a file
#                 opened with 'wb')
#   src        -- locator string
#   api_client -- API client handed to KeepClient / CollectionReader
#   args       -- parsed arguments; reads args.retries and
#                 args.strip_manifest
# NOTE(review): the condition that selects between the two branches is on
# a line missing from this listing (the embedded line numbers skip).
323 def write_block_or_manifest(dest, src, api_client, args):
326 kc = arvados.keep.KeepClient(api_client=api_client)
327 dest.write(kc.get(src, num_retries=args.retries))
329 # collection UUID or portable data hash
330 reader = arvados.CollectionReader(
331 src, api_client=api_client, num_retries=args.retries)
# manifest_text() returns text, so it is encoded before being written to
# the binary destination. strip=args.strip_manifest corresponds to the
# --strip-manifest option.
332 dest.write(reader.manifest_text(strip=args.strip_manifest).encode())