Merge branch '8465-cwl-containers-stdin-stderr' closes #8465
[arvados.git] / sdk / python / arvados / commands / get.py
1 #!/usr/bin/env python
2
3 import argparse
4 import hashlib
5 import os
6 import re
7 import string
8 import sys
9 import logging
10
11 import arvados
12 import arvados.commands._util as arv_cmd
13
14 from arvados._version import __version__
15
16 api_client = None
17 logger = logging.getLogger('arvados.arv-get')
18
19 parser = argparse.ArgumentParser(
20     description='Copy data from Keep to a local file or pipe.',
21     parents=[arv_cmd.retry_opt])
22 parser.add_argument('--version', action='version',
23                     version="%s %s" % (sys.argv[0], __version__),
24                     help='Print version and exit.')
25 parser.add_argument('locator', type=str,
26                     help="""
27 Collection locator, optionally with a file path or prefix.
28 """)
29 parser.add_argument('destination', type=str, nargs='?', default='-',
30                     help="""
31 Local file or directory where the data is to be written. Default: stdout.
32 """)
33 group = parser.add_mutually_exclusive_group()
34 group.add_argument('--progress', action='store_true',
35                    help="""
36 Display human-readable progress on stderr (bytes and, if possible,
37 percentage of total data size). This is the default behavior when it
38 is not expected to interfere with the output: specifically, stderr is
39 a tty _and_ either stdout is not a tty, or output is being written to
40 named files rather than stdout.
41 """)
42 group.add_argument('--no-progress', action='store_true',
43                    help="""
44 Do not display human-readable progress on stderr.
45 """)
46 group.add_argument('--batch-progress', action='store_true',
47                    help="""
48 Display machine-readable progress on stderr (bytes and, if known,
49 total data size).
50 """)
51 group = parser.add_mutually_exclusive_group()
52 group.add_argument('--hash',
53                     help="""
54 Display the hash of each file as it is read from Keep, using the given
55 hash algorithm. Supported algorithms include md5, sha1, sha224,
56 sha256, sha384, and sha512.
57 """)
58 group.add_argument('--md5sum', action='store_const',
59                     dest='hash', const='md5',
60                     help="""
61 Display the MD5 hash of each file as it is read from Keep.
62 """)
63 parser.add_argument('-n', action='store_true',
64                     help="""
65 Do not write any data -- just read from Keep, and report md5sums if
66 requested.
67 """)
68 parser.add_argument('-r', action='store_true',
69                     help="""
70 Retrieve all files in the specified collection/prefix. This is the
71 default behavior if the "locator" argument ends with a forward slash.
72 """)
73 group = parser.add_mutually_exclusive_group()
74 group.add_argument('-f', action='store_true',
75                    help="""
76 Overwrite existing files while writing. The default behavior is to
77 refuse to write *anything* if any of the output files already
78 exist. As a special case, -f is not needed to write to stdout.
79 """)
80 group.add_argument('--skip-existing', action='store_true',
81                    help="""
82 Skip files that already exist. The default behavior is to refuse to
83 write *anything* if any files exist that would have to be
84 overwritten. This option causes even devices, sockets, and fifos to be
85 skipped.
86 """)
87
88 def parse_arguments(arguments, stdout, stderr):
89     args = parser.parse_args(arguments)
90
91     if args.locator[-1] == os.sep:
92         args.r = True
93     if (args.r and
94         not args.n and
95         not (args.destination and
96              os.path.isdir(args.destination))):
97         parser.error('Destination is not a directory.')
98     if not args.r and (os.path.isdir(args.destination) or
99                        args.destination[-1] == os.path.sep):
100         args.destination = os.path.join(args.destination,
101                                         os.path.basename(args.locator))
102         logger.debug("Appended source file name to destination directory: %s",
103                      args.destination)
104
105     if args.destination == '/dev/stdout':
106         args.destination = "-"
107
108     if args.destination == '-':
109         # Normally you have to use -f to write to a file (or device) that
110         # already exists, but "-" and "/dev/stdout" are common enough to
111         # merit a special exception.
112         args.f = True
113     else:
114         args.destination = args.destination.rstrip(os.sep)
115
116     # Turn on --progress by default if stderr is a tty and output is
117     # either going to a named file, or going (via stdout) to something
118     # that isn't a tty.
119     if (not (args.batch_progress or args.no_progress)
120         and stderr.isatty()
121         and (args.destination != '-'
122              or not stdout.isatty())):
123         args.progress = True
124     return args
125
126 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
127     global api_client
128     
129     args = parse_arguments(arguments, stdout, stderr)
130     if api_client is None:
131         api_client = arvados.api('v1')
132
133     r = re.search(r'^(.*?)(/.*)?$', args.locator)
134     collection = r.group(1)
135     get_prefix = r.group(2)
136     if args.r and not get_prefix:
137         get_prefix = os.sep
138     try:
139         reader = arvados.CollectionReader(collection, num_retries=args.retries)
140     except Exception as error:
141         logger.error("failed to read collection: {}".format(error))
142         return 1
143
144     # User asked to download the collection's manifest
145     if not get_prefix:
146         if not args.n:
147             open_flags = os.O_CREAT | os.O_WRONLY
148             if not args.f:
149                 open_flags |= os.O_EXCL
150             try:
151                 if args.destination == "-":
152                     stdout.write(reader.manifest_text())
153                 else:
154                     out_fd = os.open(args.destination, open_flags)
155                     with os.fdopen(out_fd, 'wb') as out_file:
156                         out_file.write(reader.manifest_text())
157             except (IOError, OSError) as error:
158                 logger.error("can't write to '{}': {}".format(args.destination, error))
159                 return 1
160             except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
161                 logger.error("failed to download '{}': {}".format(collection, error))
162                 return 1
163         return 0
164
165     # Scan the collection. Make an array of (stream, file, local
166     # destination filename) tuples, and add up total size to extract.
167     todo = []
168     todo_bytes = 0
169     try:
170         if get_prefix == os.sep:
171             item = reader
172         else:
173             item = reader.find('.' + get_prefix)
174
175         if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
176             # If the user asked for a file and we got a subcollection, error out.
177             if get_prefix[-1] != os.sep:
178                 logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
179                 return 1
180             # If the user asked stdout as a destination, error out.
181             elif args.destination == '-':
182                 logger.error("cannot use 'stdout' as destination when downloading multiple files.")
183                 return 1
184             # User asked for a subcollection, and that's what was found. Add up total size
185             # to download.
186             for s, f in files_in_collection(item):
187                 dest_path = os.path.join(
188                     args.destination,
189                     os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
190                 if (not (args.n or args.f or args.skip_existing) and
191                     os.path.exists(dest_path)):
192                     logger.error('Local file %s already exists.' % (dest_path,))
193                     return 1
194                 todo += [(s, f, dest_path)]
195                 todo_bytes += f.size()
196         elif isinstance(item, arvados.arvfile.ArvadosFile):
197             todo += [(item.parent, item, args.destination)]
198             todo_bytes += item.size()
199         else:
200             logger.error("'{}' not found.".format('.' + get_prefix))
201             return 1
202     except (IOError, arvados.errors.NotFoundError) as e:
203         logger.error(e)
204         return 1
205
206     out_bytes = 0
207     for s, f, outfilename in todo:
208         outfile = None
209         digestor = None
210         if not args.n:
211             if outfilename == "-":
212                 outfile = stdout
213             else:
214                 if args.skip_existing and os.path.exists(outfilename):
215                     logger.debug('Local file %s exists. Skipping.', outfilename)
216                     continue
217                 elif not args.f and (os.path.isfile(outfilename) or
218                                    os.path.isdir(outfilename)):
219                     # Good thing we looked again: apparently this file wasn't
220                     # here yet when we checked earlier.
221                     logger.error('Local file %s already exists.' % (outfilename,))
222                     return 1
223                 if args.r:
224                     arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
225                 try:
226                     outfile = open(outfilename, 'wb')
227                 except Exception as error:
228                     logger.error('Open(%s) failed: %s' % (outfilename, error))
229                     return 1
230         if args.hash:
231             digestor = hashlib.new(args.hash)
232         try:
233             with s.open(f.name, 'r') as file_reader:
234                 for data in file_reader.readall():
235                     if outfile:
236                         outfile.write(data)
237                     if digestor:
238                         digestor.update(data)
239                     out_bytes += len(data)
240                     if args.progress:
241                         stderr.write('\r%d MiB / %d MiB %.1f%%' %
242                                      (out_bytes >> 20,
243                                       todo_bytes >> 20,
244                                       (100
245                                        if todo_bytes==0
246                                        else 100.0*out_bytes/todo_bytes)))
247                     elif args.batch_progress:
248                         stderr.write('%s %d read %d total\n' %
249                                      (sys.argv[0], os.getpid(),
250                                       out_bytes, todo_bytes))
251             if digestor:
252                 stderr.write("%s  %s/%s\n"
253                              % (digestor.hexdigest(), s.stream_name(), f.name))
254         except KeyboardInterrupt:
255             if outfile and (outfile.fileno() > 2) and not outfile.closed:
256                 os.unlink(outfile.name)
257             break
258         finally:
259             if outfile != None and outfile != stdout:
260                 outfile.close()
261
262     if args.progress:
263         stderr.write('\n')
264     return 0
265
266 def files_in_collection(c):
267     # Sort first by file type, then alphabetically by file path.
268     for i in sorted(c.keys(),
269                     key=lambda k: (
270                         isinstance(c[k], arvados.collection.Subcollection),
271                         k.upper())):
272         if isinstance(c[i], arvados.arvfile.ArvadosFile):
273             yield (c, c[i])
274         elif isinstance(c[i], arvados.collection.Subcollection):
275             for s, f in files_in_collection(c[i]):
276                 yield (s, f)