Merge branch '20612-diag-ctr-api-access'
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import range
7
8 import fcntl
9 import functools
10 import hashlib
11 import httplib2
12 import os
13 import random
14 import re
15 import subprocess
16 import errno
17 import sys
18 import warnings
19
20 import arvados.errors
21
22 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
23 CR_UNCOMMITTED = 'Uncommitted'
24 CR_COMMITTED = 'Committed'
25 CR_FINAL = 'Final'
26
27 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
28 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
29 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
30 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
31 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
32 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
33 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
34 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
35 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
36 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
37 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
38
39 def _deprecated(version=None, preferred=None):
40     """Mark a callable as deprecated in the SDK
41
42     This will wrap the callable to emit as a DeprecationWarning
43     and add a deprecation notice to its docstring.
44
45     If the following arguments are given, they'll be included in the
46     notices:
47
48     preferred: str | None
49     : The name of an alternative that users should use instead.
50
51     version: str | None
52     : The version of Arvados when the callable is scheduled to be
53       removed.
54     """
55     if version is None:
56         version = ''
57     else:
58         version = f' and scheduled to be removed in Arvados {version}'
59     if preferred is None:
60         preferred = ''
61     else:
62         preferred = f' Prefer {preferred} instead.'
63     def deprecated_decorator(func):
64         fullname = f'{func.__module__}.{func.__qualname__}'
65         parent, _, name = fullname.rpartition('.')
66         if name == '__init__':
67             fullname = parent
68         warning_msg = f'{fullname} is deprecated{version}.{preferred}'
69         @functools.wraps(func)
70         def deprecated_wrapper(*args, **kwargs):
71             warnings.warn(warning_msg, DeprecationWarning, 2)
72             return func(*args, **kwargs)
73         # Get func's docstring without any trailing newline or empty lines.
74         func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
75         match = re.search(r'\n([ \t]+)\S', func_doc)
76         indent = '' if match is None else match.group(1)
77         # Make the deprecation notice the second "paragraph" of the
78         # docstring if possible. Otherwise append it.
79         docstring, count = re.subn(
80             rf'\n[ \t]*\n{indent}',
81             f'\n\n{indent}DEPRECATED: {warning_msg}\n\n{indent}',
82             func_doc,
83             count=1,
84         )
85         if not count:
86             docstring = f'{func_doc}\n\n{indent}DEPRECATED: {warning_msg}'.lstrip()
87         deprecated_wrapper.__doc__ = docstring
88         return deprecated_wrapper
89     return deprecated_decorator
90
91 @_deprecated('3.0')
92 def clear_tmpdir(path=None):
93     """
94     Ensure the given directory (or TASK_TMPDIR if none given)
95     exists and is empty.
96     """
97     from arvados import current_task
98     if path is None:
99         path = current_task().tmpdir
100     if os.path.exists(path):
101         p = subprocess.Popen(['rm', '-rf', path])
102         stdout, stderr = p.communicate(None)
103         if p.returncode != 0:
104             raise Exception('rm -rf %s: %s' % (path, stderr))
105     os.mkdir(path)
106
107 @_deprecated('3.0', 'subprocess.run')
108 def run_command(execargs, **kwargs):
109     kwargs.setdefault('stdin', subprocess.PIPE)
110     kwargs.setdefault('stdout', subprocess.PIPE)
111     kwargs.setdefault('stderr', sys.stderr)
112     kwargs.setdefault('close_fds', True)
113     kwargs.setdefault('shell', False)
114     p = subprocess.Popen(execargs, **kwargs)
115     stdoutdata, stderrdata = p.communicate(None)
116     if p.returncode != 0:
117         raise arvados.errors.CommandFailedError(
118             "run_command %s exit %d:\n%s" %
119             (execargs, p.returncode, stderrdata))
120     return stdoutdata, stderrdata
121
122 @_deprecated('3.0')
123 def git_checkout(url, version, path):
124     from arvados import current_job
125     if not re.search('^/', path):
126         path = os.path.join(current_job().tmpdir, path)
127     if not os.path.exists(path):
128         run_command(["git", "clone", url, path],
129                     cwd=os.path.dirname(path))
130     run_command(["git", "checkout", version],
131                 cwd=path)
132     return path
133
134 @_deprecated('3.0')
135 def tar_extractor(path, decompress_flag):
136     return subprocess.Popen(["tar",
137                              "-C", path,
138                              ("-x%sf" % decompress_flag),
139                              "-"],
140                             stdout=None,
141                             stdin=subprocess.PIPE, stderr=sys.stderr,
142                             shell=False, close_fds=True)
143
144 @_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
145 def tarball_extract(tarball, path):
146     """Retrieve a tarball from Keep and extract it to a local
147     directory.  Return the absolute path where the tarball was
148     extracted. If the top level of the tarball contained just one
149     file or directory, return the absolute path of that single
150     item.
151
152     tarball -- collection locator
153     path -- where to extract the tarball: absolute, or relative to job tmp
154     """
155     from arvados import current_job
156     from arvados.collection import CollectionReader
157     if not re.search('^/', path):
158         path = os.path.join(current_job().tmpdir, path)
159     lockfile = open(path + '.lock', 'w')
160     fcntl.flock(lockfile, fcntl.LOCK_EX)
161     try:
162         os.stat(path)
163     except OSError:
164         os.mkdir(path)
165     already_have_it = False
166     try:
167         if os.readlink(os.path.join(path, '.locator')) == tarball:
168             already_have_it = True
169     except OSError:
170         pass
171     if not already_have_it:
172
173         # emulate "rm -f" (i.e., if the file does not exist, we win)
174         try:
175             os.unlink(os.path.join(path, '.locator'))
176         except OSError:
177             if os.path.exists(os.path.join(path, '.locator')):
178                 os.unlink(os.path.join(path, '.locator'))
179
180         for f in CollectionReader(tarball).all_files():
181             f_name = f.name()
182             if f_name.endswith(('.tbz', '.tar.bz2')):
183                 p = tar_extractor(path, 'j')
184             elif f_name.endswith(('.tgz', '.tar.gz')):
185                 p = tar_extractor(path, 'z')
186             elif f_name.endswith('.tar'):
187                 p = tar_extractor(path, '')
188             else:
189                 raise arvados.errors.AssertionError(
190                     "tarball_extract cannot handle filename %s" % f.name())
191             while True:
192                 buf = f.read(2**20)
193                 if len(buf) == 0:
194                     break
195                 p.stdin.write(buf)
196             p.stdin.close()
197             p.wait()
198             if p.returncode != 0:
199                 lockfile.close()
200                 raise arvados.errors.CommandFailedError(
201                     "tar exited %d" % p.returncode)
202         os.symlink(tarball, os.path.join(path, '.locator'))
203     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
204     lockfile.close()
205     if len(tld_extracts) == 1:
206         return os.path.join(path, tld_extracts[0])
207     return path
208
209 @_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
210 def zipball_extract(zipball, path):
211     """Retrieve a zip archive from Keep and extract it to a local
212     directory.  Return the absolute path where the archive was
213     extracted. If the top level of the archive contained just one
214     file or directory, return the absolute path of that single
215     item.
216
217     zipball -- collection locator
218     path -- where to extract the archive: absolute, or relative to job tmp
219     """
220     from arvados import current_job
221     from arvados.collection import CollectionReader
222     if not re.search('^/', path):
223         path = os.path.join(current_job().tmpdir, path)
224     lockfile = open(path + '.lock', 'w')
225     fcntl.flock(lockfile, fcntl.LOCK_EX)
226     try:
227         os.stat(path)
228     except OSError:
229         os.mkdir(path)
230     already_have_it = False
231     try:
232         if os.readlink(os.path.join(path, '.locator')) == zipball:
233             already_have_it = True
234     except OSError:
235         pass
236     if not already_have_it:
237
238         # emulate "rm -f" (i.e., if the file does not exist, we win)
239         try:
240             os.unlink(os.path.join(path, '.locator'))
241         except OSError:
242             if os.path.exists(os.path.join(path, '.locator')):
243                 os.unlink(os.path.join(path, '.locator'))
244
245         for f in CollectionReader(zipball).all_files():
246             if not f.name().endswith('.zip'):
247                 raise arvados.errors.NotImplementedError(
248                     "zipball_extract cannot handle filename %s" % f.name())
249             zip_filename = os.path.join(path, os.path.basename(f.name()))
250             zip_file = open(zip_filename, 'wb')
251             while True:
252                 buf = f.read(2**20)
253                 if len(buf) == 0:
254                     break
255                 zip_file.write(buf)
256             zip_file.close()
257
258             p = subprocess.Popen(["unzip",
259                                   "-q", "-o",
260                                   "-d", path,
261                                   zip_filename],
262                                  stdout=None,
263                                  stdin=None, stderr=sys.stderr,
264                                  shell=False, close_fds=True)
265             p.wait()
266             if p.returncode != 0:
267                 lockfile.close()
268                 raise arvados.errors.CommandFailedError(
269                     "unzip exited %d" % p.returncode)
270             os.unlink(zip_filename)
271         os.symlink(zipball, os.path.join(path, '.locator'))
272     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
273     lockfile.close()
274     if len(tld_extracts) == 1:
275         return os.path.join(path, tld_extracts[0])
276     return path
277
278 @_deprecated('3.0', 'arvados.collection.Collection')
279 def collection_extract(collection, path, files=[], decompress=True):
280     """Retrieve a collection from Keep and extract it to a local
281     directory.  Return the absolute path where the collection was
282     extracted.
283
284     collection -- collection locator
285     path -- where to extract: absolute, or relative to job tmp
286     """
287     from arvados import current_job
288     from arvados.collection import CollectionReader
289     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
290     if matches:
291         collection_hash = matches.group(1)
292     else:
293         collection_hash = hashlib.md5(collection).hexdigest()
294     if not re.search('^/', path):
295         path = os.path.join(current_job().tmpdir, path)
296     lockfile = open(path + '.lock', 'w')
297     fcntl.flock(lockfile, fcntl.LOCK_EX)
298     try:
299         os.stat(path)
300     except OSError:
301         os.mkdir(path)
302     already_have_it = False
303     try:
304         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
305             already_have_it = True
306     except OSError:
307         pass
308
309     # emulate "rm -f" (i.e., if the file does not exist, we win)
310     try:
311         os.unlink(os.path.join(path, '.locator'))
312     except OSError:
313         if os.path.exists(os.path.join(path, '.locator')):
314             os.unlink(os.path.join(path, '.locator'))
315
316     files_got = []
317     for s in CollectionReader(collection).all_streams():
318         stream_name = s.name()
319         for f in s.all_files():
320             if (files == [] or
321                 ((f.name() not in files_got) and
322                  (f.name() in files or
323                   (decompress and f.decompressed_name() in files)))):
324                 outname = f.decompressed_name() if decompress else f.name()
325                 files_got += [outname]
326                 if os.path.exists(os.path.join(path, stream_name, outname)):
327                     continue
328                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
329                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
330                 for buf in (f.readall_decompressed() if decompress
331                             else f.readall()):
332                     outfile.write(buf)
333                 outfile.close()
334     if len(files_got) < len(files):
335         raise arvados.errors.AssertionError(
336             "Wanted files %s but only got %s from %s" %
337             (files, files_got,
338              [z.name() for z in CollectionReader(collection).all_files()]))
339     os.symlink(collection_hash, os.path.join(path, '.locator'))
340
341     lockfile.close()
342     return path
343
344 @_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
345 def mkdir_dash_p(path):
346     if not os.path.isdir(path):
347         try:
348             os.makedirs(path)
349         except OSError as e:
350             if e.errno == errno.EEXIST and os.path.isdir(path):
351                 # It is not an error if someone else creates the
352                 # directory between our exists() and makedirs() calls.
353                 pass
354             else:
355                 raise
356
357 @_deprecated('3.0', 'arvados.collection.Collection')
358 def stream_extract(stream, path, files=[], decompress=True):
359     """Retrieve a stream from Keep and extract it to a local
360     directory.  Return the absolute path where the stream was
361     extracted.
362
363     stream -- StreamReader object
364     path -- where to extract: absolute, or relative to job tmp
365     """
366     from arvados import current_job
367     if not re.search('^/', path):
368         path = os.path.join(current_job().tmpdir, path)
369     lockfile = open(path + '.lock', 'w')
370     fcntl.flock(lockfile, fcntl.LOCK_EX)
371     try:
372         os.stat(path)
373     except OSError:
374         os.mkdir(path)
375
376     files_got = []
377     for f in stream.all_files():
378         if (files == [] or
379             ((f.name() not in files_got) and
380              (f.name() in files or
381               (decompress and f.decompressed_name() in files)))):
382             outname = f.decompressed_name() if decompress else f.name()
383             files_got += [outname]
384             if os.path.exists(os.path.join(path, outname)):
385                 os.unlink(os.path.join(path, outname))
386             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
387             outfile = open(os.path.join(path, outname), 'wb')
388             for buf in (f.readall_decompressed() if decompress
389                         else f.readall()):
390                 outfile.write(buf)
391             outfile.close()
392     if len(files_got) < len(files):
393         raise arvados.errors.AssertionError(
394             "Wanted files %s but only got %s from %s" %
395             (files, files_got, [z.name() for z in stream.all_files()]))
396     lockfile.close()
397     return path
398
399 @_deprecated('3.0', 'os.walk')
400 def listdir_recursive(dirname, base=None, max_depth=None):
401     """listdir_recursive(dirname, base, max_depth)
402
403     Return a list of file and directory names found under dirname.
404
405     If base is not None, prepend "{base}/" to each returned name.
406
407     If max_depth is None, descend into directories and return only the
408     names of files found in the directory tree.
409
410     If max_depth is a non-negative integer, stop descending into
411     directories at the given depth, and at that point return directory
412     names instead.
413
414     If max_depth==0 (and base is None) this is equivalent to
415     sorted(os.listdir(dirname)).
416     """
417     allfiles = []
418     for ent in sorted(os.listdir(dirname)):
419         ent_path = os.path.join(dirname, ent)
420         ent_base = os.path.join(base, ent) if base else ent
421         if os.path.isdir(ent_path) and max_depth != 0:
422             allfiles += listdir_recursive(
423                 ent_path, base=ent_base,
424                 max_depth=(max_depth-1 if max_depth else None))
425         else:
426             allfiles += [ent_base]
427     return allfiles
428
429 def is_hex(s, *length_args):
430     """is_hex(s[, length[, max_length]]) -> boolean
431
432     Return True if s is a string of hexadecimal digits.
433     If one length argument is given, the string must contain exactly
434     that number of digits.
435     If two length arguments are given, the string must contain a number of
436     digits between those two lengths, inclusive.
437     Return False otherwise.
438     """
439     num_length_args = len(length_args)
440     if num_length_args > 2:
441         raise arvados.errors.ArgumentError(
442             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
443     elif num_length_args == 2:
444         good_len = (length_args[0] <= len(s) <= length_args[1])
445     elif num_length_args == 1:
446         good_len = (len(s) == length_args[0])
447     else:
448         good_len = True
449     return bool(good_len and HEX_RE.match(s))
450
451 @_deprecated('3.0', 'arvados.util.keyset_list_all')
452 def list_all(fn, num_retries=0, **kwargs):
453     # Default limit to (effectively) api server's MAX_LIMIT
454     kwargs.setdefault('limit', sys.maxsize)
455     items = []
456     offset = 0
457     items_available = sys.maxsize
458     while len(items) < items_available:
459         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
460         items += c['items']
461         items_available = c['items_available']
462         offset = c['offset'] + len(c['items'])
463     return items
464
465 def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
466     pagesize = 1000
467     kwargs["limit"] = pagesize
468     kwargs["count"] = 'none'
469     asc = "asc" if ascending else "desc"
470     kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
471     other_filters = kwargs.get("filters", [])
472
473     try:
474         select = set(kwargs['select'])
475     except KeyError:
476         pass
477     else:
478         select.add(order_key)
479         select.add('uuid')
480         kwargs['select'] = list(select)
481
482     nextpage = []
483     tot = 0
484     expect_full_page = True
485     seen_prevpage = set()
486     seen_thispage = set()
487     lastitem = None
488     prev_page_all_same_order_key = False
489
490     while True:
491         kwargs["filters"] = nextpage+other_filters
492         items = fn(**kwargs).execute(num_retries=num_retries)
493
494         if len(items["items"]) == 0:
495             if prev_page_all_same_order_key:
496                 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
497                 prev_page_all_same_order_key = False
498                 continue
499             else:
500                 return
501
502         seen_prevpage = seen_thispage
503         seen_thispage = set()
504
505         for i in items["items"]:
506             # In cases where there's more than one record with the
507             # same order key, the result could include records we
508             # already saw in the last page.  Skip them.
509             if i["uuid"] in seen_prevpage:
510                 continue
511             seen_thispage.add(i["uuid"])
512             yield i
513
514         firstitem = items["items"][0]
515         lastitem = items["items"][-1]
516
517         if firstitem[order_key] == lastitem[order_key]:
518             # Got a page where every item has the same order key.
519             # Switch to using uuid for paging.
520             nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
521             prev_page_all_same_order_key = True
522         else:
523             # Start from the last order key seen, but skip the last
524             # known uuid to avoid retrieving the same row twice.  If
525             # there are multiple rows with the same order key it is
526             # still likely we'll end up retrieving duplicate rows.
527             # That's handled by tracking the "seen" rows for each page
528             # so they can be skipped if they show up on the next page.
529             nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
530             prev_page_all_same_order_key = False
531
532 def ca_certs_path(fallback=httplib2.CA_CERTS):
533     """Return the path of the best available CA certs source.
534
535     This function searches for various distribution sources of CA
536     certificates, and returns the first it finds.  If it doesn't find any,
537     it returns the value of `fallback` (httplib2's CA certs by default).
538     """
539     for ca_certs_path in [
540         # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
541         # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
542         os.environ.get('SSL_CERT_FILE'),
543         # Arvados specific:
544         '/etc/arvados/ca-certificates.crt',
545         # Debian:
546         '/etc/ssl/certs/ca-certificates.crt',
547         # Red Hat:
548         '/etc/pki/tls/certs/ca-bundle.crt',
549         ]:
550         if ca_certs_path and os.path.exists(ca_certs_path):
551             return ca_certs_path
552     return fallback
553
554 def new_request_id():
555     rid = "req-"
556     # 2**104 > 36**20 > 2**103
557     n = random.getrandbits(104)
558     for _ in range(20):
559         c = n % 36
560         if c < 10:
561             rid += chr(c+ord('0'))
562         else:
563             rid += chr(c+ord('a')-10)
564         n = n // 36
565     return rid
566
567 def get_config_once(svc):
568     if not svc._rootDesc.get('resources').get('configs', False):
569         # Old API server version, no config export endpoint
570         return {}
571     if not hasattr(svc, '_cached_config'):
572         svc._cached_config = svc.configs().get().execute()
573     return svc._cached_config
574
575 def get_vocabulary_once(svc):
576     if not svc._rootDesc.get('resources').get('vocabularies', False):
577         # Old API server version, no vocabulary export endpoint
578         return {}
579     if not hasattr(svc, '_cached_vocabulary'):
580         svc._cached_vocabulary = svc.vocabularies().get().execute()
581     return svc._cached_vocabulary
582
583 def trim_name(collectionname):
584     """
585     trim_name takes a record name (collection name, project name, etc)
586     and trims it to fit the 255 character name limit, with additional
587     space for the timestamp added by ensure_unique_name, by removing
588     excess characters from the middle and inserting an ellipse
589     """
590
591     max_name_len = 254 - 28
592
593     if len(collectionname) > max_name_len:
594         over = len(collectionname) - max_name_len
595         split = int(max_name_len/2)
596         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
597
598     return collectionname