10111: Merge branch 'master' into 10111-cr-provenance-graph
[arvados.git] / sdk / python / arvados / commands / get.py
1 #!/usr/bin/env python
2
3 import argparse
4 import hashlib
5 import os
6 import re
7 import string
8 import sys
9 import logging
10
11 import arvados
12 import arvados.commands._util as arv_cmd
13 import arvados.util as util
14
15 from arvados._version import __version__
16
17 api_client = None
18 logger = logging.getLogger('arvados.arv-get')
19
20 parser = argparse.ArgumentParser(
21     description='Copy data from Keep to a local file or pipe.',
22     parents=[arv_cmd.retry_opt])
23 parser.add_argument('--version', action='version',
24                     version="%s %s" % (sys.argv[0], __version__),
25                     help='Print version and exit.')
26 parser.add_argument('locator', type=str,
27                     help="""
28 Collection locator, optionally with a file path or prefix.
29 """)
30 parser.add_argument('destination', type=str, nargs='?', default='-',
31                     help="""
32 Local file or directory where the data is to be written. Default: stdout.
33 """)
34 group = parser.add_mutually_exclusive_group()
35 group.add_argument('--progress', action='store_true',
36                    help="""
37 Display human-readable progress on stderr (bytes and, if possible,
38 percentage of total data size). This is the default behavior when it
39 is not expected to interfere with the output: specifically, stderr is
40 a tty _and_ either stdout is not a tty, or output is being written to
41 named files rather than stdout.
42 """)
43 group.add_argument('--no-progress', action='store_true',
44                    help="""
45 Do not display human-readable progress on stderr.
46 """)
47 group.add_argument('--batch-progress', action='store_true',
48                    help="""
49 Display machine-readable progress on stderr (bytes and, if known,
50 total data size).
51 """)
52 group = parser.add_mutually_exclusive_group()
53 group.add_argument('--hash',
54                     help="""
55 Display the hash of each file as it is read from Keep, using the given
56 hash algorithm. Supported algorithms include md5, sha1, sha224,
57 sha256, sha384, and sha512.
58 """)
59 group.add_argument('--md5sum', action='store_const',
60                     dest='hash', const='md5',
61                     help="""
62 Display the MD5 hash of each file as it is read from Keep.
63 """)
64 parser.add_argument('-n', action='store_true',
65                     help="""
66 Do not write any data -- just read from Keep, and report md5sums if
67 requested.
68 """)
69 parser.add_argument('-r', action='store_true',
70                     help="""
71 Retrieve all files in the specified collection/prefix. This is the
72 default behavior if the "locator" argument ends with a forward slash.
73 """)
74 group = parser.add_mutually_exclusive_group()
75 group.add_argument('-f', action='store_true',
76                    help="""
77 Overwrite existing files while writing. The default behavior is to
78 refuse to write *anything* if any of the output files already
79 exist. As a special case, -f is not needed to write to stdout.
80 """)
81 group.add_argument('--skip-existing', action='store_true',
82                    help="""
83 Skip files that already exist. The default behavior is to refuse to
84 write *anything* if any files exist that would have to be
85 overwritten. This option causes even devices, sockets, and fifos to be
86 skipped.
87 """)
88 group.add_argument('--strip-manifest', action='store_true', default=False,
89                    help="""
90 When getting a collection manifest, strip its access tokens before writing
91 it.
92 """)
93
94 def parse_arguments(arguments, stdout, stderr):
95     args = parser.parse_args(arguments)
96
97     if args.locator[-1] == os.sep:
98         args.r = True
99     if (args.r and
100         not args.n and
101         not (args.destination and
102              os.path.isdir(args.destination))):
103         parser.error('Destination is not a directory.')
104     if not args.r and (os.path.isdir(args.destination) or
105                        args.destination[-1] == os.path.sep):
106         args.destination = os.path.join(args.destination,
107                                         os.path.basename(args.locator))
108         logger.debug("Appended source file name to destination directory: %s",
109                      args.destination)
110
111     if args.destination == '/dev/stdout':
112         args.destination = "-"
113
114     if args.destination == '-':
115         # Normally you have to use -f to write to a file (or device) that
116         # already exists, but "-" and "/dev/stdout" are common enough to
117         # merit a special exception.
118         args.f = True
119     else:
120         args.destination = args.destination.rstrip(os.sep)
121
122     # Turn on --progress by default if stderr is a tty and output is
123     # either going to a named file, or going (via stdout) to something
124     # that isn't a tty.
125     if (not (args.batch_progress or args.no_progress)
126         and stderr.isatty()
127         and (args.destination != '-'
128              or not stdout.isatty())):
129         args.progress = True
130     return args
131
132 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
133     global api_client
134     
135     args = parse_arguments(arguments, stdout, stderr)
136     if api_client is None:
137         api_client = arvados.api('v1')
138
139     r = re.search(r'^(.*?)(/.*)?$', args.locator)
140     col_loc = r.group(1)
141     get_prefix = r.group(2)
142     if args.r and not get_prefix:
143         get_prefix = os.sep
144     try:
145         reader = arvados.CollectionReader(col_loc, num_retries=args.retries)
146     except Exception as error:
147         logger.error("failed to read collection: {}".format(error))
148         return 1
149
150     # User asked to download the collection's manifest
151     if not get_prefix:
152         if not args.n:
153             open_flags = os.O_CREAT | os.O_WRONLY
154             if not args.f:
155                 open_flags |= os.O_EXCL
156             try:
157                 if args.destination == "-":
158                     stdout.write(reader.manifest_text(strip=args.strip_manifest))
159                 else:
160                     out_fd = os.open(args.destination, open_flags)
161                     with os.fdopen(out_fd, 'wb') as out_file:
162                         out_file.write(reader.manifest_text(strip=args.strip_manifest))
163             except (IOError, OSError) as error:
164                 logger.error("can't write to '{}': {}".format(args.destination, error))
165                 return 1
166             except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
167                 logger.error("failed to download '{}': {}".format(col_loc, error))
168                 return 1
169         return 0
170
171     # Scan the collection. Make an array of (stream, file, local
172     # destination filename) tuples, and add up total size to extract.
173     todo = []
174     todo_bytes = 0
175     try:
176         if get_prefix == os.sep:
177             item = reader
178         else:
179             item = reader.find('.' + get_prefix)
180
181         if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
182             # If the user asked for a file and we got a subcollection, error out.
183             if get_prefix[-1] != os.sep:
184                 logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
185                 return 1
186             # If the user asked stdout as a destination, error out.
187             elif args.destination == '-':
188                 logger.error("cannot use 'stdout' as destination when downloading multiple files.")
189                 return 1
190             # User asked for a subcollection, and that's what was found. Add up total size
191             # to download.
192             for s, f in files_in_collection(item):
193                 dest_path = os.path.join(
194                     args.destination,
195                     os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
196                 if (not (args.n or args.f or args.skip_existing) and
197                     os.path.exists(dest_path)):
198                     logger.error('Local file %s already exists.' % (dest_path,))
199                     return 1
200                 todo += [(s, f, dest_path)]
201                 todo_bytes += f.size()
202         elif isinstance(item, arvados.arvfile.ArvadosFile):
203             todo += [(item.parent, item, args.destination)]
204             todo_bytes += item.size()
205         else:
206             logger.error("'{}' not found.".format('.' + get_prefix))
207             return 1
208     except (IOError, arvados.errors.NotFoundError) as e:
209         logger.error(e)
210         return 1
211
212     out_bytes = 0
213     for s, f, outfilename in todo:
214         outfile = None
215         digestor = None
216         if not args.n:
217             if outfilename == "-":
218                 outfile = stdout
219             else:
220                 if args.skip_existing and os.path.exists(outfilename):
221                     logger.debug('Local file %s exists. Skipping.', outfilename)
222                     continue
223                 elif not args.f and (os.path.isfile(outfilename) or
224                                    os.path.isdir(outfilename)):
225                     # Good thing we looked again: apparently this file wasn't
226                     # here yet when we checked earlier.
227                     logger.error('Local file %s already exists.' % (outfilename,))
228                     return 1
229                 if args.r:
230                     arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
231                 try:
232                     outfile = open(outfilename, 'wb')
233                 except Exception as error:
234                     logger.error('Open(%s) failed: %s' % (outfilename, error))
235                     return 1
236         if args.hash:
237             digestor = hashlib.new(args.hash)
238         try:
239             with s.open(f.name, 'r') as file_reader:
240                 for data in file_reader.readall():
241                     if outfile:
242                         outfile.write(data)
243                     if digestor:
244                         digestor.update(data)
245                     out_bytes += len(data)
246                     if args.progress:
247                         stderr.write('\r%d MiB / %d MiB %.1f%%' %
248                                      (out_bytes >> 20,
249                                       todo_bytes >> 20,
250                                       (100
251                                        if todo_bytes==0
252                                        else 100.0*out_bytes/todo_bytes)))
253                     elif args.batch_progress:
254                         stderr.write('%s %d read %d total\n' %
255                                      (sys.argv[0], os.getpid(),
256                                       out_bytes, todo_bytes))
257             if digestor:
258                 stderr.write("%s  %s/%s\n"
259                              % (digestor.hexdigest(), s.stream_name(), f.name))
260         except KeyboardInterrupt:
261             if outfile and (outfile.fileno() > 2) and not outfile.closed:
262                 os.unlink(outfile.name)
263             break
264         finally:
265             if outfile != None and outfile != stdout:
266                 outfile.close()
267
268     if args.progress:
269         stderr.write('\n')
270     return 0
271
272 def files_in_collection(c):
273     # Sort first by file type, then alphabetically by file path.
274     for i in sorted(c.keys(),
275                     key=lambda k: (
276                         isinstance(c[k], arvados.collection.Subcollection),
277                         k.upper())):
278         if isinstance(c[i], arvados.arvfile.ArvadosFile):
279             yield (c, c[i])
280         elif isinstance(c[i], arvados.collection.Subcollection):
281             for s, f in files_in_collection(c[i]):
282                 yield (s, f)