7475: Merge branch 'master' into 7475-nodemgr-unsatisfiable-job-comms
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import fcntl
6 import hashlib
7 import httplib2
8 import os
9 import re
10 import subprocess
11 import errno
12 import sys
13
14 import arvados
15 from arvados.collection import CollectionReader
16
17 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
18
19 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
20 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
21 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
22 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
23 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
24 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
25 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
26 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
27 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
28 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
29 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
30
31 def clear_tmpdir(path=None):
32     """
33     Ensure the given directory (or TASK_TMPDIR if none given)
34     exists and is empty.
35     """
36     if path is None:
37         path = arvados.current_task().tmpdir
38     if os.path.exists(path):
39         p = subprocess.Popen(['rm', '-rf', path])
40         stdout, stderr = p.communicate(None)
41         if p.returncode != 0:
42             raise Exception('rm -rf %s: %s' % (path, stderr))
43     os.mkdir(path)
44
45 def run_command(execargs, **kwargs):
46     kwargs.setdefault('stdin', subprocess.PIPE)
47     kwargs.setdefault('stdout', subprocess.PIPE)
48     kwargs.setdefault('stderr', sys.stderr)
49     kwargs.setdefault('close_fds', True)
50     kwargs.setdefault('shell', False)
51     p = subprocess.Popen(execargs, **kwargs)
52     stdoutdata, stderrdata = p.communicate(None)
53     if p.returncode != 0:
54         raise arvados.errors.CommandFailedError(
55             "run_command %s exit %d:\n%s" %
56             (execargs, p.returncode, stderrdata))
57     return stdoutdata, stderrdata
58
59 def git_checkout(url, version, path):
60     if not re.search('^/', path):
61         path = os.path.join(arvados.current_job().tmpdir, path)
62     if not os.path.exists(path):
63         run_command(["git", "clone", url, path],
64                     cwd=os.path.dirname(path))
65     run_command(["git", "checkout", version],
66                 cwd=path)
67     return path
68
69 def tar_extractor(path, decompress_flag):
70     return subprocess.Popen(["tar",
71                              "-C", path,
72                              ("-x%sf" % decompress_flag),
73                              "-"],
74                             stdout=None,
75                             stdin=subprocess.PIPE, stderr=sys.stderr,
76                             shell=False, close_fds=True)
77
78 def tarball_extract(tarball, path):
79     """Retrieve a tarball from Keep and extract it to a local
80     directory.  Return the absolute path where the tarball was
81     extracted. If the top level of the tarball contained just one
82     file or directory, return the absolute path of that single
83     item.
84
85     tarball -- collection locator
86     path -- where to extract the tarball: absolute, or relative to job tmp
87     """
88     if not re.search('^/', path):
89         path = os.path.join(arvados.current_job().tmpdir, path)
90     lockfile = open(path + '.lock', 'w')
91     fcntl.flock(lockfile, fcntl.LOCK_EX)
92     try:
93         os.stat(path)
94     except OSError:
95         os.mkdir(path)
96     already_have_it = False
97     try:
98         if os.readlink(os.path.join(path, '.locator')) == tarball:
99             already_have_it = True
100     except OSError:
101         pass
102     if not already_have_it:
103
104         # emulate "rm -f" (i.e., if the file does not exist, we win)
105         try:
106             os.unlink(os.path.join(path, '.locator'))
107         except OSError:
108             if os.path.exists(os.path.join(path, '.locator')):
109                 os.unlink(os.path.join(path, '.locator'))
110
111         for f in CollectionReader(tarball).all_files():
112             if re.search('\.(tbz|tar.bz2)$', f.name()):
113                 p = tar_extractor(path, 'j')
114             elif re.search('\.(tgz|tar.gz)$', f.name()):
115                 p = tar_extractor(path, 'z')
116             elif re.search('\.tar$', f.name()):
117                 p = tar_extractor(path, '')
118             else:
119                 raise arvados.errors.AssertionError(
120                     "tarball_extract cannot handle filename %s" % f.name())
121             while True:
122                 buf = f.read(2**20)
123                 if len(buf) == 0:
124                     break
125                 p.stdin.write(buf)
126             p.stdin.close()
127             p.wait()
128             if p.returncode != 0:
129                 lockfile.close()
130                 raise arvados.errors.CommandFailedError(
131                     "tar exited %d" % p.returncode)
132         os.symlink(tarball, os.path.join(path, '.locator'))
133     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
134     lockfile.close()
135     if len(tld_extracts) == 1:
136         return os.path.join(path, tld_extracts[0])
137     return path
138
139 def zipball_extract(zipball, path):
140     """Retrieve a zip archive from Keep and extract it to a local
141     directory.  Return the absolute path where the archive was
142     extracted. If the top level of the archive contained just one
143     file or directory, return the absolute path of that single
144     item.
145
146     zipball -- collection locator
147     path -- where to extract the archive: absolute, or relative to job tmp
148     """
149     if not re.search('^/', path):
150         path = os.path.join(arvados.current_job().tmpdir, path)
151     lockfile = open(path + '.lock', 'w')
152     fcntl.flock(lockfile, fcntl.LOCK_EX)
153     try:
154         os.stat(path)
155     except OSError:
156         os.mkdir(path)
157     already_have_it = False
158     try:
159         if os.readlink(os.path.join(path, '.locator')) == zipball:
160             already_have_it = True
161     except OSError:
162         pass
163     if not already_have_it:
164
165         # emulate "rm -f" (i.e., if the file does not exist, we win)
166         try:
167             os.unlink(os.path.join(path, '.locator'))
168         except OSError:
169             if os.path.exists(os.path.join(path, '.locator')):
170                 os.unlink(os.path.join(path, '.locator'))
171
172         for f in CollectionReader(zipball).all_files():
173             if not re.search('\.zip$', f.name()):
174                 raise arvados.errors.NotImplementedError(
175                     "zipball_extract cannot handle filename %s" % f.name())
176             zip_filename = os.path.join(path, os.path.basename(f.name()))
177             zip_file = open(zip_filename, 'wb')
178             while True:
179                 buf = f.read(2**20)
180                 if len(buf) == 0:
181                     break
182                 zip_file.write(buf)
183             zip_file.close()
184
185             p = subprocess.Popen(["unzip",
186                                   "-q", "-o",
187                                   "-d", path,
188                                   zip_filename],
189                                  stdout=None,
190                                  stdin=None, stderr=sys.stderr,
191                                  shell=False, close_fds=True)
192             p.wait()
193             if p.returncode != 0:
194                 lockfile.close()
195                 raise arvados.errors.CommandFailedError(
196                     "unzip exited %d" % p.returncode)
197             os.unlink(zip_filename)
198         os.symlink(zipball, os.path.join(path, '.locator'))
199     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
200     lockfile.close()
201     if len(tld_extracts) == 1:
202         return os.path.join(path, tld_extracts[0])
203     return path
204
205 def collection_extract(collection, path, files=[], decompress=True):
206     """Retrieve a collection from Keep and extract it to a local
207     directory.  Return the absolute path where the collection was
208     extracted.
209
210     collection -- collection locator
211     path -- where to extract: absolute, or relative to job tmp
212     """
213     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
214     if matches:
215         collection_hash = matches.group(1)
216     else:
217         collection_hash = hashlib.md5(collection).hexdigest()
218     if not re.search('^/', path):
219         path = os.path.join(arvados.current_job().tmpdir, path)
220     lockfile = open(path + '.lock', 'w')
221     fcntl.flock(lockfile, fcntl.LOCK_EX)
222     try:
223         os.stat(path)
224     except OSError:
225         os.mkdir(path)
226     already_have_it = False
227     try:
228         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
229             already_have_it = True
230     except OSError:
231         pass
232
233     # emulate "rm -f" (i.e., if the file does not exist, we win)
234     try:
235         os.unlink(os.path.join(path, '.locator'))
236     except OSError:
237         if os.path.exists(os.path.join(path, '.locator')):
238             os.unlink(os.path.join(path, '.locator'))
239
240     files_got = []
241     for s in CollectionReader(collection).all_streams():
242         stream_name = s.name()
243         for f in s.all_files():
244             if (files == [] or
245                 ((f.name() not in files_got) and
246                  (f.name() in files or
247                   (decompress and f.decompressed_name() in files)))):
248                 outname = f.decompressed_name() if decompress else f.name()
249                 files_got += [outname]
250                 if os.path.exists(os.path.join(path, stream_name, outname)):
251                     continue
252                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
253                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
254                 for buf in (f.readall_decompressed() if decompress
255                             else f.readall()):
256                     outfile.write(buf)
257                 outfile.close()
258     if len(files_got) < len(files):
259         raise arvados.errors.AssertionError(
260             "Wanted files %s but only got %s from %s" %
261             (files, files_got,
262              [z.name() for z in CollectionReader(collection).all_files()]))
263     os.symlink(collection_hash, os.path.join(path, '.locator'))
264
265     lockfile.close()
266     return path
267
268 def mkdir_dash_p(path):
269     if not os.path.isdir(path):
270         try:
271             os.makedirs(path)
272         except OSError as e:
273             if e.errno == errno.EEXIST and os.path.isdir(path):
274                 # It is not an error if someone else creates the
275                 # directory between our exists() and makedirs() calls.
276                 pass
277             else:
278                 raise
279
280 def stream_extract(stream, path, files=[], decompress=True):
281     """Retrieve a stream from Keep and extract it to a local
282     directory.  Return the absolute path where the stream was
283     extracted.
284
285     stream -- StreamReader object
286     path -- where to extract: absolute, or relative to job tmp
287     """
288     if not re.search('^/', path):
289         path = os.path.join(arvados.current_job().tmpdir, path)
290     lockfile = open(path + '.lock', 'w')
291     fcntl.flock(lockfile, fcntl.LOCK_EX)
292     try:
293         os.stat(path)
294     except OSError:
295         os.mkdir(path)
296
297     files_got = []
298     for f in stream.all_files():
299         if (files == [] or
300             ((f.name() not in files_got) and
301              (f.name() in files or
302               (decompress and f.decompressed_name() in files)))):
303             outname = f.decompressed_name() if decompress else f.name()
304             files_got += [outname]
305             if os.path.exists(os.path.join(path, outname)):
306                 os.unlink(os.path.join(path, outname))
307             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
308             outfile = open(os.path.join(path, outname), 'wb')
309             for buf in (f.readall_decompressed() if decompress
310                         else f.readall()):
311                 outfile.write(buf)
312             outfile.close()
313     if len(files_got) < len(files):
314         raise arvados.errors.AssertionError(
315             "Wanted files %s but only got %s from %s" %
316             (files, files_got, [z.name() for z in stream.all_files()]))
317     lockfile.close()
318     return path
319
320 def listdir_recursive(dirname, base=None, max_depth=None):
321     """listdir_recursive(dirname, base, max_depth)
322
323     Return a list of file and directory names found under dirname.
324
325     If base is not None, prepend "{base}/" to each returned name.
326
327     If max_depth is None, descend into directories and return only the
328     names of files found in the directory tree.
329
330     If max_depth is a non-negative integer, stop descending into
331     directories at the given depth, and at that point return directory
332     names instead.
333
334     If max_depth==0 (and base is None) this is equivalent to
335     sorted(os.listdir(dirname)).
336     """
337     allfiles = []
338     for ent in sorted(os.listdir(dirname)):
339         ent_path = os.path.join(dirname, ent)
340         ent_base = os.path.join(base, ent) if base else ent
341         if os.path.isdir(ent_path) and max_depth != 0:
342             allfiles += listdir_recursive(
343                 ent_path, base=ent_base,
344                 max_depth=(max_depth-1 if max_depth else None))
345         else:
346             allfiles += [ent_base]
347     return allfiles
348
349 def is_hex(s, *length_args):
350     """is_hex(s[, length[, max_length]]) -> boolean
351
352     Return True if s is a string of hexadecimal digits.
353     If one length argument is given, the string must contain exactly
354     that number of digits.
355     If two length arguments are given, the string must contain a number of
356     digits between those two lengths, inclusive.
357     Return False otherwise.
358     """
359     num_length_args = len(length_args)
360     if num_length_args > 2:
361         raise arvados.errors.ArgumentError(
362             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
363     elif num_length_args == 2:
364         good_len = (length_args[0] <= len(s) <= length_args[1])
365     elif num_length_args == 1:
366         good_len = (len(s) == length_args[0])
367     else:
368         good_len = True
369     return bool(good_len and HEX_RE.match(s))
370
371 def list_all(fn, num_retries=0, **kwargs):
372     # Default limit to (effectively) api server's MAX_LIMIT
373     kwargs.setdefault('limit', sys.maxsize)
374     items = []
375     offset = 0
376     items_available = sys.maxsize
377     while len(items) < items_available:
378         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
379         items += c['items']
380         items_available = c['items_available']
381         offset = c['offset'] + len(c['items'])
382     return items
383
384 def ca_certs_path(fallback=httplib2.CA_CERTS):
385     """Return the path of the best available CA certs source.
386
387     This function searches for various distribution sources of CA
388     certificates, and returns the first it finds.  If it doesn't find any,
389     it returns the value of `fallback` (httplib2's CA certs by default).
390     """
391     for ca_certs_path in [
392         # Arvados specific:
393         '/etc/arvados/ca-certificates.crt',
394         # Debian:
395         '/etc/ssl/certs/ca-certificates.crt',
396         # Red Hat:
397         '/etc/pki/tls/certs/ca-bundle.crt',
398         ]:
399         if os.path.exists(ca_certs_path):
400             return ca_certs_path
401     return fallback