+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import division
+from builtins import range
+
import fcntl
import hashlib
import httplib2
import os
+import random
import re
import subprocess
import sys
import errno
+import arvados
from arvados.collection import CollectionReader
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
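+
+# Container request state values.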
+CR_UNCOMMITTED = 'Uncommitted'
+CR_COMMITTED = 'Committed'
+CR_FINAL = 'Final'
keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
+job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
+container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
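+
+# Example (illustrative): keep_locator_pattern matches a bare block
+# locator such as "d41d8cd98f00b204e9800998ecf8427e+0" (the block's
+# MD5 digest plus its size in bytes); signed_locator_pattern
+# additionally requires a "+A..." permission signature hint.
+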
def clear_tmpdir(path=None):

def run_command(execargs, **kwargs):
    p = subprocess.Popen(execargs, **kwargs)
stdoutdata, stderrdata = p.communicate(None)
if p.returncode != 0:
- raise errors.CommandFailedError(
+ raise arvados.errors.CommandFailedError(
"run_command %s exit %d:\n%s" %
(execargs, p.returncode, stderrdata))
return stdoutdata, stderrdata
        elif re.search(r'\.tar$', f.name()):
p = tar_extractor(path, '')
else:
- raise errors.AssertionError(
+ raise arvados.errors.AssertionError(
"tarball_extract cannot handle filename %s" % f.name())
        while True:
            buf = f.read(2**20)
            if len(buf) == 0:
                break
            p.stdin.write(buf)
        p.stdin.close()
p.wait()
if p.returncode != 0:
lockfile.close()
- raise errors.CommandFailedError(
+ raise arvados.errors.CommandFailedError(
"tar exited %d" % p.returncode)
os.symlink(tarball, os.path.join(path, '.locator'))
- tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
+ tld_extracts = [f for f in os.listdir(path) if f != '.locator']
lockfile.close()
if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])

def zipball_extract(zipball, path):
    for f in CollectionReader(zipball).all_files():
        if not re.search(r'\.zip$', f.name()):
- raise errors.NotImplementedError(
+ raise arvados.errors.NotImplementedError(
"zipball_extract cannot handle filename %s" % f.name())
zip_filename = os.path.join(path, os.path.basename(f.name()))
zip_file = open(zip_filename, 'wb')
p.wait()
if p.returncode != 0:
lockfile.close()
- raise errors.CommandFailedError(
+ raise arvados.errors.CommandFailedError(
"unzip exited %d" % p.returncode)
os.unlink(zip_filename)
os.symlink(zipball, os.path.join(path, '.locator'))
- tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
+ tld_extracts = [f for f in os.listdir(path) if f != '.locator']
lockfile.close()
if len(tld_extracts) == 1:
return os.path.join(path, tld_extracts[0])
outfile.write(buf)
outfile.close()
if len(files_got) < len(files):
- raise errors.AssertionError(
+ raise arvados.errors.AssertionError(
"Wanted files %s but only got %s from %s" %
(files, files_got,
[z.name() for z in CollectionReader(collection).all_files()]))
outfile.write(buf)
outfile.close()
if len(files_got) < len(files):
- raise errors.AssertionError(
+ raise arvados.errors.AssertionError(
"Wanted files %s but only got %s from %s" %
(files, files_got, [z.name() for z in stream.all_files()]))
lockfile.close()
"""
num_length_args = len(length_args)
if num_length_args > 2:
- raise errors.ArgumentError("is_hex accepts up to 3 arguments ({} given)"
- .format(1 + num_length_args))
+ raise arvados.errors.ArgumentError(
+ "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
elif num_length_args == 2:
good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))
def list_all(fn, num_retries=0, **kwargs):
+    # Default the limit to (effectively) the API server's MAX_LIMIT.
+ kwargs.setdefault('limit', sys.maxsize)
items = []
offset = 0
- items_available = sys.maxint
+ items_available = sys.maxsize
while len(items) < items_available:
c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
items += c['items']
offset = c['offset'] + len(c['items'])
return items
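+
+# Usage sketch for list_all (the filter shown is illustrative):
+#
+#     import arvados
+#     api = arvados.api('v1')
+#     cols = list_all(api.collections().list,
+#                     filters=[["name", "like", "tmp-%"]])
+#
+# Offset-based paging can skip or repeat rows when items are created
+# or deleted mid-listing; keyset_list_all below avoids that.
+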
+def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
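+    # Yield every item returned by fn(), using keyset pagination:
+    # instead of a numeric offset, each page is requested with
+    # filters on (order_key, uuid), and count='none' spares the
+    # server the expensive total-row count.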
+ pagesize = 1000
+ kwargs["limit"] = pagesize
+ kwargs["count"] = 'none'
+ kwargs["order"] = ["%s %s" % (order_key, "asc" if ascending else "desc"), "uuid asc"]
+ other_filters = kwargs.get("filters", [])
+
+ if "select" in kwargs and "uuid" not in kwargs["select"]:
+ kwargs["select"].append("uuid")
+
+ nextpage = []
+ tot = 0
+ expect_full_page = True
+ seen_prevpage = set()
+ seen_thispage = set()
+ lastitem = None
+ prev_page_all_same_order_key = False
+
+ while True:
+ kwargs["filters"] = nextpage+other_filters
+ items = fn(**kwargs).execute(num_retries=num_retries)
+
+ if len(items["items"]) == 0:
+ if prev_page_all_same_order_key:
+ nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
+ prev_page_all_same_order_key = False
+ continue
+ else:
+ return
+
+ seen_prevpage = seen_thispage
+ seen_thispage = set()
+
+ for i in items["items"]:
+ # In cases where there's more than one record with the
+ # same order key, the result could include records we
+ # already saw in the last page. Skip them.
+ if i["uuid"] in seen_prevpage:
+ continue
+ seen_thispage.add(i["uuid"])
+ yield i
+
+ firstitem = items["items"][0]
+ lastitem = items["items"][-1]
+
+ if firstitem[order_key] == lastitem[order_key]:
+ # Got a page where every item has the same order key.
+ # Switch to using uuid for paging.
+ nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">", lastitem["uuid"]]]
+ prev_page_all_same_order_key = True
+ else:
+ # Start from the last order key seen, but skip the last
+ # known uuid to avoid retrieving the same row twice. If
+ # there are multiple rows with the same order key it is
+ # still likely we'll end up retrieving duplicate rows.
+ # That's handled by tracking the "seen" rows for each page
+ # so they can be skipped if they show up on the next page.
+ nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
+ prev_page_all_same_order_key = False
+
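+# Usage sketch for keyset_list_all (`project_uuid` is a placeholder):
+#
+#     for c in keyset_list_all(api.collections().list,
+#                              filters=[["owner_uuid", "=", project_uuid]]):
+#         print(c["uuid"])
+#
+# Because the cursor is the (order_key, uuid) pair of the last row
+# seen, rows inserted or deleted mid-iteration don't shift page
+# boundaries the way an offset does.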
+
def ca_certs_path(fallback=httplib2.CA_CERTS):
"""Return the path of the best available CA certs source.
    If no CA certs file is found in the locations checked, it returns
    the value of `fallback` (httplib2's CA certs by default).
"""
for ca_certs_path in [
+ # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
+ # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
+ os.environ.get('SSL_CERT_FILE'),
+ # Arvados specific:
+ '/etc/arvados/ca-certificates.crt',
# Debian:
'/etc/ssl/certs/ca-certificates.crt',
# Red Hat:
'/etc/pki/tls/certs/ca-bundle.crt',
]:
- if os.path.exists(ca_certs_path):
+ if ca_certs_path and os.path.exists(ca_certs_path):
return ca_certs_path
return fallback
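+
+# Usage sketch: the returned path can be handed straight to httplib2,
+# e.g. httplib2.Http(ca_certs=ca_certs_path()).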
+
+def new_request_id():
+ rid = "req-"
+    # 2**104 > 36**20 > 2**103: 104 random bits are just enough to
+    # fill all 20 base-36 digits generated below.
+ n = random.getrandbits(104)
+ for _ in range(20):
+ c = n % 36
+ if c < 10:
+ rid += chr(c+ord('0'))
+ else:
+ rid += chr(c+ord('a')-10)
+ n = n // 36
+ return rid
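+
+# Format sketch: every id is "req-" plus exactly 20 lowercase base-36
+# characters (36**20 < 2**104), so for example
+#
+#     assert re.match(r'^req-[0-9a-z]{20}$', new_request_id())
+#
+# always succeeds.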
+
+def get_config_once(svc):
+ if not svc._rootDesc.get('resources').get('configs', False):
+ # Old API server version, no config export endpoint
+ return {}
+ if not hasattr(svc, '_cached_config'):
+ svc._cached_config = svc.configs().get().execute()
+ return svc._cached_config
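+
+# Usage sketch (the config keys shown are illustrative):
+#
+#     import arvados
+#     conf = get_config_once(arvados.api('v1'))
+#     url = conf.get('Services', {}).get('Controller', {}).get('ExternalURL')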