Merge branch '11209-crunch-unmount-all'
[arvados.git] / sdk / python / arvados / commands / get.py
1 #!/usr/bin/env python
2
3 import argparse
4 import hashlib
5 import os
6 import re
7 import string
8 import sys
9 import logging
10
11 import arvados
12 import arvados.commands._util as arv_cmd
13 import arvados.util as util
14
15 from arvados._version import __version__
16
17 api_client = None
18 logger = logging.getLogger('arvados.arv-get')
19
20 parser = argparse.ArgumentParser(
21     description='Copy data from Keep to a local file or pipe.',
22     parents=[arv_cmd.retry_opt])
23 parser.add_argument('--version', action='version',
24                     version="%s %s" % (sys.argv[0], __version__),
25                     help='Print version and exit.')
26 parser.add_argument('locator', type=str,
27                     help="""
28 Collection locator, optionally with a file path or prefix.
29 """)
30 parser.add_argument('destination', type=str, nargs='?', default='-',
31                     help="""
32 Local file or directory where the data is to be written. Default: stdout.
33 """)
34 group = parser.add_mutually_exclusive_group()
35 group.add_argument('--progress', action='store_true',
36                    help="""
37 Display human-readable progress on stderr (bytes and, if possible,
38 percentage of total data size). This is the default behavior when it
39 is not expected to interfere with the output: specifically, stderr is
40 a tty _and_ either stdout is not a tty, or output is being written to
41 named files rather than stdout.
42 """)
43 group.add_argument('--no-progress', action='store_true',
44                    help="""
45 Do not display human-readable progress on stderr.
46 """)
47 group.add_argument('--batch-progress', action='store_true',
48                    help="""
49 Display machine-readable progress on stderr (bytes and, if known,
50 total data size).
51 """)
52 group = parser.add_mutually_exclusive_group()
53 group.add_argument('--hash',
54                     help="""
55 Display the hash of each file as it is read from Keep, using the given
56 hash algorithm. Supported algorithms include md5, sha1, sha224,
57 sha256, sha384, and sha512.
58 """)
59 group.add_argument('--md5sum', action='store_const',
60                     dest='hash', const='md5',
61                     help="""
62 Display the MD5 hash of each file as it is read from Keep.
63 """)
64 parser.add_argument('-n', action='store_true',
65                     help="""
66 Do not write any data -- just read from Keep, and report md5sums if
67 requested.
68 """)
69 parser.add_argument('-r', action='store_true',
70                     help="""
71 Retrieve all files in the specified collection/prefix. This is the
72 default behavior if the "locator" argument ends with a forward slash.
73 """)
74 group = parser.add_mutually_exclusive_group()
75 group.add_argument('-f', action='store_true',
76                    help="""
77 Overwrite existing files while writing. The default behavior is to
78 refuse to write *anything* if any of the output files already
79 exist. As a special case, -f is not needed to write to stdout.
80 """)
81 group.add_argument('--skip-existing', action='store_true',
82                    help="""
83 Skip files that already exist. The default behavior is to refuse to
84 write *anything* if any files exist that would have to be
85 overwritten. This option causes even devices, sockets, and fifos to be
86 skipped.
87 """)
88
89 def parse_arguments(arguments, stdout, stderr):
90     args = parser.parse_args(arguments)
91
92     if args.locator[-1] == os.sep:
93         args.r = True
94     if (args.r and
95         not args.n and
96         not (args.destination and
97              os.path.isdir(args.destination))):
98         parser.error('Destination is not a directory.')
99     if not args.r and (os.path.isdir(args.destination) or
100                        args.destination[-1] == os.path.sep):
101         args.destination = os.path.join(args.destination,
102                                         os.path.basename(args.locator))
103         logger.debug("Appended source file name to destination directory: %s",
104                      args.destination)
105
106     if args.destination == '/dev/stdout':
107         args.destination = "-"
108
109     if args.destination == '-':
110         # Normally you have to use -f to write to a file (or device) that
111         # already exists, but "-" and "/dev/stdout" are common enough to
112         # merit a special exception.
113         args.f = True
114     else:
115         args.destination = args.destination.rstrip(os.sep)
116
117     # Turn on --progress by default if stderr is a tty and output is
118     # either going to a named file, or going (via stdout) to something
119     # that isn't a tty.
120     if (not (args.batch_progress or args.no_progress)
121         and stderr.isatty()
122         and (args.destination != '-'
123              or not stdout.isatty())):
124         args.progress = True
125     return args
126
127 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
128     global api_client
129     
130     args = parse_arguments(arguments, stdout, stderr)
131     if api_client is None:
132         api_client = arvados.api('v1')
133
134     r = re.search(r'^(.*?)(/.*)?$', args.locator)
135     col_loc = r.group(1)
136     get_prefix = r.group(2)
137     if args.r and not get_prefix:
138         get_prefix = os.sep
139     try:
140         reader = arvados.CollectionReader(col_loc, num_retries=args.retries)
141     except Exception as error:
142         logger.error("failed to read collection: {}".format(error))
143         return 1
144
145     # User asked to download the collection's manifest
146     should_strip_manifest = False
147     if re.match(util.keep_locator_pattern, col_loc):
148         should_strip_manifest = True
149     if not get_prefix:
150         if not args.n:
151             open_flags = os.O_CREAT | os.O_WRONLY
152             if not args.f:
153                 open_flags |= os.O_EXCL
154             try:
155                 if args.destination == "-":
156                     stdout.write(reader.manifest_text(strip=should_strip_manifest))
157                 else:
158                     out_fd = os.open(args.destination, open_flags)
159                     with os.fdopen(out_fd, 'wb') as out_file:
160                         out_file.write(reader.manifest_text(strip=should_strip_manifest))
161             except (IOError, OSError) as error:
162                 logger.error("can't write to '{}': {}".format(args.destination, error))
163                 return 1
164             except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
165                 logger.error("failed to download '{}': {}".format(col_loc, error))
166                 return 1
167         return 0
168
169     # Scan the collection. Make an array of (stream, file, local
170     # destination filename) tuples, and add up total size to extract.
171     todo = []
172     todo_bytes = 0
173     try:
174         if get_prefix == os.sep:
175             item = reader
176         else:
177             item = reader.find('.' + get_prefix)
178
179         if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
180             # If the user asked for a file and we got a subcollection, error out.
181             if get_prefix[-1] != os.sep:
182                 logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
183                 return 1
184             # If the user asked stdout as a destination, error out.
185             elif args.destination == '-':
186                 logger.error("cannot use 'stdout' as destination when downloading multiple files.")
187                 return 1
188             # User asked for a subcollection, and that's what was found. Add up total size
189             # to download.
190             for s, f in files_in_collection(item):
191                 dest_path = os.path.join(
192                     args.destination,
193                     os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
194                 if (not (args.n or args.f or args.skip_existing) and
195                     os.path.exists(dest_path)):
196                     logger.error('Local file %s already exists.' % (dest_path,))
197                     return 1
198                 todo += [(s, f, dest_path)]
199                 todo_bytes += f.size()
200         elif isinstance(item, arvados.arvfile.ArvadosFile):
201             todo += [(item.parent, item, args.destination)]
202             todo_bytes += item.size()
203         else:
204             logger.error("'{}' not found.".format('.' + get_prefix))
205             return 1
206     except (IOError, arvados.errors.NotFoundError) as e:
207         logger.error(e)
208         return 1
209
210     out_bytes = 0
211     for s, f, outfilename in todo:
212         outfile = None
213         digestor = None
214         if not args.n:
215             if outfilename == "-":
216                 outfile = stdout
217             else:
218                 if args.skip_existing and os.path.exists(outfilename):
219                     logger.debug('Local file %s exists. Skipping.', outfilename)
220                     continue
221                 elif not args.f and (os.path.isfile(outfilename) or
222                                    os.path.isdir(outfilename)):
223                     # Good thing we looked again: apparently this file wasn't
224                     # here yet when we checked earlier.
225                     logger.error('Local file %s already exists.' % (outfilename,))
226                     return 1
227                 if args.r:
228                     arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
229                 try:
230                     outfile = open(outfilename, 'wb')
231                 except Exception as error:
232                     logger.error('Open(%s) failed: %s' % (outfilename, error))
233                     return 1
234         if args.hash:
235             digestor = hashlib.new(args.hash)
236         try:
237             with s.open(f.name, 'r') as file_reader:
238                 for data in file_reader.readall():
239                     if outfile:
240                         outfile.write(data)
241                     if digestor:
242                         digestor.update(data)
243                     out_bytes += len(data)
244                     if args.progress:
245                         stderr.write('\r%d MiB / %d MiB %.1f%%' %
246                                      (out_bytes >> 20,
247                                       todo_bytes >> 20,
248                                       (100
249                                        if todo_bytes==0
250                                        else 100.0*out_bytes/todo_bytes)))
251                     elif args.batch_progress:
252                         stderr.write('%s %d read %d total\n' %
253                                      (sys.argv[0], os.getpid(),
254                                       out_bytes, todo_bytes))
255             if digestor:
256                 stderr.write("%s  %s/%s\n"
257                              % (digestor.hexdigest(), s.stream_name(), f.name))
258         except KeyboardInterrupt:
259             if outfile and (outfile.fileno() > 2) and not outfile.closed:
260                 os.unlink(outfile.name)
261             break
262         finally:
263             if outfile != None and outfile != stdout:
264                 outfile.close()
265
266     if args.progress:
267         stderr.write('\n')
268     return 0
269
270 def files_in_collection(c):
271     # Sort first by file type, then alphabetically by file path.
272     for i in sorted(c.keys(),
273                     key=lambda k: (
274                         isinstance(c[k], arvados.collection.Subcollection),
275                         k.upper())):
276         if isinstance(c[i], arvados.arvfile.ArvadosFile):
277             yield (c, c[i])
278         elif isinstance(c[i], arvados.collection.Subcollection):
279             for s, f in files_in_collection(c[i]):
280                 yield (s, f)