Merge branch '3212-pipeline-no-reuse' closes #3212
[arvados.git] / sdk / python / bin / arv-get
1 #!/usr/bin/env python
2
3 import argparse
4 import hashlib
5 import os
6 import re
7 import string
8 import sys
9 import logging
10
11 logger = logging.getLogger(os.path.basename(sys.argv[0]))
12
13 parser = argparse.ArgumentParser(
14     description='Copy data from Keep to a local file or pipe.')
15 parser.add_argument('locator', type=str,
16                     help="""
17 Collection locator, optionally with a file path or prefix.
18 """)
19 parser.add_argument('destination', type=str, nargs='?', default='/dev/stdout',
20                     help="""
21 Local file or directory where the data is to be written. Default:
22 /dev/stdout.
23 """)
24 group = parser.add_mutually_exclusive_group()
25 group.add_argument('--progress', action='store_true',
26                    help="""
27 Display human-readable progress on stderr (bytes and, if possible,
28 percentage of total data size). This is the default behavior when
29 stderr is a tty and stdout is not a tty.
30 """)
31 group.add_argument('--no-progress', action='store_true',
32                    help="""
33 Do not display human-readable progress on stderr.
34 """)
35 group.add_argument('--batch-progress', action='store_true',
36                    help="""
37 Display machine-readable progress on stderr (bytes and, if known,
38 total data size).
39 """)
40 group = parser.add_mutually_exclusive_group()
41 group.add_argument('--hash',
42                     help="""
43 Display the hash of each file as it is read from Keep, using the given
44 hash algorithm. Supported algorithms include md5, sha1, sha224,
45 sha256, sha384, and sha512.
46 """)
47 group.add_argument('--md5sum', action='store_const',
48                     dest='hash', const='md5',
49                     help="""
50 Display the MD5 hash of each file as it is read from Keep.
51 """)
52 parser.add_argument('-n', action='store_true',
53                     help="""
54 Do not write any data -- just read from Keep, and report md5sums if
55 requested.
56 """)
57 parser.add_argument('-r', action='store_true',
58                     help="""
59 Retrieve all files in the specified collection/prefix. This is the
60 default behavior if the "locator" argument ends with a forward slash.
61 """)
62 group = parser.add_mutually_exclusive_group()
63 group.add_argument('-f', action='store_true',
64                    help="""
65 Overwrite existing files while writing. The default behavior is to
66 refuse to write *anything* if any of the output files already
67 exist. As a special case, -f is not needed to write to /dev/stdout.
68 """)
69 group.add_argument('--skip-existing', action='store_true',
70                    help="""
71 Skip files that already exist. The default behavior is to refuse to
72 write *anything* if any files exist that would have to be
73 overwritten. This option causes even devices, sockets, and fifos to be
74 skipped.
75 """)
76
77 args = parser.parse_args()
78
79 if args.locator[-1] == os.sep:
80     args.r = True
81 if (args.r and
82     not args.n and
83     not (args.destination and
84          os.path.isdir(args.destination))):
85     parser.error('Destination is not a directory.')
86 if not args.r and (os.path.isdir(args.destination) or
87                    args.destination[-1] == os.path.sep):
88     args.destination = os.path.join(args.destination,
89                                     os.path.basename(args.locator))
90     logger.debug("Appended source file name to destination directory: %s" %
91                  args.destination)
92
93 # Turn on --progress by default if stderr is a tty and stdout isn't.
94 if (not (args.batch_progress or args.no_progress)
95     and os.isatty(sys.stderr.fileno())
96     and not os.isatty(sys.stdout.fileno())):
97     args.progress = True
98
99 if args.destination == '-':
100     args.destination = '/dev/stdout'
101 if args.destination == '/dev/stdout':
102     # Normally you have to use -f to write to a file (or device) that
103     # already exists, but "-" and "/dev/stdout" are common enough to
104     # merit a special exception.
105     args.f = True
106 else:
107     args.destination = args.destination.rstrip(os.sep)
108
109
110 import arvados
111
112 r = re.search(r'^(.*?)(/.*)?$', args.locator)
113 collection = r.group(1)
114 get_prefix = r.group(2)
115 if args.r and not get_prefix:
116     get_prefix = os.sep
117
118 todo = []
119 todo_bytes = 0
120 if not get_prefix:
121     try:
122         if not args.n:
123             if not args.f and os.path.exists(args.destination):
124                 logger.error('Local file %s already exists' % args.destination)
125                 sys.exit(1)
126             with open(args.destination, 'wb') as f:
127                 try:
128                     c = arvados.api('v1').collections().get(
129                         uuid=collection).execute()
130                     manifest = c['manifest_text']
131                 except Exception as e:
132                     logging.warning(
133                         "API lookup failed for collection %s (%s: %s)" %
134                         (collection, type(e), str(e)))
135                     manifest = arvados.Keep.get(collection)
136                 f.write(manifest)
137         sys.exit(0)
138     except arvados.errors.NotFoundError as e:
139         logger.error(e)
140         sys.exit(1)
141
142 reader = arvados.CollectionReader(collection)
143
144 # Scan the collection. Make an array of (stream, file, local
145 # destination filename) tuples, and add up total size to extract.
146
147 try:
148     for s in reader.all_streams():
149         for f in s.all_files():
150             if get_prefix and get_prefix[-1] == os.sep:
151                 if 0 != string.find(os.path.join(s.name(), f.name()),
152                                     '.' + get_prefix):
153                     continue
154                 dest_path = os.path.join(
155                     args.destination,
156                     os.path.join(s.name(), f.name())[len(get_prefix)+1:])
157                 if (not (args.n or args.f or args.skip_existing) and
158                     os.path.exists(dest_path)):
159                     logger.error('Local file %s already exists' % dest_path)
160                     sys.exit(1)
161             else:
162                 if os.path.join(s.name(), f.name()) != '.' + get_prefix:
163                     continue
164                 dest_path = args.destination
165             todo += [(s, f, dest_path)]
166             todo_bytes += f.size()
167 except arvados.errors.NotFoundError as e:
168     logger.error(e)
169     sys.exit(1)
170
171 # Read data, and (if not -n) write to local file(s) or pipe.
172
173 out_bytes = 0
174 for s,f,outfilename in todo:
175     outfile = None
176     digestor = None
177     if not args.n:
178         if args.skip_existing and os.path.exists(outfilename):
179             logger.debug('Local file %s exists. Skipping.' % outfilename)
180             continue
181         elif not args.f and (os.path.isfile(outfilename) or
182                            os.path.isdir(outfilename)):
183             # Good thing we looked again: apparently this file wasn't
184             # here yet when we checked earlier.
185             logger.error('Local file %s already exists' % outfilename)
186             sys.exit(1)
187         if args.r:
188             arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
189         try:
190             outfile = open(outfilename, 'wb')
191         except Exception as e:
192             logger.error('Open(%s) failed: %s' % (outfilename, e))
193             sys.exit(1)
194     if args.hash:
195         digestor = hashlib.new(args.hash)
196     try:
197         for data in f.readall():
198             if outfile:
199                 outfile.write(data)
200             if digestor:
201                 digestor.update(data)
202             out_bytes += len(data)
203             if args.progress:
204                 sys.stderr.write('\r%d MiB / %d MiB %.1f%%' %
205                                  (out_bytes >> 20,
206                                   todo_bytes >> 20,
207                                   (100
208                                    if todo_bytes==0
209                                    else 100.0*out_bytes/todo_bytes)))
210             elif args.batch_progress:
211                 sys.stderr.write('%s %d read %d total\n' %
212                                  (sys.argv[0], os.getpid(),
213                                   out_bytes, todo_bytes))
214         if digestor:
215             sys.stderr.write("%s  %s/%s\n"
216                              % (digestor.hexdigest(), s.name(), f.name()))
217     except KeyboardInterrupt:
218         if outfile and outfile != '/dev/stdout':
219             os.unlink(outfilename)
220         break
221
222 if args.progress:
223     sys.stderr.write('\n')