from builtins import range
import fcntl
+import functools
import hashlib
import httplib2
import os
import random
import re
import subprocess
import errno
import sys
+import warnings
-import arvados
-from arvados.collection import CollectionReader
+import arvados.errors
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
+CR_UNCOMMITTED = 'Uncommitted'
+CR_COMMITTED = 'Committed'
+CR_FINAL = 'Final'
-keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
-signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
-portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
+keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
+signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
+portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
-manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
+manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
+keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
+keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
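+# For example, "d41d8cd98f00b204e9800998ecf8427e+0/foo/bar.txt" (an MD5
+# hash, a data size, and a file path within the collection) matches
+# keep_file_locator_pattern; the same string prefixed with "keep:"
+# matches keepuri_pattern.
+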
+def _deprecated(version=None, preferred=None):
+ """Mark a callable as deprecated in the SDK
+
+    This will wrap the callable so it emits a DeprecationWarning when
+    called, and add a deprecation notice to its docstring.
+
+ If the following arguments are given, they'll be included in the
+ notices:
+
+ preferred: str | None
+ : The name of an alternative that users should use instead.
+
+ version: str | None
+ : The version of Arvados when the callable is scheduled to be
+ removed.
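+
+    Example (mirroring how the decorator is applied below):
+
+        @_deprecated('3.0', 'arvados.util.keyset_list_all')
+        def list_all(fn, num_retries=0, **kwargs):
+            ...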
+ """
+ if version is None:
+ version = ''
+ else:
+ version = f' and scheduled to be removed in Arvados {version}'
+ if preferred is None:
+ preferred = ''
+ else:
+ preferred = f' Prefer {preferred} instead.'
+ def deprecated_decorator(func):
+ fullname = f'{func.__module__}.{func.__qualname__}'
+ parent, _, name = fullname.rpartition('.')
+ if name == '__init__':
+ fullname = parent
+ warning_msg = f'{fullname} is deprecated{version}.{preferred}'
+ @functools.wraps(func)
+ def deprecated_wrapper(*args, **kwargs):
+ warnings.warn(warning_msg, DeprecationWarning, 2)
+ return func(*args, **kwargs)
+ # Get func's docstring without any trailing newline or empty lines.
+ func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
+ match = re.search(r'\n([ \t]+)\S', func_doc)
+ indent = '' if match is None else match.group(1)
+ warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent} {warning_msg}'
+ # Make the deprecation notice the second "paragraph" of the
+ # docstring if possible. Otherwise append it.
+ docstring, count = re.subn(
+ rf'\n[ \t]*\n{indent}',
+ f'{warning_doc}\n\n{indent}',
+ func_doc,
+ count=1,
+ )
+ if not count:
+ docstring = f'{func_doc.lstrip()}{warning_doc}'
+ deprecated_wrapper.__doc__ = docstring
+ return deprecated_wrapper
+ return deprecated_decorator
+
+@_deprecated('3.0')
def clear_tmpdir(path=None):
"""
Ensure the given directory (or TASK_TMPDIR if none given)
exists and is empty.
"""
+ from arvados import current_task
if path is None:
- path = arvados.current_task().tmpdir
+ path = current_task().tmpdir
if os.path.exists(path):
p = subprocess.Popen(['rm', '-rf', path])
stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
os.mkdir(path)
+@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
kwargs.setdefault('stdin', subprocess.PIPE)
kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
return stdoutdata, stderrdata
+@_deprecated('3.0')
def git_checkout(url, version, path):
+ from arvados import current_job
if not re.search('^/', path):
- path = os.path.join(arvados.current_job().tmpdir, path)
+ path = os.path.join(current_job().tmpdir, path)
if not os.path.exists(path):
run_command(["git", "clone", url, path],
cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
return path
+@_deprecated('3.0')
def tar_extractor(path, decompress_flag):
return subprocess.Popen(["tar",
"-C", path,
                             ("-x%sf" % decompress_flag), "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
shell=False, close_fds=True)
+@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tarball_extract(tarball, path):
"""Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted.  If the top level of the tarball contained just one
    file or directory, return the absolute path of that single item.

    tarball -- collection locator
path -- where to extract the tarball: absolute, or relative to job tmp
"""
+ from arvados import current_job
+ from arvados.collection import CollectionReader
if not re.search('^/', path):
- path = os.path.join(arvados.current_job().tmpdir, path)
+ path = os.path.join(current_job().tmpdir, path)
lockfile = open(path + '.lock', 'w')
fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))
        for f in CollectionReader(tarball).all_files():
-            if re.search('\.(tbz|tar.bz2)$', f.name()):
+            f_name = f.name()
+            if f_name.endswith(('.tbz', '.tar.bz2')):
                p = tar_extractor(path, 'j')
-            elif re.search('\.(tgz|tar.gz)$', f.name()):
+            elif f_name.endswith(('.tgz', '.tar.gz')):
                p = tar_extractor(path, 'z')
-            elif re.search('\.tar$', f.name()):
+            elif f_name.endswith('.tar'):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
+@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
def zipball_extract(zipball, path):
"""Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted.  If the top level of the archive contained just one
    file or directory, return the absolute path of that single item.

    zipball -- collection locator
path -- where to extract the archive: absolute, or relative to job tmp
"""
+ from arvados import current_job
+ from arvados.collection import CollectionReader
if not re.search('^/', path):
- path = os.path.join(arvados.current_job().tmpdir, path)
+ path = os.path.join(current_job().tmpdir, path)
lockfile = open(path + '.lock', 'w')
fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))
        for f in CollectionReader(zipball).all_files():
-            if not re.search('\.zip$', f.name()):
+            if not f.name().endswith('.zip'):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            # copy the archive out of Keep, unzip it, then remove the copy
            with open(zip_filename, 'wb') as zip_file:
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    zip_file.write(buf)
            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
+@_deprecated('3.0', 'arvados.collection.Collection')
def collection_extract(collection, path, files=[], decompress=True):
"""Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
path -- where to extract: absolute, or relative to job tmp
"""
+ from arvados import current_job
+ from arvados.collection import CollectionReader
matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
if matches:
collection_hash = matches.group(1)
else:
        collection_hash = hashlib.md5(collection.encode()).hexdigest()
if not re.search('^/', path):
- path = os.path.join(arvados.current_job().tmpdir, path)
+ path = os.path.join(current_job().tmpdir, path)
lockfile = open(path + '.lock', 'w')
fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                with open(os.path.join(path, stream_name, outname), 'wb') as outfile:
                    for buf in (f.readall_decompressed() if decompress
                                else f.readall()):
                        outfile.write(buf)
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
+@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
def mkdir_dash_p(path):
if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise
+@_deprecated('3.0', 'arvados.collection.Collection')
def stream_extract(stream, path, files=[], decompress=True):
"""Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
path -- where to extract: absolute, or relative to job tmp
"""
+ from arvados import current_job
if not re.search('^/', path):
- path = os.path.join(arvados.current_job().tmpdir, path)
+ path = os.path.join(current_job().tmpdir, path)
lockfile = open(path + '.lock', 'w')
fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            with open(os.path.join(path, outname), 'wb') as outfile:
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
+@_deprecated('3.0', 'os.walk')
def listdir_recursive(dirname, base=None, max_depth=None):
"""listdir_recursive(dirname, base, max_depth)
good_len = True
return bool(good_len and HEX_RE.match(s))
+@_deprecated('3.0', 'arvados.util.keyset_list_all')
def list_all(fn, num_retries=0, **kwargs):
# Default limit to (effectively) api server's MAX_LIMIT
kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
return items
+def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
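+    """Iterate all Arvados resources returned by an API list call.
+
+    fn is a list method from an API client object, e.g.
+    api.collections().list.  Rather than paging with limit/offset, this
+    pages with filters on order_key (using uuid as a tiebreaker), so
+    iteration remains correct even while rows are added or removed
+    between requests.  Duplicates that can appear when several rows
+    share the same order_key value are filtered out, and items are
+    yielded one at a time.
+    """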
+ pagesize = 1000
+ kwargs["limit"] = pagesize
+ kwargs["count"] = 'none'
+ asc = "asc" if ascending else "desc"
+ kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
+ other_filters = kwargs.get("filters", [])
+
+ try:
+ select = set(kwargs['select'])
+ except KeyError:
+ pass
+ else:
+ select.add(order_key)
+ select.add('uuid')
+ kwargs['select'] = list(select)
+
+ nextpage = []
+ tot = 0
+ expect_full_page = True
+ seen_prevpage = set()
+ seen_thispage = set()
+ lastitem = None
+ prev_page_all_same_order_key = False
+
+ while True:
+ kwargs["filters"] = nextpage+other_filters
+ items = fn(**kwargs).execute(num_retries=num_retries)
+
+ if len(items["items"]) == 0:
+ if prev_page_all_same_order_key:
+ nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
+ prev_page_all_same_order_key = False
+ continue
+ else:
+ return
+
+ seen_prevpage = seen_thispage
+ seen_thispage = set()
+
+ for i in items["items"]:
+ # In cases where there's more than one record with the
+ # same order key, the result could include records we
+ # already saw in the last page. Skip them.
+ if i["uuid"] in seen_prevpage:
+ continue
+ seen_thispage.add(i["uuid"])
+ yield i
+
+ firstitem = items["items"][0]
+ lastitem = items["items"][-1]
+
+ if firstitem[order_key] == lastitem[order_key]:
+ # Got a page where every item has the same order key.
+ # Switch to using uuid for paging.
+ nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
+ prev_page_all_same_order_key = True
+ else:
+ # Start from the last order key seen, but skip the last
+ # known uuid to avoid retrieving the same row twice. If
+ # there are multiple rows with the same order key it is
+ # still likely we'll end up retrieving duplicate rows.
+ # That's handled by tracking the "seen" rows for each page
+ # so they can be skipped if they show up on the next page.
+ nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
+ prev_page_all_same_order_key = False
+
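+# A typical keyset_list_all call, assuming `api` is a client object
+# returned by arvados.api() and `project_uuid` holds a project UUID
+# (both names are illustrative):
+#
+#     for item in keyset_list_all(
+#             api.collections().list,
+#             filters=[["owner_uuid", "=", project_uuid]]):
+#         print(item["uuid"], item["name"])
+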
def ca_certs_path(fallback=httplib2.CA_CERTS):
"""Return the path of the best available CA certs source.

    This function checks several well-known distribution locations for
    CA certificates and returns the first one it finds.  If none exist,
    it returns the value of `fallback` (httplib2's CA certs by default).
"""
for ca_certs_path in [
+ # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
+ # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
+ os.environ.get('SSL_CERT_FILE'),
# Arvados specific:
'/etc/arvados/ca-certificates.crt',
        # Debian:
        '/etc/ssl/certs/ca-certificates.crt',
# Red Hat:
'/etc/pki/tls/certs/ca-bundle.crt',
]:
- if os.path.exists(ca_certs_path):
+ if ca_certs_path and os.path.exists(ca_certs_path):
return ca_certs_path
return fallback

def new_request_id():
    rid = "req-"
    # 2**104 > 36**20 > 2**103
    n = random.getrandbits(104)
    for _ in range(20):
        c = n % 36
        if c < 10:
            rid += chr(c+ord('0'))
        else:
            rid += chr(c+ord('a')-10)
n = n // 36
return rid
+
+def get_config_once(svc):
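+    """Return the cluster config exported by the API server.
+
+    The response is cached on the client object, so repeated calls do
+    not re-contact the server.  Returns {} when the API server is too
+    old to export its config.
+    """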
+ if not svc._rootDesc.get('resources').get('configs', False):
+ # Old API server version, no config export endpoint
+ return {}
+ if not hasattr(svc, '_cached_config'):
+ svc._cached_config = svc.configs().get().execute()
+ return svc._cached_config
+
+def get_vocabulary_once(svc):
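+    """Return the metadata vocabulary exported by the API server.
+
+    The response is cached on the client object, so repeated calls do
+    not re-contact the server.  Returns {} when the API server does not
+    export a vocabulary.
+    """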
+ if not svc._rootDesc.get('resources').get('vocabularies', False):
+ # Old API server version, no vocabulary export endpoint
+ return {}
+ if not hasattr(svc, '_cached_vocabulary'):
+ svc._cached_vocabulary = svc.vocabularies().get().execute()
+ return svc._cached_vocabulary
+
+def trim_name(collectionname):
+ """
+    trim_name takes a record name (collection name, project name, etc.)
+    and trims it to fit the 255-character name limit, with additional
+    space for the timestamp added by ensure_unique_name, by removing
+    excess characters from the middle and inserting an ellipsis.
+ """
+
+ max_name_len = 254 - 28
+
+ if len(collectionname) > max_name_len:
+ over = len(collectionname) - max_name_len
+ split = int(max_name_len/2)
+ collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
+
+ return collectionname
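+
+# For example, a 300-character name comes back as its first 113
+# characters, "…", then its last 113 characters (227 characters in
+# all), leaving headroom for the timestamp suffix that
+# ensure_unique_name may append.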