1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
14 FILTER_STR_RE = re.compile(r'''
20 ''', re.ASCII | re.VERBOSE)
25 raise ValueError("can't accept negative value: %s" % (num,))
28 retry_opt = argparse.ArgumentParser(add_help=False)
29 retry_opt.add_argument('--retries', type=_pos_int, default=10, help="""
30 Maximum number of times to retry server requests that encounter temporary
31 failures (e.g., server down). Default 10.""")
33 def _ignore_error(error):
36 def _raise_error(error):
39 def make_home_conf_dir(path, mode=None, errors='ignore'):
40 # Make the directory path under the user's home directory, making parent
41 # directories as needed.
42 # If the directory is newly created, and a mode is specified, chmod it
43 # with those permissions.
44 # If there's an error, return None if errors is 'ignore', else raise an
46 error_handler = _ignore_error if (errors == 'ignore') else _raise_error
47 tilde_path = os.path.join('~', path)
48 abs_path = os.path.expanduser(tilde_path)
49 if abs_path == tilde_path:
50 return error_handler(ValueError("no home directory available"))
53 except OSError as error:
54 if error.errno != errno.EEXIST:
55 return error_handler(error)
58 os.chmod(abs_path, mode)
61 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
63 def exit_signal_handler(sigcode, frame):
64 logging.getLogger('arvados').error("Caught signal {}, exiting.".format(sigcode))
67 def install_signal_handlers():
68 global orig_signal_handlers
69 orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
70 for sigcode in CAUGHT_SIGNALS}
72 def restore_signal_handlers():
73 for sigcode, orig_handler in orig_signal_handlers.items():
74 signal.signal(sigcode, orig_handler)
76 def validate_filters(filters):
77 """Validate user-provided filters
79 This function validates that a user-defined object represents valid
80 Arvados filters that can be passed to an API client: that it's a list of
81 3-element lists with the field name and operator given as strings. If any
82 of these conditions are not true, it raises a ValueError with details about
85 It returns validated filters. Currently the provided filters are returned
86 unmodified. Future versions of this function may clean up the filters with
87 "obvious" type conversions, so callers SHOULD use the returned value for
90 if not isinstance(filters, list):
91 raise ValueError(f"filters are not a list: {filters!r}")
92 for index, f in enumerate(filters):
93 if isinstance(f, str):
94 match = FILTER_STR_RE.fullmatch(f)
96 raise ValueError(f"filter at index {index} has invalid syntax: {f!r}")
97 s, op, o = match.groups()
99 raise ValueError(f"filter at index {index} has invalid syntax: bad field name {s!r}")
101 raise ValueError(f"filter at index {index} has invalid syntax: bad field name {o!r}")
103 elif not isinstance(f, list):
104 raise ValueError(f"filter at index {index} is not a string or list: {f!r}")
109 f"filter at index {index} does not have three items (field name, operator, operand): {f!r}",
111 if not isinstance(s, str):
112 raise ValueError(f"filter at index {index} field name is not a string: {s!r}")
113 if not isinstance(op, str):
114 raise ValueError(f"filter at index {index} operator is not a string: {op!r}")
119 """Parse a JSON file from a command line argument string or path
121 JSONArgument objects can be called with a string and return an arbitrary
122 object. First it will try to decode the string as JSON. If that fails, it
123 will try to open a file at the path named by the string, and decode it as
124 JSON. If that fails, it raises ValueError with more detail.
126 This is designed to be used as an argparse argument type.
127 Typical usage looks like:
129 parser = argparse.ArgumentParser()
130 parser.add_argument('--object', type=JSONArgument(), ...)
132 You can construct JSONArgument with an optional validation function. If
133 given, it is called with the object decoded from user input, and its
134 return value replaces it. It should raise ValueError if there is a problem
135 with the input. (argparse turns ValueError into a useful error message.)
137 filters_type = JSONArgument(validate_filters)
138 parser.add_argument('--filters', type=filters_type, ...)
140 def __init__(self, validator=None):
141 self.validator = validator
143 def __call__(self, value):
145 retval = json.loads(value)
146 except json.JSONDecodeError:
148 with open(value, 'rb') as json_file:
149 retval = json.load(json_file)
150 except json.JSONDecodeError as error:
151 raise ValueError(f"error decoding JSON from file {value!r}: {error}") from None
152 except (FileNotFoundError, ValueError):
153 raise ValueError(f"not a valid JSON string or file path: {value!r}") from None
154 except OSError as error:
155 raise ValueError(f"error reading JSON file path {value!r}: {error.strerror}") from None
156 if self.validator is not None:
157 retval = self.validator(retval)