# sdk/python/arvados/util.py
import fcntl
import hashlib
import os
import re
import subprocess
import errno
import sys

# The functions below reference arvados.current_job()/current_task() and
# the exception classes in arvados.errors, so import them explicitly
# instead of relying on a wildcard import.
import arvados
from arvados import errors
from arvados.collection import CollectionReader

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
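
# Example (illustrative; the values below are made up, not real objects):
# these patterns are unanchored, so use .match() to test a prefix, or add
# your own ^...$ anchors to require a full-string match.
#
#   assert portable_data_hash_pattern.match(
#       'd41d8cd98f00b204e9800998ecf8427e+0')
#   assert user_uuid_pattern.match('zzzzz-tpzed-0123456789abcde')
#   assert not collection_uuid_pattern.match('zzzzz-tpzed-0123456789abcde')
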
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # Capture stderr so the exception message below can report it.
        p = subprocess.Popen(['rm', '-rf', path], stderr=subprocess.PIPE)
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)

def run_command(execargs, **kwargs):
    """Run the given command via subprocess.Popen and wait for it to
    finish.  Return (stdoutdata, stderrdata) on success; raise
    CommandFailedError if the command exits nonzero.
    """
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata

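# Example (a sketch): capture a command's stdout, leaving stderr going to
# the caller's sys.stderr as per the defaults above.
#
#   out, _ = run_command(['ls', '-l', '/tmp'])
#
# Note that with the default stderr=sys.stderr, stderrdata is None, so the
# CommandFailedError message will show None unless the caller passes
# stderr=subprocess.PIPE.
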
def git_checkout(url, version, path):
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path

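# Example (a sketch; the URL and version are placeholders): clone once
# into the job's tmpdir, then pin the working tree to a specific version.
#
#   src = git_checkout('https://example.com/repo.git', 'v1.0', 'src')
#
# A relative path is resolved against the current job's tmpdir, and an
# existing clone at that path is reused rather than re-cloned.
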
def tar_extractor(path, decompress_flag):
    # Return a tar child process that reads an archive on stdin and
    # extracts it under path; decompress_flag is 'j', 'z', or ''.
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)

def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            if re.search(r'\.(tbz|tar\.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search(r'\.(tgz|tar\.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search(r'\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

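# Example usage (a sketch; the locator below is a placeholder, not a real
# collection):
#
#   src = tarball_extract('d41d8cd98f00b204e9800998ecf8427e+0', 'src')
#
# The '.lock' flock serializes concurrent extractions, and the '.locator'
# symlink records which tarball is already unpacked there, so repeated
# calls with the same locator return immediately.
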
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search(r'\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- list of file names to extract (if empty, extract everything)
    decompress -- if True, decompress files and match/write them by their
                  decompressed names
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        collection_hash = hashlib.md5(collection).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))

    lockfile.close()
    return path

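# Example (a sketch; the locator is a placeholder): extract two named
# files from a collection, decompressing any .gz variants to those names.
#
#   ref_dir = collection_extract('d41d8cd98f00b204e9800998ecf8427e+0',
#                                'refs',
#                                files=['genome.fa', 'genome.fa.fai'])
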
def mkdir_dash_p(path):
    """Create path and any missing parent directories, like mkdir -p."""
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our isdir() and makedirs() calls.
                pass
            else:
                raise

def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- list of file names to extract (if empty, extract everything)
    decompress -- if True, decompress files and match/write them by their
                  decompressed names
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path

def listdir_recursive(dirname, base=None):
    """Return a sorted list of all non-directory entries under dirname,
    as paths relative to dirname."""
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path):
            allfiles += listdir_recursive(ent_path, ent_base)
        else:
            allfiles += [ent_base]
    return allfiles

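# Example: for a tree 'a/' containing a/b.txt and a/c/d.txt,
# listdir_recursive('a') returns ['b.txt', 'c/d.txt'].  Entries are
# sorted within each directory; directories themselves are not listed.
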
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(
                1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))

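# Examples:
#
#   is_hex('deadbeef')          -> True
#   is_hex('deadbeef', 8)       -> True   (exactly 8 digits)
#   is_hex('deadbeef', 1, 4)    -> False  (length outside 1..4)
#   is_hex('quux')              -> False  (not hexadecimal)
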
def list_all(fn, num_retries=0, **kwargs):
    """Call an API list method repeatedly, advancing the offset through
    each page of results, and return all items as a single list.

    fn -- an API list method; each page's request is executed with the
    given num_retries.
    """
    items = []
    offset = 0
    items_available = sys.maxint
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items
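
# Example (a sketch, assuming a client from arvados.api()):
#
#   api = arvados.api('v1')
#   tag_links = list_all(api.links().list, num_retries=3,
#                        filters=[['link_class', '=', 'tag']])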