3550: Fix running local job repeatedly on subsequent loop iterations.
[arvados.git] / sdk / python / bin / arv-get
1 #!/usr/bin/env python
2
3 import argparse
4 import hashlib
5 import os
6 import re
7 import string
8 import sys
9 import logging
10
11 import arvados
12
13 logger = logging.getLogger('arvados.arv-get')
14
15 def abort(msg, code=1):
16     print >>sys.stderr, "arv-get:", msg
17     exit(code)
18
19 parser = argparse.ArgumentParser(
20     description='Copy data from Keep to a local file or pipe.')
21 parser.add_argument('locator', type=str,
22                     help="""
23 Collection locator, optionally with a file path or prefix.
24 """)
25 parser.add_argument('destination', type=str, nargs='?', default='/dev/stdout',
26                     help="""
27 Local file or directory where the data is to be written. Default:
28 /dev/stdout.
29 """)
30 group = parser.add_mutually_exclusive_group()
31 group.add_argument('--progress', action='store_true',
32                    help="""
33 Display human-readable progress on stderr (bytes and, if possible,
34 percentage of total data size). This is the default behavior when it
35 is not expected to interfere with the output: specifically, stderr is
36 a tty _and_ either stdout is not a tty, or output is being written to
37 named files rather than stdout.
38 """)
39 group.add_argument('--no-progress', action='store_true',
40                    help="""
41 Do not display human-readable progress on stderr.
42 """)
43 group.add_argument('--batch-progress', action='store_true',
44                    help="""
45 Display machine-readable progress on stderr (bytes and, if known,
46 total data size).
47 """)
48 group = parser.add_mutually_exclusive_group()
49 group.add_argument('--hash',
50                     help="""
51 Display the hash of each file as it is read from Keep, using the given
52 hash algorithm. Supported algorithms include md5, sha1, sha224,
53 sha256, sha384, and sha512.
54 """)
55 group.add_argument('--md5sum', action='store_const',
56                     dest='hash', const='md5',
57                     help="""
58 Display the MD5 hash of each file as it is read from Keep.
59 """)
60 parser.add_argument('-n', action='store_true',
61                     help="""
62 Do not write any data -- just read from Keep, and report md5sums if
63 requested.
64 """)
65 parser.add_argument('-r', action='store_true',
66                     help="""
67 Retrieve all files in the specified collection/prefix. This is the
68 default behavior if the "locator" argument ends with a forward slash.
69 """)
70 group = parser.add_mutually_exclusive_group()
71 group.add_argument('-f', action='store_true',
72                    help="""
73 Overwrite existing files while writing. The default behavior is to
74 refuse to write *anything* if any of the output files already
75 exist. As a special case, -f is not needed to write to /dev/stdout.
76 """)
77 group.add_argument('--skip-existing', action='store_true',
78                    help="""
79 Skip files that already exist. The default behavior is to refuse to
80 write *anything* if any files exist that would have to be
81 overwritten. This option causes even devices, sockets, and fifos to be
82 skipped.
83 """)
84
85 args = parser.parse_args()
86
87 if args.locator[-1] == os.sep:
88     args.r = True
89 if (args.r and
90     not args.n and
91     not (args.destination and
92          os.path.isdir(args.destination))):
93     parser.error('Destination is not a directory.')
94 if not args.r and (os.path.isdir(args.destination) or
95                    args.destination[-1] == os.path.sep):
96     args.destination = os.path.join(args.destination,
97                                     os.path.basename(args.locator))
98     logger.debug("Appended source file name to destination directory: %s",
99                  args.destination)
100
101 if args.destination == '-':
102     args.destination = '/dev/stdout'
103 if args.destination == '/dev/stdout':
104     # Normally you have to use -f to write to a file (or device) that
105     # already exists, but "-" and "/dev/stdout" are common enough to
106     # merit a special exception.
107     args.f = True
108 else:
109     args.destination = args.destination.rstrip(os.sep)
110
111 # Turn on --progress by default if stderr is a tty and output is
112 # either going to a named file, or going (via stdout) to something
113 # that isn't a tty.
114 if (not (args.batch_progress or args.no_progress)
115     and sys.stderr.isatty()
116     and (args.destination != '/dev/stdout'
117          or not sys.stdout.isatty())):
118     args.progress = True
119
120
121 r = re.search(r'^(.*?)(/.*)?$', args.locator)
122 collection = r.group(1)
123 get_prefix = r.group(2)
124 if args.r and not get_prefix:
125     get_prefix = os.sep
126
127 todo = []
128 todo_bytes = 0
129 api_client = arvados.api('v1')
130 if not get_prefix:
131     try:
132         if not args.n:
133             if not args.f and os.path.exists(args.destination):
134                 abort('Local file %s already exists.' % (args.destination,))
135             with open(args.destination, 'wb') as f:
136                 try:
137                     c = api_client.collections().get(uuid=collection).execute()
138                     manifest = c['manifest_text']
139                 except Exception as e:
140                     logger.warning(
141                         "Collection %s not found. " +
142                         "Trying to fetch directly from Keep (deprecated).",
143                         collection)
144                     manifest = arvados.KeepClient(
145                         api_client=api_client).get(collection)
146                 f.write(manifest)
147         sys.exit(0)
148     except arvados.errors.NotFoundError as e:
149         abort(e)
150
151 reader = arvados.CollectionReader(collection)
152
153 # Scan the collection. Make an array of (stream, file, local
154 # destination filename) tuples, and add up total size to extract.
155
156 try:
157     for s in reader.all_streams():
158         for f in s.all_files():
159             if get_prefix and get_prefix[-1] == os.sep:
160                 if 0 != string.find(os.path.join(s.name(), f.name()),
161                                     '.' + get_prefix):
162                     continue
163                 dest_path = os.path.join(
164                     args.destination,
165                     os.path.join(s.name(), f.name())[len(get_prefix)+1:])
166                 if (not (args.n or args.f or args.skip_existing) and
167                     os.path.exists(dest_path)):
168                     abort('Local file %s already exists.' % (dest_path,))
169             else:
170                 if os.path.join(s.name(), f.name()) != '.' + get_prefix:
171                     continue
172                 dest_path = args.destination
173             todo += [(s, f, dest_path)]
174             todo_bytes += f.size()
175 except arvados.errors.NotFoundError as e:
176     abort(e)
177
178 # Read data, and (if not -n) write to local file(s) or pipe.
179
180 out_bytes = 0
181 for s,f,outfilename in todo:
182     outfile = None
183     digestor = None
184     if not args.n:
185         if args.skip_existing and os.path.exists(outfilename):
186             logger.debug('Local file %s exists. Skipping.', outfilename)
187             continue
188         elif not args.f and (os.path.isfile(outfilename) or
189                            os.path.isdir(outfilename)):
190             # Good thing we looked again: apparently this file wasn't
191             # here yet when we checked earlier.
192             abort('Local file %s already exists.' % (outfilename,))
193         if args.r:
194             arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
195         try:
196             outfile = open(outfilename, 'wb')
197         except Exception as e:
198             abort('Open(%s) failed: %s' % (outfilename, e))
199     if args.hash:
200         digestor = hashlib.new(args.hash)
201     try:
202         for data in f.readall():
203             if outfile:
204                 outfile.write(data)
205             if digestor:
206                 digestor.update(data)
207             out_bytes += len(data)
208             if args.progress:
209                 sys.stderr.write('\r%d MiB / %d MiB %.1f%%' %
210                                  (out_bytes >> 20,
211                                   todo_bytes >> 20,
212                                   (100
213                                    if todo_bytes==0
214                                    else 100.0*out_bytes/todo_bytes)))
215             elif args.batch_progress:
216                 sys.stderr.write('%s %d read %d total\n' %
217                                  (sys.argv[0], os.getpid(),
218                                   out_bytes, todo_bytes))
219         if digestor:
220             sys.stderr.write("%s  %s/%s\n"
221                              % (digestor.hexdigest(), s.name(), f.name()))
222     except KeyboardInterrupt:
223         if outfile and outfile != '/dev/stdout':
224             os.unlink(outfilename)
225         break
226
227 if args.progress:
228     sys.stderr.write('\n')