20937: Improve error handling
[arvados.git] / sdk / python / arvados / vocabulary.py
index b4148890aa31e547879d940f5aa1ce9ff46bb742..3bb87c48dc8217ffe2f07bc21c362e179af1a656 100644 (file)
@@ -8,21 +8,34 @@ from . import api
 
 _logger = logging.getLogger('arvados.vocabulary')
 
-def load_vocabulary(api_client=api('v1')):
+def load_vocabulary(api_client=None):
     """Load the Arvados vocabulary from the API.
     """
+    if api_client is None:
+        api_client = api('v1')
     return Vocabulary(api_client.vocabulary())
 
+class VocabularyError(Exception):
+    """Base class for all vocabulary errors.
+    """
+    pass
+
+class VocabularyKeyError(VocabularyError):
+    pass
+
+class VocabularyValueError(VocabularyError):
+    pass
+
 class Vocabulary(object):
     def __init__(self, voc_definition={}):
         self.strict_keys = voc_definition.get('strict_tags', False)
         self.key_aliases = {}
 
-        for key_id, val in voc_definition.get('tags', {}).items():
+        for key_id, val in (voc_definition.get('tags') or {}).items():
             strict = val.get('strict', False)
             key_labels = [l['label'] for l in val.get('labels', [])]
             values = {}
-            for v_id, v_val in val.get('values', {}).items():
+            for v_id, v_val in (val.get('values') or {}).items():
                 labels = [l['label'] for l in v_val.get('labels', [])]
                 values[v_id] = VocabularyValue(v_id, labels)
             vk = VocabularyKey(key_id, key_labels, values, strict)
@@ -36,39 +49,54 @@ class Vocabulary(object):
     def convert_to_identifiers(self, obj={}):
         """Translate key/value pairs to machine readable identifiers.
         """
-        if not isinstance(obj, dict):
-            raise ValueError("obj must be a dict")
-        r = {}
-        for k, v in obj.items():
-            k_id, v_id = k, v
-            try:
-                k_id = self[k].identifier
-                try:
-                    v_id = self[k][v].identifier
-                except KeyError:
-                    pass
-            except KeyError:
-                pass
-            r[k_id] = v_id
-        return r
+        return self._convert_to_what(obj, 'identifier')
 
     def convert_to_labels(self, obj={}):
         """Translate key/value pairs to human readable labels.
         """
+        return self._convert_to_what(obj, 'preferred_label')
+
+    def _convert_to_what(self, obj={}, what=None):
         if not isinstance(obj, dict):
             raise ValueError("obj must be a dict")
+        if what not in ['preferred_label', 'identifier']:
+            raise ValueError("what attr must be 'preferred_label' or 'identifier'")
         r = {}
         for k, v in obj.items():
-            k_lbl, v_lbl = k, v
+            # Key validation & lookup
+            key_found = False
+            if not isinstance(k, str):
+                raise VocabularyKeyError("key '{}' must be a string".format(k))
+            k_what, v_what = k, v
             try:
-                k_lbl = self[k].preferred_label
+                k_what = getattr(self[k], what)
+                key_found = True
+            except KeyError:
+                if self.strict_keys:
+                    raise VocabularyKeyError("key '{}' not found in vocabulary".format(k))
+
+            # Value validation & lookup
+            if isinstance(v, list):
+                v_what = []
+                for x in v:
+                    if not isinstance(x, str):
+                        raise VocabularyValueError("value '{}' for key '{}' must be a string".format(x, k))
+                    try:
+                        v_what.append(getattr(self[k][x], what))
+                    except KeyError:
+                        if self[k].strict:
+                            raise VocabularyValueError("value '{}' not found for key '{}'".format(x, k))
+                        v_what.append(x)
+            else:
+                if not isinstance(v, str):
+                    raise VocabularyValueError("{} value '{}' for key '{}' must be a string".format(type(v).__name__, v, k))
                 try:
-                    v_lbl = self[k][v].preferred_label
+                    v_what = getattr(self[k][v], what)
                 except KeyError:
-                    pass
-            except KeyError:
-                pass
-            r[k_lbl] = v_lbl
+                    if key_found and self[k].strict:
+                        raise VocabularyValueError("value '{}' not found for key '{}'".format(v, k))
+
+            r[k_what] = v_what
         return r
 
 class VocabularyData(object):