_logger = logging.getLogger('arvados.vocabulary')
-def load_vocabulary(api_client=api('v1')):
+def load_vocabulary(api_client=None):
"""Load the Arvados vocabulary from the API.
"""
+ if api_client is None:
+ api_client = api('v1')
return Vocabulary(api_client.vocabulary())
+class VocabularyError(Exception):
+ """Base class for all vocabulary errors.
+ """
+ pass
+
+class VocabularyKeyError(VocabularyError):
+ pass
+
+class VocabularyValueError(VocabularyError):
+ pass
+
class Vocabulary(object):
def __init__(self, voc_definition={}):
self.strict_keys = voc_definition.get('strict_tags', False)
self.key_aliases = {}
- for key_id, val in voc_definition.get('tags', {}).items():
+ for key_id, val in (voc_definition.get('tags') or {}).items():
strict = val.get('strict', False)
key_labels = [l['label'] for l in val.get('labels', [])]
values = {}
- for v_id, v_val in val.get('values', {}).items():
+ for v_id, v_val in (val.get('values') or {}).items():
labels = [l['label'] for l in v_val.get('labels', [])]
values[v_id] = VocabularyValue(v_id, labels)
vk = VocabularyKey(key_id, key_labels, values, strict)
def convert_to_identifiers(self, obj={}):
"""Translate key/value pairs to machine readable identifiers.
"""
- if not isinstance(obj, dict):
- raise ValueError("obj must be a dict")
- r = {}
- for k, v in obj.items():
- k_id, v_id = k, v
- try:
- k_id = self[k].identifier
- try:
- v_id = self[k][v].identifier
- except KeyError:
- pass
- except KeyError:
- pass
- r[k_id] = v_id
- return r
+ return self._convert_to_what(obj, 'identifier')
def convert_to_labels(self, obj={}):
"""Translate key/value pairs to human readable labels.
"""
+ return self._convert_to_what(obj, 'preferred_label')
+
+ def _convert_to_what(self, obj={}, what=None):
if not isinstance(obj, dict):
raise ValueError("obj must be a dict")
+ if what not in ['preferred_label', 'identifier']:
+ raise ValueError("what attr must be 'preferred_label' or 'identifier'")
r = {}
for k, v in obj.items():
- k_lbl, v_lbl = k, v
+ # Key validation & lookup
+ key_found = False
+ if not isinstance(k, str):
+ raise VocabularyKeyError("key '{}' must be a string".format(k))
+ k_what, v_what = k, v
try:
- k_lbl = self[k].preferred_label
+ k_what = getattr(self[k], what)
+ key_found = True
+ except KeyError:
+ if self.strict_keys:
+ raise VocabularyKeyError("key '{}' not found in vocabulary".format(k))
+
+ # Value validation & lookup
+ if isinstance(v, list):
+ v_what = []
+ for x in v:
+ if not isinstance(x, str):
+ raise VocabularyValueError("value '{}' for key '{}' must be a string".format(x, k))
+ try:
+ v_what.append(getattr(self[k][x], what))
+ except KeyError:
+ if self[k].strict:
+ raise VocabularyValueError("value '{}' not found for key '{}'".format(x, k))
+ v_what.append(x)
+ else:
+ if not isinstance(v, str):
+ raise VocabularyValueError("{} value '{}' for key '{}' must be a string".format(type(v).__name__, v, k))
try:
- v_lbl = self[k][v].preferred_label
+ v_what = getattr(self[k][v], what)
except KeyError:
- pass
- except KeyError:
- pass
- r[k_lbl] = v_lbl
+ if key_found and self[k].strict:
+ raise VocabularyValueError("value '{}' not found for key '{}'".format(v, k))
+
+ r[k_what] = v_what
return r
class VocabularyData(object):