X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1440352641349c15cc98fea5bf69e0a8b40d7a0c..58a026e09bda4c1e2374347615c325007c64fac4:/sdk/python/arvados/util.py

diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index 9286795e5d..66da2d12af 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -1,16 +1,44 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import division
+from builtins import range
+
 import fcntl
 import hashlib
+import httplib2
 import os
+import random
 import re
 import subprocess
+import errno
+import sys
+
+import arvados
+from arvados.collection import CollectionReader
+
+HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
+
+keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
+signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
+portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
+uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
+collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
+group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
+user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
+link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
+job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
+container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
+manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
 
 def clear_tmpdir(path=None):
     """
     Ensure the given directory (or TASK_TMPDIR if none given)
     exists and is empty.
     """
-    if path == None:
-        path = current_task().tmpdir
+    if path is None:
+        path = arvados.current_task().tmpdir
     if os.path.exists(path):
         p = subprocess.Popen(['rm', '-rf', path])
         stdout, stderr = p.communicate(None)
@@ -27,19 +55,19 @@
 def run_command(execargs, **kwargs):
     p = subprocess.Popen(execargs, **kwargs)
     stdoutdata, stderrdata = p.communicate(None)
     if p.returncode != 0:
-        raise errors.CommandFailedError(
+        raise arvados.errors.CommandFailedError(
             "run_command %s exit %d:\n%s" % (execargs, p.returncode, stderrdata))
     return stdoutdata, stderrdata
 
 def git_checkout(url, version, path):
     if not re.search('^/', path):
-        path = os.path.join(current_job().tmpdir, path)
+        path = os.path.join(arvados.current_job().tmpdir, path)
     if not os.path.exists(path):
-        util.run_command(["git", "clone", url, path],
-                         cwd=os.path.dirname(path))
-        util.run_command(["git", "checkout", version],
-                         cwd=path)
+        run_command(["git", "clone", url, path],
+                    cwd=os.path.dirname(path))
+        run_command(["git", "checkout", version],
+                    cwd=path)
     return path
 
 def tar_extractor(path, decompress_flag):
@@ -62,7 +90,7 @@
     path -- where to extract the tarball: absolute, or relative to job tmp
     """
     if not re.search('^/', path):
-        path = os.path.join(current_job().tmpdir, path)
+        path = os.path.join(arvados.current_job().tmpdir, path)
     lockfile = open(path + '.lock', 'w')
     fcntl.flock(lockfile, fcntl.LOCK_EX)
     try:
@@ -86,13 +114,13 @@
 
         for f in CollectionReader(tarball).all_files():
             if re.search('\.(tbz|tar.bz2)$', f.name()):
-                p = util.tar_extractor(path, 'j')
+                p = tar_extractor(path, 'j')
             elif re.search('\.(tgz|tar.gz)$', f.name()):
-                p = util.tar_extractor(path, 'z')
+                p = tar_extractor(path, 'z')
            elif re.search('\.tar$', f.name()):
-                p = util.tar_extractor(path, '')
+                p = tar_extractor(path, '')
             else:
-                raise errors.AssertionError(
+                raise arvados.errors.AssertionError(
                     "tarball_extract cannot handle filename %s" % f.name())
             while True:
                 buf = f.read(2**20)
@@ -103,10 +131,10 @@
             p.wait()
             if p.returncode != 0:
                 lockfile.close()
-                raise errors.CommandFailedError(
+                raise arvados.errors.CommandFailedError(
                     "tar exited %d" % p.returncode)
         os.symlink(tarball, os.path.join(path, '.locator'))
-    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
+    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
     lockfile.close()
     if len(tld_extracts) == 1:
         return os.path.join(path, tld_extracts[0])
@@ -123,7 +151,7 @@
     path -- where to extract the archive: absolute, or relative to job tmp
     """
     if not re.search('^/', path):
-        path = os.path.join(current_job().tmpdir, path)
+        path = os.path.join(arvados.current_job().tmpdir, path)
     lockfile = open(path + '.lock', 'w')
     fcntl.flock(lockfile, fcntl.LOCK_EX)
     try:
@@ -147,7 +175,7 @@
 
         for f in CollectionReader(zipball).all_files():
             if not re.search('\.zip$', f.name()):
-                raise errors.NotImplementedError(
+                raise arvados.errors.NotImplementedError(
                     "zipball_extract cannot handle filename %s" % f.name())
             zip_filename = os.path.join(path, os.path.basename(f.name()))
             zip_file = open(zip_filename, 'wb')
@@ -157,7 +185,7 @@
                     break
                 zip_file.write(buf)
             zip_file.close()
-            
+
             p = subprocess.Popen(["unzip",
                                   "-q", "-o",
                                   "-d", path,
@@ -168,11 +196,11 @@
             p.wait()
             if p.returncode != 0:
                 lockfile.close()
-                raise errors.CommandFailedError(
+                raise arvados.errors.CommandFailedError(
                     "unzip exited %d" % p.returncode)
             os.unlink(zip_filename)
         os.symlink(zipball, os.path.join(path, '.locator'))
-    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
+    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
     lockfile.close()
     if len(tld_extracts) == 1:
         return os.path.join(path, tld_extracts[0])
@@ -192,7 +220,7 @@
     else:
         collection_hash = hashlib.md5(collection).hexdigest()
     if not re.search('^/', path):
-        path = os.path.join(current_job().tmpdir, path)
+        path = os.path.join(arvados.current_job().tmpdir, path)
     lockfile = open(path + '.lock', 'w')
     fcntl.flock(lockfile, fcntl.LOCK_EX)
     try:
@@ -225,14 +253,14 @@
                     files_got += [outname]
                     if os.path.exists(os.path.join(path, stream_name, outname)):
                         continue
-                    util.mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
+                    mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                     outfile = open(os.path.join(path, stream_name, outname), 'wb')
                     for buf in (f.readall_decompressed() if decompress
                                 else f.readall()):
                         outfile.write(buf)
                     outfile.close()
     if len(files_got) < len(files):
-        raise errors.AssertionError(
+        raise arvados.errors.AssertionError(
             "Wanted files %s but only got %s from %s" %
             (files, files_got,
              [z.name() for z in CollectionReader(collection).all_files()]))
@@ -242,13 +270,16 @@
     return path
 
 def mkdir_dash_p(path):
-    if not os.path.exists(path):
-        util.mkdir_dash_p(os.path.dirname(path))
+    if not os.path.isdir(path):
         try:
-            os.mkdir(path)
-        except OSError:
-            if not os.path.exists(path):
-                os.mkdir(path)
+            os.makedirs(path)
+        except OSError as e:
+            if e.errno == errno.EEXIST and os.path.isdir(path):
+                # It is not an error if someone else creates the
+                # directory between our exists() and makedirs() calls.
+                pass
+            else:
+                raise
 
 def stream_extract(stream, path, files=[], decompress=True):
     """Retrieve a stream from Keep and extract it to a local
@@ -259,7 +290,7 @@
     path -- where to extract: absolute, or relative to job tmp
     """
     if not re.search('^/', path):
-        path = os.path.join(current_job().tmpdir, path)
+        path = os.path.join(arvados.current_job().tmpdir, path)
     lockfile = open(path + '.lock', 'w')
     fcntl.flock(lockfile, fcntl.LOCK_EX)
     try:
@@ -277,26 +308,111 @@
                files_got += [outname]
                if os.path.exists(os.path.join(path, outname)):
                    os.unlink(os.path.join(path, outname))
-                util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
+                mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
                outfile = open(os.path.join(path, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress else f.readall()):
                    outfile.write(buf)
                outfile.close()
     if len(files_got) < len(files):
-        raise errors.AssertionError(
+        raise arvados.errors.AssertionError(
             "Wanted files %s but only got %s from %s" %
             (files, files_got,
              [z.name() for z in stream.all_files()]))
     lockfile.close()
     return path
 
-def listdir_recursive(dirname, base=None):
+def listdir_recursive(dirname, base=None, max_depth=None):
+    """listdir_recursive(dirname, base, max_depth)
+
+    Return a list of file and directory names found under dirname.
+
+    If base is not None, prepend "{base}/" to each returned name.
+
+    If max_depth is None, descend into directories and return only the
+    names of files found in the directory tree.
+
+    If max_depth is a non-negative integer, stop descending into
+    directories at the given depth, and at that point return directory
+    names instead.
+
+    If max_depth==0 (and base is None) this is equivalent to
+    sorted(os.listdir(dirname)).
+    """
     allfiles = []
     for ent in sorted(os.listdir(dirname)):
         ent_path = os.path.join(dirname, ent)
         ent_base = os.path.join(base, ent) if base else ent
-        if os.path.isdir(ent_path):
-            allfiles += util.listdir_recursive(ent_path, ent_base)
+        if os.path.isdir(ent_path) and max_depth != 0:
+            allfiles += listdir_recursive(
+                ent_path, base=ent_base,
+                max_depth=(max_depth-1 if max_depth else None))
         else:
             allfiles += [ent_base]
     return allfiles
+
+def is_hex(s, *length_args):
+    """is_hex(s[, length[, max_length]]) -> boolean
+
+    Return True if s is a string of hexadecimal digits.
+    If one length argument is given, the string must contain exactly
+    that number of digits.
+    If two length arguments are given, the string must contain a number of
+    digits between those two lengths, inclusive.
+    Return False otherwise.
+    """
+    num_length_args = len(length_args)
+    if num_length_args > 2:
+        raise arvados.errors.ArgumentError(
+            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
+    elif num_length_args == 2:
+        good_len = (length_args[0] <= len(s) <= length_args[1])
+    elif num_length_args == 1:
+        good_len = (len(s) == length_args[0])
+    else:
+        good_len = True
+    return bool(good_len and HEX_RE.match(s))
+
+def list_all(fn, num_retries=0, **kwargs):
+    # Default limit to (effectively) api server's MAX_LIMIT
+    kwargs.setdefault('limit', sys.maxsize)
+    items = []
+    offset = 0
+    items_available = sys.maxsize
+    while len(items) < items_available:
+        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
+        items += c['items']
+        items_available = c['items_available']
+        offset = c['offset'] + len(c['items'])
+    return items
+
+def ca_certs_path(fallback=httplib2.CA_CERTS):
+    """Return the path of the best available CA certs source.
+
+    This function searches for various distribution sources of CA
+    certificates, and returns the first it finds. If it doesn't find any,
+    it returns the value of `fallback` (httplib2's CA certs by default).
+    """
+    for ca_certs_path in [
+        # Arvados specific:
+        '/etc/arvados/ca-certificates.crt',
+        # Debian:
+        '/etc/ssl/certs/ca-certificates.crt',
+        # Red Hat:
+        '/etc/pki/tls/certs/ca-bundle.crt',
+        ]:
+        if os.path.exists(ca_certs_path):
+            return ca_certs_path
+    return fallback
+
+def new_request_id():
+    rid = "req-"
+    # 2**104 > 36**20 > 2**103
+    n = random.getrandbits(104)
+    for _ in range(20):
+        c = n % 36
+        if c < 10:
+            rid += chr(c+ord('0'))
+        else:
+            rid += chr(c+ord('a')-10)
+        n = n // 36
+    return rid
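
The module-level regular expressions added at the top of the new file give callers a single place to validate Keep locators and Arvados UUIDs. A minimal sketch of how they might be used (the values are hypothetical; note the patterns are unanchored, so a whole-string test needs fullmatch(), available on Python 3):

    from arvados.util import (keep_locator_pattern, signed_locator_pattern,
                              collection_uuid_pattern)

    loc = 'acbd18db4cc2f85cedef654fccc4a4d8+3'   # md5 hash + block size
    signed = loc + '+Adeadbeef@12345678'         # with a +A... permission hint
    assert keep_locator_pattern.fullmatch(loc)
    assert signed_locator_pattern.fullmatch(signed)
    assert collection_uuid_pattern.fullmatch('zzzzz-4zz18-0123456789abcde')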
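The rewritten mkdir_dash_p() defers to os.makedirs() and treats EEXIST as success only when the path really is a directory, so two processes racing to create the same tree cannot trip each other up. The same idiom shown standalone (a sketch; make_tree is a hypothetical name):

    import errno
    import os

    def make_tree(path):
        try:
            os.makedirs(path)
        except OSError as e:
            # Tolerate a concurrent creator, but still fail if the path
            # exists as something other than a directory (e.g. a file).
            if not (e.errno == errno.EEXIST and os.path.isdir(path)):
                raise

On Python 3 alone, os.makedirs(path, exist_ok=True) covers the same case; the explicit errno check is what keeps the helper portable to Python 2, which this file still supports.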
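The new max_depth argument changes what listdir_recursive() returns at the cut-off: directories at that depth are listed themselves instead of being descended into. A small sketch of the three modes, assuming a scratch tree built purely for illustration:

    import os
    import tempfile
    from arvados.util import listdir_recursive, mkdir_dash_p

    root = tempfile.mkdtemp()
    for name in ['a/b/c.txt', 'a/d.txt', 'e.txt']:
        target = os.path.join(root, name)
        mkdir_dash_p(os.path.dirname(target))
        open(target, 'w').close()

    listdir_recursive(root)               # ['a/b/c.txt', 'a/d.txt', 'e.txt']
    listdir_recursive(root, max_depth=1)  # ['a/b', 'a/d.txt', 'e.txt']
    listdir_recursive(root, max_depth=0)  # ['a', 'e.txt']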
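is_hex() doubles as a validator for fixed-width hex fields: with no length arguments any non-empty hex string passes, one argument pins the exact length, and two give an inclusive range. For instance:

    from arvados.util import is_hex

    is_hex('deadBEEF')       # True  (both cases are accepted)
    is_hex('012345', 6)      # True  (exactly six digits)
    is_hex('abc', 4, 8)      # False (shorter than the 4..8 range allows)
    is_hex('xyz')            # False (not hexadecimal)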
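list_all() pages through any API list method by advancing offset until the number of accumulated items reaches the items_available count reported by the server. A usage sketch, assuming a configured API client (ARVADOS_API_HOST and ARVADOS_API_TOKEN set in the environment):

    import arvados
    from arvados.util import list_all

    api = arvados.api('v1')
    # Each request fetches up to 1000 items; list_all keeps requesting
    # until every collection visible to this token has been returned.
    collections = list_all(api.collections().list, num_retries=3, limit=1000)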
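ca_certs_path() is meant to be handed straight to an HTTP client so connections verify against the distribution's certificate bundle when one exists. A sketch (when no system bundle is found, it falls back to the certificates shipped with httplib2):

    import httplib2
    from arvados.util import ca_certs_path

    http = httplib2.Http(ca_certs=ca_certs_path())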
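new_request_id() draws 104 random bits and renders them as exactly 20 lower-case base-36 digits; the in-code bound 2**104 > 36**20 > 2**103 says those 20 digits hold just over 103 bits, so nearly the whole random draw survives the conversion. A quick check of the resulting shape:

    from arvados.util import new_request_id

    rid = new_request_id()
    assert rid.startswith('req-') and len(rid) == 24
    assert all(c in '0123456789abcdefghijklmnopqrstuvwxyz' for c in rid[4:])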