X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2b288a6027b06e969461ebfe2104249302bb0a72..6daa187dccdc8d7513417d5122fd145661647617:/sdk/python/arvados/vocabulary.py diff --git a/sdk/python/arvados/vocabulary.py b/sdk/python/arvados/vocabulary.py index a3344a17b9..3bb87c48dc 100644 --- a/sdk/python/arvados/vocabulary.py +++ b/sdk/python/arvados/vocabulary.py @@ -15,16 +15,27 @@ def load_vocabulary(api_client=None): api_client = api('v1') return Vocabulary(api_client.vocabulary()) +class VocabularyError(Exception): + """Base class for all vocabulary errors. + """ + pass + +class VocabularyKeyError(VocabularyError): + pass + +class VocabularyValueError(VocabularyError): + pass + class Vocabulary(object): def __init__(self, voc_definition={}): self.strict_keys = voc_definition.get('strict_tags', False) self.key_aliases = {} - for key_id, val in voc_definition.get('tags', {}).items(): + for key_id, val in (voc_definition.get('tags') or {}).items(): strict = val.get('strict', False) key_labels = [l['label'] for l in val.get('labels', [])] values = {} - for v_id, v_val in val.get('values', {}).items(): + for v_id, v_val in (val.get('values') or {}).items(): labels = [l['label'] for l in v_val.get('labels', [])] values[v_id] = VocabularyValue(v_id, labels) vk = VocabularyKey(key_id, key_labels, values, strict) @@ -38,49 +49,54 @@ class Vocabulary(object): def convert_to_identifiers(self, obj={}): """Translate key/value pairs to machine readable identifiers. """ - if not isinstance(obj, dict): - raise ValueError("obj must be a dict") - r = {} - for k, v in obj.items(): - k_id, v_id = k, v - try: - k_id = self[k].identifier - try: - if isinstance(v, list): - v_id = [self[k][x].identifier for x in v] - else: - v_id = self[k][v].identifier - except KeyError: - if self[k].strict: - raise ValueError("value '%s' not found for key '%s'" % (v, k)) - except KeyError: - if self.strict_keys: - raise KeyError("key '%s' not found" % k) - r[k_id] = v_id - return r + return self._convert_to_what(obj, 'identifier') def convert_to_labels(self, obj={}): """Translate key/value pairs to human readable labels. """ + return self._convert_to_what(obj, 'preferred_label') + + def _convert_to_what(self, obj={}, what=None): if not isinstance(obj, dict): raise ValueError("obj must be a dict") + if what not in ['preferred_label', 'identifier']: + raise ValueError("what attr must be 'preferred_label' or 'identifier'") r = {} for k, v in obj.items(): - k_lbl, v_lbl = k, v + # Key validation & lookup + key_found = False + if not isinstance(k, str): + raise VocabularyKeyError("key '{}' must be a string".format(k)) + k_what, v_what = k, v try: - k_lbl = self[k].preferred_label - try: - if isinstance(v, list): - v_lbl = [self[k][x].preferred_label for x in v] - else: - v_lbl = self[k][v].preferred_label - except KeyError: - if self[k].strict: - raise ValueError("value '%s' not found for key '%s'" % (v, k)) + k_what = getattr(self[k], what) + key_found = True except KeyError: if self.strict_keys: - raise KeyError("key '%s' not found" % k) - r[k_lbl] = v_lbl + raise VocabularyKeyError("key '{}' not found in vocabulary".format(k)) + + # Value validation & lookup + if isinstance(v, list): + v_what = [] + for x in v: + if not isinstance(x, str): + raise VocabularyValueError("value '{}' for key '{}' must be a string".format(x, k)) + try: + v_what.append(getattr(self[k][x], what)) + except KeyError: + if self[k].strict: + raise VocabularyValueError("value '{}' not found for key '{}'".format(x, k)) + v_what.append(x) + else: + if not isinstance(v, str): + raise VocabularyValueError("{} value '{}' for key '{}' must be a string".format(type(v).__name__, v, k)) + try: + v_what = getattr(self[k][v], what) + except KeyError: + if key_found and self[k].strict: + raise VocabularyValueError("value '{}' not found for key '{}'".format(v, k)) + + r[k_what] = v_what return r class VocabularyData(object):