18600: Merge branch 'main'
[arvados.git] / sdk / python / arvados / vocabulary.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import logging
6
7 from . import api
8
9 _logger = logging.getLogger('arvados.vocabulary')
10
11 def load_vocabulary(api_client=None):
12     """Load the Arvados vocabulary from the API.
13     """
14     if api_client is None:
15         api_client = api('v1')
16     return Vocabulary(api_client.vocabulary())
17
18 class VocabularyError(Exception):
19     """Base class for all vocabulary errors.
20     """
21     pass
22
23 class VocabularyKeyError(VocabularyError):
24     pass
25
26 class VocabularyValueError(VocabularyError):
27     pass
28
29 class Vocabulary(object):
30     def __init__(self, voc_definition={}):
31         self.strict_keys = voc_definition.get('strict_tags', False)
32         self.key_aliases = {}
33
34         for key_id, val in (voc_definition.get('tags') or {}).items():
35             strict = val.get('strict', False)
36             key_labels = [l['label'] for l in val.get('labels', [])]
37             values = {}
38             for v_id, v_val in (val.get('values') or {}).items():
39                 labels = [l['label'] for l in v_val.get('labels', [])]
40                 values[v_id] = VocabularyValue(v_id, labels)
41             vk = VocabularyKey(key_id, key_labels, values, strict)
42             self.key_aliases[key_id.lower()] = vk
43             for alias in vk.aliases:
44                 self.key_aliases[alias.lower()] = vk
45
46     def __getitem__(self, key):
47         return self.key_aliases[key.lower()]
48
49     def convert_to_identifiers(self, obj={}):
50         """Translate key/value pairs to machine readable identifiers.
51         """
52         return self._convert_to_what(obj, 'identifier')
53
54     def convert_to_labels(self, obj={}):
55         """Translate key/value pairs to human readable labels.
56         """
57         return self._convert_to_what(obj, 'preferred_label')
58
59     def _convert_to_what(self, obj={}, what=None):
60         if not isinstance(obj, dict):
61             raise ValueError("obj must be a dict")
62         if what not in ['preferred_label', 'identifier']:
63             raise ValueError("what attr must be 'preferred_label' or 'identifier'")
64         r = {}
65         for k, v in obj.items():
66             # Key validation & lookup
67             key_found = False
68             if not isinstance(k, str):
69                 raise VocabularyKeyError("key '{}' must be a string".format(k))
70             k_what, v_what = k, v
71             try:
72                 k_what = getattr(self[k], what)
73                 key_found = True
74             except KeyError:
75                 if self.strict_keys:
76                     raise VocabularyKeyError("key '{}' not found in vocabulary".format(k))
77
78             # Value validation & lookup
79             if isinstance(v, list):
80                 v_what = []
81                 for x in v:
82                     if not isinstance(x, str):
83                         raise VocabularyValueError("value '{}' for key '{}' must be a string".format(x, k))
84                     try:
85                         v_what.append(getattr(self[k][x], what))
86                     except KeyError:
87                         if self[k].strict:
88                             raise VocabularyValueError("value '{}' not found for key '{}'".format(x, k))
89                         v_what.append(x)
90             else:
91                 if not isinstance(v, str):
92                     raise VocabularyValueError("{} value '{}' for key '{}' must be a string".format(type(v).__name__, v, k))
93                 try:
94                     v_what = getattr(self[k][v], what)
95                 except KeyError:
96                     if key_found and self[k].strict:
97                         raise VocabularyValueError("value '{}' not found for key '{}'".format(v, k))
98
99             r[k_what] = v_what
100         return r
101
102 class VocabularyData(object):
103     def __init__(self, identifier, aliases=[]):
104         self.identifier = identifier
105         self.aliases = aliases
106
107     def __getattribute__(self, name):
108         if name == 'preferred_label':
109             return self.aliases[0]
110         return super(VocabularyData, self).__getattribute__(name)
111
112 class VocabularyValue(VocabularyData):
113     def __init__(self, identifier, aliases=[]):
114         super(VocabularyValue, self).__init__(identifier, aliases)
115
116 class VocabularyKey(VocabularyData):
117     def __init__(self, identifier, aliases=[], values={}, strict=False):
118         super(VocabularyKey, self).__init__(identifier, aliases)
119         self.strict = strict
120         self.value_aliases = {}
121         for v_id, v_val in values.items():
122             self.value_aliases[v_id.lower()] = v_val
123             for v_alias in v_val.aliases:
124                 self.value_aliases[v_alias.lower()] = v_val
125
126     def __getitem__(self, key):
127         return self.value_aliases[key.lower()]