17776: Merge branch 'master' into 17776-more-throttling
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import range
7
8 import fcntl
9 import hashlib
10 import httplib2
11 import os
12 import random
13 import re
14 import subprocess
15 import errno
16 import sys
17
18 import arvados
19 from arvados.collection import CollectionReader
20
21 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
22 CR_UNCOMMITTED = 'Uncommitted'
23 CR_COMMITTED = 'Committed'
24 CR_FINAL = 'Final'
25
26 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
27 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
28 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
29 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
30 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
31 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
32 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
33 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
34 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
35 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
36 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
37
38 def clear_tmpdir(path=None):
39     """
40     Ensure the given directory (or TASK_TMPDIR if none given)
41     exists and is empty.
42     """
43     if path is None:
44         path = arvados.current_task().tmpdir
45     if os.path.exists(path):
46         p = subprocess.Popen(['rm', '-rf', path])
47         stdout, stderr = p.communicate(None)
48         if p.returncode != 0:
49             raise Exception('rm -rf %s: %s' % (path, stderr))
50     os.mkdir(path)
51
52 def run_command(execargs, **kwargs):
53     kwargs.setdefault('stdin', subprocess.PIPE)
54     kwargs.setdefault('stdout', subprocess.PIPE)
55     kwargs.setdefault('stderr', sys.stderr)
56     kwargs.setdefault('close_fds', True)
57     kwargs.setdefault('shell', False)
58     p = subprocess.Popen(execargs, **kwargs)
59     stdoutdata, stderrdata = p.communicate(None)
60     if p.returncode != 0:
61         raise arvados.errors.CommandFailedError(
62             "run_command %s exit %d:\n%s" %
63             (execargs, p.returncode, stderrdata))
64     return stdoutdata, stderrdata
65
66 def git_checkout(url, version, path):
67     if not re.search('^/', path):
68         path = os.path.join(arvados.current_job().tmpdir, path)
69     if not os.path.exists(path):
70         run_command(["git", "clone", url, path],
71                     cwd=os.path.dirname(path))
72     run_command(["git", "checkout", version],
73                 cwd=path)
74     return path
75
76 def tar_extractor(path, decompress_flag):
77     return subprocess.Popen(["tar",
78                              "-C", path,
79                              ("-x%sf" % decompress_flag),
80                              "-"],
81                             stdout=None,
82                             stdin=subprocess.PIPE, stderr=sys.stderr,
83                             shell=False, close_fds=True)
84
85 def tarball_extract(tarball, path):
86     """Retrieve a tarball from Keep and extract it to a local
87     directory.  Return the absolute path where the tarball was
88     extracted. If the top level of the tarball contained just one
89     file or directory, return the absolute path of that single
90     item.
91
92     tarball -- collection locator
93     path -- where to extract the tarball: absolute, or relative to job tmp
94     """
95     if not re.search('^/', path):
96         path = os.path.join(arvados.current_job().tmpdir, path)
97     lockfile = open(path + '.lock', 'w')
98     fcntl.flock(lockfile, fcntl.LOCK_EX)
99     try:
100         os.stat(path)
101     except OSError:
102         os.mkdir(path)
103     already_have_it = False
104     try:
105         if os.readlink(os.path.join(path, '.locator')) == tarball:
106             already_have_it = True
107     except OSError:
108         pass
109     if not already_have_it:
110
111         # emulate "rm -f" (i.e., if the file does not exist, we win)
112         try:
113             os.unlink(os.path.join(path, '.locator'))
114         except OSError:
115             if os.path.exists(os.path.join(path, '.locator')):
116                 os.unlink(os.path.join(path, '.locator'))
117
118         for f in CollectionReader(tarball).all_files():
119             if re.search('\.(tbz|tar.bz2)$', f.name()):
120                 p = tar_extractor(path, 'j')
121             elif re.search('\.(tgz|tar.gz)$', f.name()):
122                 p = tar_extractor(path, 'z')
123             elif re.search('\.tar$', f.name()):
124                 p = tar_extractor(path, '')
125             else:
126                 raise arvados.errors.AssertionError(
127                     "tarball_extract cannot handle filename %s" % f.name())
128             while True:
129                 buf = f.read(2**20)
130                 if len(buf) == 0:
131                     break
132                 p.stdin.write(buf)
133             p.stdin.close()
134             p.wait()
135             if p.returncode != 0:
136                 lockfile.close()
137                 raise arvados.errors.CommandFailedError(
138                     "tar exited %d" % p.returncode)
139         os.symlink(tarball, os.path.join(path, '.locator'))
140     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
141     lockfile.close()
142     if len(tld_extracts) == 1:
143         return os.path.join(path, tld_extracts[0])
144     return path
145
146 def zipball_extract(zipball, path):
147     """Retrieve a zip archive from Keep and extract it to a local
148     directory.  Return the absolute path where the archive was
149     extracted. If the top level of the archive contained just one
150     file or directory, return the absolute path of that single
151     item.
152
153     zipball -- collection locator
154     path -- where to extract the archive: absolute, or relative to job tmp
155     """
156     if not re.search('^/', path):
157         path = os.path.join(arvados.current_job().tmpdir, path)
158     lockfile = open(path + '.lock', 'w')
159     fcntl.flock(lockfile, fcntl.LOCK_EX)
160     try:
161         os.stat(path)
162     except OSError:
163         os.mkdir(path)
164     already_have_it = False
165     try:
166         if os.readlink(os.path.join(path, '.locator')) == zipball:
167             already_have_it = True
168     except OSError:
169         pass
170     if not already_have_it:
171
172         # emulate "rm -f" (i.e., if the file does not exist, we win)
173         try:
174             os.unlink(os.path.join(path, '.locator'))
175         except OSError:
176             if os.path.exists(os.path.join(path, '.locator')):
177                 os.unlink(os.path.join(path, '.locator'))
178
179         for f in CollectionReader(zipball).all_files():
180             if not re.search('\.zip$', f.name()):
181                 raise arvados.errors.NotImplementedError(
182                     "zipball_extract cannot handle filename %s" % f.name())
183             zip_filename = os.path.join(path, os.path.basename(f.name()))
184             zip_file = open(zip_filename, 'wb')
185             while True:
186                 buf = f.read(2**20)
187                 if len(buf) == 0:
188                     break
189                 zip_file.write(buf)
190             zip_file.close()
191
192             p = subprocess.Popen(["unzip",
193                                   "-q", "-o",
194                                   "-d", path,
195                                   zip_filename],
196                                  stdout=None,
197                                  stdin=None, stderr=sys.stderr,
198                                  shell=False, close_fds=True)
199             p.wait()
200             if p.returncode != 0:
201                 lockfile.close()
202                 raise arvados.errors.CommandFailedError(
203                     "unzip exited %d" % p.returncode)
204             os.unlink(zip_filename)
205         os.symlink(zipball, os.path.join(path, '.locator'))
206     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
207     lockfile.close()
208     if len(tld_extracts) == 1:
209         return os.path.join(path, tld_extracts[0])
210     return path
211
212 def collection_extract(collection, path, files=[], decompress=True):
213     """Retrieve a collection from Keep and extract it to a local
214     directory.  Return the absolute path where the collection was
215     extracted.
216
217     collection -- collection locator
218     path -- where to extract: absolute, or relative to job tmp
219     """
220     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
221     if matches:
222         collection_hash = matches.group(1)
223     else:
224         collection_hash = hashlib.md5(collection).hexdigest()
225     if not re.search('^/', path):
226         path = os.path.join(arvados.current_job().tmpdir, path)
227     lockfile = open(path + '.lock', 'w')
228     fcntl.flock(lockfile, fcntl.LOCK_EX)
229     try:
230         os.stat(path)
231     except OSError:
232         os.mkdir(path)
233     already_have_it = False
234     try:
235         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
236             already_have_it = True
237     except OSError:
238         pass
239
240     # emulate "rm -f" (i.e., if the file does not exist, we win)
241     try:
242         os.unlink(os.path.join(path, '.locator'))
243     except OSError:
244         if os.path.exists(os.path.join(path, '.locator')):
245             os.unlink(os.path.join(path, '.locator'))
246
247     files_got = []
248     for s in CollectionReader(collection).all_streams():
249         stream_name = s.name()
250         for f in s.all_files():
251             if (files == [] or
252                 ((f.name() not in files_got) and
253                  (f.name() in files or
254                   (decompress and f.decompressed_name() in files)))):
255                 outname = f.decompressed_name() if decompress else f.name()
256                 files_got += [outname]
257                 if os.path.exists(os.path.join(path, stream_name, outname)):
258                     continue
259                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
260                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
261                 for buf in (f.readall_decompressed() if decompress
262                             else f.readall()):
263                     outfile.write(buf)
264                 outfile.close()
265     if len(files_got) < len(files):
266         raise arvados.errors.AssertionError(
267             "Wanted files %s but only got %s from %s" %
268             (files, files_got,
269              [z.name() for z in CollectionReader(collection).all_files()]))
270     os.symlink(collection_hash, os.path.join(path, '.locator'))
271
272     lockfile.close()
273     return path
274
275 def mkdir_dash_p(path):
276     if not os.path.isdir(path):
277         try:
278             os.makedirs(path)
279         except OSError as e:
280             if e.errno == errno.EEXIST and os.path.isdir(path):
281                 # It is not an error if someone else creates the
282                 # directory between our exists() and makedirs() calls.
283                 pass
284             else:
285                 raise
286
287 def stream_extract(stream, path, files=[], decompress=True):
288     """Retrieve a stream from Keep and extract it to a local
289     directory.  Return the absolute path where the stream was
290     extracted.
291
292     stream -- StreamReader object
293     path -- where to extract: absolute, or relative to job tmp
294     """
295     if not re.search('^/', path):
296         path = os.path.join(arvados.current_job().tmpdir, path)
297     lockfile = open(path + '.lock', 'w')
298     fcntl.flock(lockfile, fcntl.LOCK_EX)
299     try:
300         os.stat(path)
301     except OSError:
302         os.mkdir(path)
303
304     files_got = []
305     for f in stream.all_files():
306         if (files == [] or
307             ((f.name() not in files_got) and
308              (f.name() in files or
309               (decompress and f.decompressed_name() in files)))):
310             outname = f.decompressed_name() if decompress else f.name()
311             files_got += [outname]
312             if os.path.exists(os.path.join(path, outname)):
313                 os.unlink(os.path.join(path, outname))
314             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
315             outfile = open(os.path.join(path, outname), 'wb')
316             for buf in (f.readall_decompressed() if decompress
317                         else f.readall()):
318                 outfile.write(buf)
319             outfile.close()
320     if len(files_got) < len(files):
321         raise arvados.errors.AssertionError(
322             "Wanted files %s but only got %s from %s" %
323             (files, files_got, [z.name() for z in stream.all_files()]))
324     lockfile.close()
325     return path
326
327 def listdir_recursive(dirname, base=None, max_depth=None):
328     """listdir_recursive(dirname, base, max_depth)
329
330     Return a list of file and directory names found under dirname.
331
332     If base is not None, prepend "{base}/" to each returned name.
333
334     If max_depth is None, descend into directories and return only the
335     names of files found in the directory tree.
336
337     If max_depth is a non-negative integer, stop descending into
338     directories at the given depth, and at that point return directory
339     names instead.
340
341     If max_depth==0 (and base is None) this is equivalent to
342     sorted(os.listdir(dirname)).
343     """
344     allfiles = []
345     for ent in sorted(os.listdir(dirname)):
346         ent_path = os.path.join(dirname, ent)
347         ent_base = os.path.join(base, ent) if base else ent
348         if os.path.isdir(ent_path) and max_depth != 0:
349             allfiles += listdir_recursive(
350                 ent_path, base=ent_base,
351                 max_depth=(max_depth-1 if max_depth else None))
352         else:
353             allfiles += [ent_base]
354     return allfiles
355
356 def is_hex(s, *length_args):
357     """is_hex(s[, length[, max_length]]) -> boolean
358
359     Return True if s is a string of hexadecimal digits.
360     If one length argument is given, the string must contain exactly
361     that number of digits.
362     If two length arguments are given, the string must contain a number of
363     digits between those two lengths, inclusive.
364     Return False otherwise.
365     """
366     num_length_args = len(length_args)
367     if num_length_args > 2:
368         raise arvados.errors.ArgumentError(
369             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
370     elif num_length_args == 2:
371         good_len = (length_args[0] <= len(s) <= length_args[1])
372     elif num_length_args == 1:
373         good_len = (len(s) == length_args[0])
374     else:
375         good_len = True
376     return bool(good_len and HEX_RE.match(s))
377
378 def list_all(fn, num_retries=0, **kwargs):
379     # Default limit to (effectively) api server's MAX_LIMIT
380     kwargs.setdefault('limit', sys.maxsize)
381     items = []
382     offset = 0
383     items_available = sys.maxsize
384     while len(items) < items_available:
385         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
386         items += c['items']
387         items_available = c['items_available']
388         offset = c['offset'] + len(c['items'])
389     return items
390
391 def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
392     pagesize = 1000
393     kwargs["limit"] = pagesize
394     kwargs["count"] = 'none'
395     kwargs["order"] = ["%s %s" % (order_key, "asc" if ascending else "desc"), "uuid asc"]
396     other_filters = kwargs.get("filters", [])
397
398     if "select" in kwargs and "uuid" not in kwargs["select"]:
399         kwargs["select"].append("uuid")
400
401     nextpage = []
402     tot = 0
403     expect_full_page = True
404     seen_prevpage = set()
405     seen_thispage = set()
406     lastitem = None
407     prev_page_all_same_order_key = False
408
409     while True:
410         kwargs["filters"] = nextpage+other_filters
411         items = fn(**kwargs).execute(num_retries=num_retries)
412
413         if len(items["items"]) == 0:
414             if prev_page_all_same_order_key:
415                 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
416                 prev_page_all_same_order_key = False
417                 continue
418             else:
419                 return
420
421         seen_prevpage = seen_thispage
422         seen_thispage = set()
423
424         for i in items["items"]:
425             # In cases where there's more than one record with the
426             # same order key, the result could include records we
427             # already saw in the last page.  Skip them.
428             if i["uuid"] in seen_prevpage:
429                 continue
430             seen_thispage.add(i["uuid"])
431             yield i
432
433         firstitem = items["items"][0]
434         lastitem = items["items"][-1]
435
436         if firstitem[order_key] == lastitem[order_key]:
437             # Got a page where every item has the same order key.
438             # Switch to using uuid for paging.
439             nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">", lastitem["uuid"]]]
440             prev_page_all_same_order_key = True
441         else:
442             # Start from the last order key seen, but skip the last
443             # known uuid to avoid retrieving the same row twice.  If
444             # there are multiple rows with the same order key it is
445             # still likely we'll end up retrieving duplicate rows.
446             # That's handled by tracking the "seen" rows for each page
447             # so they can be skipped if they show up on the next page.
448             nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
449             prev_page_all_same_order_key = False
450
451
452 def ca_certs_path(fallback=httplib2.CA_CERTS):
453     """Return the path of the best available CA certs source.
454
455     This function searches for various distribution sources of CA
456     certificates, and returns the first it finds.  If it doesn't find any,
457     it returns the value of `fallback` (httplib2's CA certs by default).
458     """
459     for ca_certs_path in [
460         # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
461         # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
462         os.environ.get('SSL_CERT_FILE'),
463         # Arvados specific:
464         '/etc/arvados/ca-certificates.crt',
465         # Debian:
466         '/etc/ssl/certs/ca-certificates.crt',
467         # Red Hat:
468         '/etc/pki/tls/certs/ca-bundle.crt',
469         ]:
470         if ca_certs_path and os.path.exists(ca_certs_path):
471             return ca_certs_path
472     return fallback
473
474 def new_request_id():
475     rid = "req-"
476     # 2**104 > 36**20 > 2**103
477     n = random.getrandbits(104)
478     for _ in range(20):
479         c = n % 36
480         if c < 10:
481             rid += chr(c+ord('0'))
482         else:
483             rid += chr(c+ord('a')-10)
484         n = n // 36
485     return rid
486
487 def get_config_once(svc):
488     if not svc._rootDesc.get('resources').get('configs', False):
489         # Old API server version, no config export endpoint
490         return {}
491     if not hasattr(svc, '_cached_config'):
492         svc._cached_config = svc.configs().get().execute()
493     return svc._cached_config