21700: Install Bundler system-wide in Rails postinst
[arvados.git] / sdk / python / arvados / commands / _util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import errno
7 import json
8 import logging
9 import os
10 import re
11 import signal
12 import sys
13
14 FILTER_STR_RE = re.compile(r'''
15 ^\(
16 \ *(\w+)
17 \ *(<|<=|=|>=|>)
18 \ *(\w+)
19 \ *\)$
20 ''', re.ASCII | re.VERBOSE)
21
22 def _pos_int(s):
23     num = int(s)
24     if num < 0:
25         raise ValueError("can't accept negative value: %s" % (num,))
26     return num
27
28 retry_opt = argparse.ArgumentParser(add_help=False)
29 retry_opt.add_argument('--retries', type=_pos_int, default=10, help="""
30 Maximum number of times to retry server requests that encounter temporary
31 failures (e.g., server down).  Default 10.""")
32
33 def _ignore_error(error):
34     return None
35
36 def _raise_error(error):
37     raise error
38
39 def make_home_conf_dir(path, mode=None, errors='ignore'):
40     # Make the directory path under the user's home directory, making parent
41     # directories as needed.
42     # If the directory is newly created, and a mode is specified, chmod it
43     # with those permissions.
44     # If there's an error, return None if errors is 'ignore', else raise an
45     # exception.
46     error_handler = _ignore_error if (errors == 'ignore') else _raise_error
47     tilde_path = os.path.join('~', path)
48     abs_path = os.path.expanduser(tilde_path)
49     if abs_path == tilde_path:
50         return error_handler(ValueError("no home directory available"))
51     try:
52         os.makedirs(abs_path)
53     except OSError as error:
54         if error.errno != errno.EEXIST:
55             return error_handler(error)
56     else:
57         if mode is not None:
58             os.chmod(abs_path, mode)
59     return abs_path
60
61 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
62
63 def exit_signal_handler(sigcode, frame):
64     logging.getLogger('arvados').error("Caught signal {}, exiting.".format(sigcode))
65     sys.exit(-sigcode)
66
67 def install_signal_handlers():
68     global orig_signal_handlers
69     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
70                             for sigcode in CAUGHT_SIGNALS}
71
72 def restore_signal_handlers():
73     for sigcode, orig_handler in orig_signal_handlers.items():
74         signal.signal(sigcode, orig_handler)
75
76 def validate_filters(filters):
77     """Validate user-provided filters
78
79     This function validates that a user-defined object represents valid
80     Arvados filters that can be passed to an API client: that it's a list of
81     3-element lists with the field name and operator given as strings. If any
82     of these conditions are not true, it raises a ValueError with details about
83     the problem.
84
85     It returns validated filters. Currently the provided filters are returned
86     unmodified. Future versions of this function may clean up the filters with
87     "obvious" type conversions, so callers SHOULD use the returned value for
88     Arvados API calls.
89     """
90     if not isinstance(filters, list):
91         raise ValueError(f"filters are not a list: {filters!r}")
92     for index, f in enumerate(filters):
93         if isinstance(f, str):
94             match = FILTER_STR_RE.fullmatch(f)
95             if match is None:
96                 raise ValueError(f"filter at index {index} has invalid syntax: {f!r}")
97             s, op, o = match.groups()
98             if s[0].isdigit():
99                 raise ValueError(f"filter at index {index} has invalid syntax: bad field name {s!r}")
100             if o[0].isdigit():
101                 raise ValueError(f"filter at index {index} has invalid syntax: bad field name {o!r}")
102             continue
103         elif not isinstance(f, list):
104             raise ValueError(f"filter at index {index} is not a string or list: {f!r}")
105         try:
106             s, op, o = f
107         except ValueError:
108             raise ValueError(
109                 f"filter at index {index} does not have three items (field name, operator, operand): {f!r}",
110             ) from None
111         if not isinstance(s, str):
112             raise ValueError(f"filter at index {index} field name is not a string: {s!r}")
113         if not isinstance(op, str):
114             raise ValueError(f"filter at index {index} operator is not a string: {op!r}")
115     return filters
116
117
118 class JSONArgument:
119     """Parse a JSON file from a command line argument string or path
120
121     JSONArgument objects can be called with a string and return an arbitrary
122     object. First it will try to decode the string as JSON. If that fails, it
123     will try to open a file at the path named by the string, and decode it as
124     JSON. If that fails, it raises ValueError with more detail.
125
126     This is designed to be used as an argparse argument type.
127     Typical usage looks like:
128
129         parser = argparse.ArgumentParser()
130         parser.add_argument('--object', type=JSONArgument(), ...)
131
132     You can construct JSONArgument with an optional validation function. If
133     given, it is called with the object decoded from user input, and its
134     return value replaces it. It should raise ValueError if there is a problem
135     with the input. (argparse turns ValueError into a useful error message.)
136
137         filters_type = JSONArgument(validate_filters)
138         parser.add_argument('--filters', type=filters_type, ...)
139     """
140     def __init__(self, validator=None):
141         self.validator = validator
142
143     def __call__(self, value):
144         try:
145             retval = json.loads(value)
146         except json.JSONDecodeError:
147             try:
148                 with open(value, 'rb') as json_file:
149                     retval = json.load(json_file)
150             except json.JSONDecodeError as error:
151                 raise ValueError(f"error decoding JSON from file {value!r}: {error}") from None
152             except (FileNotFoundError, ValueError):
153                 raise ValueError(f"not a valid JSON string or file path: {value!r}") from None
154             except OSError as error:
155                 raise ValueError(f"error reading JSON file path {value!r}: {error.strerror}") from None
156         if self.validator is not None:
157             retval = self.validator(retval)
158         return retval