3021: Propagate unhandled exceptions back to the caller instead of returning None...
[arvados.git] / sdk / python / arvados / util.py
1 import fcntl
2 import hashlib
3 import os
4 import re
5 import subprocess
6 import errno
7 import sys
8 from arvados.collection import *
9
10 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
11
12 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
13 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
14 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
15 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
16 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
17 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
18 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
19 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
20 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
21
22 def clear_tmpdir(path=None):
23     """
24     Ensure the given directory (or TASK_TMPDIR if none given)
25     exists and is empty.
26     """
27     if path is None:
28         path = arvados.current_task().tmpdir
29     if os.path.exists(path):
30         p = subprocess.Popen(['rm', '-rf', path])
31         stdout, stderr = p.communicate(None)
32         if p.returncode != 0:
33             raise Exception('rm -rf %s: %s' % (path, stderr))
34     os.mkdir(path)
35
36 def run_command(execargs, **kwargs):
37     kwargs.setdefault('stdin', subprocess.PIPE)
38     kwargs.setdefault('stdout', subprocess.PIPE)
39     kwargs.setdefault('stderr', sys.stderr)
40     kwargs.setdefault('close_fds', True)
41     kwargs.setdefault('shell', False)
42     p = subprocess.Popen(execargs, **kwargs)
43     stdoutdata, stderrdata = p.communicate(None)
44     if p.returncode != 0:
45         raise errors.CommandFailedError(
46             "run_command %s exit %d:\n%s" %
47             (execargs, p.returncode, stderrdata))
48     return stdoutdata, stderrdata
49
50 def git_checkout(url, version, path):
51     if not re.search('^/', path):
52         path = os.path.join(arvados.current_job().tmpdir, path)
53     if not os.path.exists(path):
54         run_command(["git", "clone", url, path],
55                     cwd=os.path.dirname(path))
56     run_command(["git", "checkout", version],
57                 cwd=path)
58     return path
59
60 def tar_extractor(path, decompress_flag):
61     return subprocess.Popen(["tar",
62                              "-C", path,
63                              ("-x%sf" % decompress_flag),
64                              "-"],
65                             stdout=None,
66                             stdin=subprocess.PIPE, stderr=sys.stderr,
67                             shell=False, close_fds=True)
68
69 def tarball_extract(tarball, path):
70     """Retrieve a tarball from Keep and extract it to a local
71     directory.  Return the absolute path where the tarball was
72     extracted. If the top level of the tarball contained just one
73     file or directory, return the absolute path of that single
74     item.
75
76     tarball -- collection locator
77     path -- where to extract the tarball: absolute, or relative to job tmp
78     """
79     if not re.search('^/', path):
80         path = os.path.join(arvados.current_job().tmpdir, path)
81     lockfile = open(path + '.lock', 'w')
82     fcntl.flock(lockfile, fcntl.LOCK_EX)
83     try:
84         os.stat(path)
85     except OSError:
86         os.mkdir(path)
87     already_have_it = False
88     try:
89         if os.readlink(os.path.join(path, '.locator')) == tarball:
90             already_have_it = True
91     except OSError:
92         pass
93     if not already_have_it:
94
95         # emulate "rm -f" (i.e., if the file does not exist, we win)
96         try:
97             os.unlink(os.path.join(path, '.locator'))
98         except OSError:
99             if os.path.exists(os.path.join(path, '.locator')):
100                 os.unlink(os.path.join(path, '.locator'))
101
102         for f in CollectionReader(tarball).all_files():
103             if re.search('\.(tbz|tar.bz2)$', f.name()):
104                 p = tar_extractor(path, 'j')
105             elif re.search('\.(tgz|tar.gz)$', f.name()):
106                 p = tar_extractor(path, 'z')
107             elif re.search('\.tar$', f.name()):
108                 p = tar_extractor(path, '')
109             else:
110                 raise errors.AssertionError(
111                     "tarball_extract cannot handle filename %s" % f.name())
112             while True:
113                 buf = f.read(2**20)
114                 if len(buf) == 0:
115                     break
116                 p.stdin.write(buf)
117             p.stdin.close()
118             p.wait()
119             if p.returncode != 0:
120                 lockfile.close()
121                 raise errors.CommandFailedError(
122                     "tar exited %d" % p.returncode)
123         os.symlink(tarball, os.path.join(path, '.locator'))
124     tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
125     lockfile.close()
126     if len(tld_extracts) == 1:
127         return os.path.join(path, tld_extracts[0])
128     return path
129
130 def zipball_extract(zipball, path):
131     """Retrieve a zip archive from Keep and extract it to a local
132     directory.  Return the absolute path where the archive was
133     extracted. If the top level of the archive contained just one
134     file or directory, return the absolute path of that single
135     item.
136
137     zipball -- collection locator
138     path -- where to extract the archive: absolute, or relative to job tmp
139     """
140     if not re.search('^/', path):
141         path = os.path.join(arvados.current_job().tmpdir, path)
142     lockfile = open(path + '.lock', 'w')
143     fcntl.flock(lockfile, fcntl.LOCK_EX)
144     try:
145         os.stat(path)
146     except OSError:
147         os.mkdir(path)
148     already_have_it = False
149     try:
150         if os.readlink(os.path.join(path, '.locator')) == zipball:
151             already_have_it = True
152     except OSError:
153         pass
154     if not already_have_it:
155
156         # emulate "rm -f" (i.e., if the file does not exist, we win)
157         try:
158             os.unlink(os.path.join(path, '.locator'))
159         except OSError:
160             if os.path.exists(os.path.join(path, '.locator')):
161                 os.unlink(os.path.join(path, '.locator'))
162
163         for f in CollectionReader(zipball).all_files():
164             if not re.search('\.zip$', f.name()):
165                 raise errors.NotImplementedError(
166                     "zipball_extract cannot handle filename %s" % f.name())
167             zip_filename = os.path.join(path, os.path.basename(f.name()))
168             zip_file = open(zip_filename, 'wb')
169             while True:
170                 buf = f.read(2**20)
171                 if len(buf) == 0:
172                     break
173                 zip_file.write(buf)
174             zip_file.close()
175
176             p = subprocess.Popen(["unzip",
177                                   "-q", "-o",
178                                   "-d", path,
179                                   zip_filename],
180                                  stdout=None,
181                                  stdin=None, stderr=sys.stderr,
182                                  shell=False, close_fds=True)
183             p.wait()
184             if p.returncode != 0:
185                 lockfile.close()
186                 raise errors.CommandFailedError(
187                     "unzip exited %d" % p.returncode)
188             os.unlink(zip_filename)
189         os.symlink(zipball, os.path.join(path, '.locator'))
190     tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
191     lockfile.close()
192     if len(tld_extracts) == 1:
193         return os.path.join(path, tld_extracts[0])
194     return path
195
196 def collection_extract(collection, path, files=[], decompress=True):
197     """Retrieve a collection from Keep and extract it to a local
198     directory.  Return the absolute path where the collection was
199     extracted.
200
201     collection -- collection locator
202     path -- where to extract: absolute, or relative to job tmp
203     """
204     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
205     if matches:
206         collection_hash = matches.group(1)
207     else:
208         collection_hash = hashlib.md5(collection).hexdigest()
209     if not re.search('^/', path):
210         path = os.path.join(arvados.current_job().tmpdir, path)
211     lockfile = open(path + '.lock', 'w')
212     fcntl.flock(lockfile, fcntl.LOCK_EX)
213     try:
214         os.stat(path)
215     except OSError:
216         os.mkdir(path)
217     already_have_it = False
218     try:
219         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
220             already_have_it = True
221     except OSError:
222         pass
223
224     # emulate "rm -f" (i.e., if the file does not exist, we win)
225     try:
226         os.unlink(os.path.join(path, '.locator'))
227     except OSError:
228         if os.path.exists(os.path.join(path, '.locator')):
229             os.unlink(os.path.join(path, '.locator'))
230
231     files_got = []
232     for s in CollectionReader(collection).all_streams():
233         stream_name = s.name()
234         for f in s.all_files():
235             if (files == [] or
236                 ((f.name() not in files_got) and
237                  (f.name() in files or
238                   (decompress and f.decompressed_name() in files)))):
239                 outname = f.decompressed_name() if decompress else f.name()
240                 files_got += [outname]
241                 if os.path.exists(os.path.join(path, stream_name, outname)):
242                     continue
243                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
244                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
245                 for buf in (f.readall_decompressed() if decompress
246                             else f.readall()):
247                     outfile.write(buf)
248                 outfile.close()
249     if len(files_got) < len(files):
250         raise errors.AssertionError(
251             "Wanted files %s but only got %s from %s" %
252             (files, files_got,
253              [z.name() for z in CollectionReader(collection).all_files()]))
254     os.symlink(collection_hash, os.path.join(path, '.locator'))
255
256     lockfile.close()
257     return path
258
259 def mkdir_dash_p(path):
260     if not os.path.isdir(path):
261         try:
262             os.makedirs(path)
263         except OSError as e:
264             if e.errno == errno.EEXIST and os.path.isdir(path):
265                 # It is not an error if someone else creates the
266                 # directory between our exists() and makedirs() calls.
267                 pass
268             else:
269                 raise
270
271 def stream_extract(stream, path, files=[], decompress=True):
272     """Retrieve a stream from Keep and extract it to a local
273     directory.  Return the absolute path where the stream was
274     extracted.
275
276     stream -- StreamReader object
277     path -- where to extract: absolute, or relative to job tmp
278     """
279     if not re.search('^/', path):
280         path = os.path.join(arvados.current_job().tmpdir, path)
281     lockfile = open(path + '.lock', 'w')
282     fcntl.flock(lockfile, fcntl.LOCK_EX)
283     try:
284         os.stat(path)
285     except OSError:
286         os.mkdir(path)
287
288     files_got = []
289     for f in stream.all_files():
290         if (files == [] or
291             ((f.name() not in files_got) and
292              (f.name() in files or
293               (decompress and f.decompressed_name() in files)))):
294             outname = f.decompressed_name() if decompress else f.name()
295             files_got += [outname]
296             if os.path.exists(os.path.join(path, outname)):
297                 os.unlink(os.path.join(path, outname))
298             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
299             outfile = open(os.path.join(path, outname), 'wb')
300             for buf in (f.readall_decompressed() if decompress
301                         else f.readall()):
302                 outfile.write(buf)
303             outfile.close()
304     if len(files_got) < len(files):
305         raise errors.AssertionError(
306             "Wanted files %s but only got %s from %s" %
307             (files, files_got, [z.name() for z in stream.all_files()]))
308     lockfile.close()
309     return path
310
311 def listdir_recursive(dirname, base=None, max_depth=None):
312     """listdir_recursive(dirname, base, max_depth)
313
314     Return a list of file and directory names found under dirname.
315
316     If base is not None, prepend "{base}/" to each returned name.
317
318     If max_depth is None, descend into directories and return only the
319     names of files found in the directory tree.
320
321     If max_depth is a non-negative integer, stop descending into
322     directories at the given depth, and at that point return directory
323     names instead.
324
325     If max_depth==0 (and base is None) this is equivalent to
326     sorted(os.listdir(dirname)).
327     """
328     allfiles = []
329     for ent in sorted(os.listdir(dirname)):
330         ent_path = os.path.join(dirname, ent)
331         ent_base = os.path.join(base, ent) if base else ent
332         if os.path.isdir(ent_path) and max_depth != 0:
333             allfiles += listdir_recursive(
334                 ent_path, base=ent_base,
335                 max_depth=(max_depth-1 if max_depth else None))
336         else:
337             allfiles += [ent_base]
338     return allfiles
339
340 def is_hex(s, *length_args):
341     """is_hex(s[, length[, max_length]]) -> boolean
342
343     Return True if s is a string of hexadecimal digits.
344     If one length argument is given, the string must contain exactly
345     that number of digits.
346     If two length arguments are given, the string must contain a number of
347     digits between those two lengths, inclusive.
348     Return False otherwise.
349     """
350     num_length_args = len(length_args)
351     if num_length_args > 2:
352         raise errors.ArgumentError("is_hex accepts up to 3 arguments ({} given)"
353                                    .format(1 + num_length_args))
354     elif num_length_args == 2:
355         good_len = (length_args[0] <= len(s) <= length_args[1])
356     elif num_length_args == 1:
357         good_len = (len(s) == length_args[0])
358     else:
359         good_len = True
360     return bool(good_len and HEX_RE.match(s))
361
362 def list_all(fn, num_retries=0, **kwargs):
363     items = []
364     offset = 0
365     items_available = sys.maxint
366     while len(items) < items_available:
367         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
368         items += c['items']
369         items_available = c['items_available']
370         offset = c['offset'] + len(c['items'])
371     return items