Merge branch 'master' into 7478-s-preemptable-preemptible
[arvados.git] / services / nodemanager / arvnodeman / config.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import ConfigParser
9 import importlib
10 import logging
11 import sys
12
13 import arvados
14 import httplib2
15 import pykka
16 from apiclient import errors as apierror
17
18 from .baseactor import BaseNodeManagerActor
19
20 from functools import partial
21 from libcloud.common.types import LibcloudError
22 from libcloud.common.exceptions import BaseHTTPError
23
24 # IOError is the base class for socket.error, ssl.SSLError, and friends.
25 # It seems like it hits the sweet spot for operations we want to retry:
26 # it's low-level, but unlikely to catch code bugs.
27 NETWORK_ERRORS = (IOError,)
28 ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
29 CLOUD_ERRORS = NETWORK_ERRORS + (LibcloudError, BaseHTTPError)
30
31 actor_class = BaseNodeManagerActor
32
33 class NodeManagerConfig(ConfigParser.SafeConfigParser):
34     """Node Manager Configuration class.
35
36     This a standard Python ConfigParser, with additional helper methods to
37     create objects instantiated with configuration information.
38     """
39
40     LOGGING_NONLEVELS = frozenset(['file'])
41
42     def __init__(self, *args, **kwargs):
43         # Can't use super() because SafeConfigParser is an old-style class.
44         ConfigParser.SafeConfigParser.__init__(self, *args, **kwargs)
45         for sec_name, settings in {
46             'Arvados': {'insecure': 'no',
47                         'timeout': '15',
48                         'jobs_queue': 'yes',
49                         'slurm_queue': 'yes'
50                     },
51             'Daemon': {'min_nodes': '0',
52                        'max_nodes': '1',
53                        'poll_time': '60',
54                        'max_poll_time': '300',
55                        'poll_stale_after': '600',
56                        'max_total_price': '0',
57                        'boot_fail_after': str(sys.maxint),
58                        'node_stale_after': str(60 * 60 * 2),
59                        'watchdog': '600',
60                        'node_mem_scaling': '0.95'},
61             'Manage': {'address': '127.0.0.1',
62                        'port': '-1',
63                        'ManagementToken': ''},
64             'Logging': {'file': '/dev/stderr',
65                         'level': 'WARNING'}
66         }.iteritems():
67             if not self.has_section(sec_name):
68                 self.add_section(sec_name)
69             for opt_name, value in settings.iteritems():
70                 if not self.has_option(sec_name, opt_name):
71                     self.set(sec_name, opt_name, value)
72
73     def get_section(self, section, transformers={}, default_transformer=None):
74         transformer_map = {
75             str: self.get,
76             int: self.getint,
77             bool: self.getboolean,
78             float: self.getfloat,
79         }
80         result = self._dict()
81         for key, value in self.items(section):
82             transformer = None
83             if transformers.get(key) in transformer_map:
84                 transformer = partial(transformer_map[transformers[key]], section)
85             elif default_transformer in transformer_map:
86                 transformer = partial(transformer_map[default_transformer], section)
87             if transformer is not None:
88                 try:
89                     value = transformer(key)
90                 except (TypeError, ValueError):
91                     pass
92             result[key] = value
93         return result
94
95     def log_levels(self):
96         return {key: getattr(logging, self.get('Logging', key).upper())
97                 for key in self.options('Logging')
98                 if key not in self.LOGGING_NONLEVELS}
99
100     def dispatch_classes(self):
101         mod_name = 'arvnodeman.computenode.dispatch'
102         if self.has_option('Daemon', 'dispatcher'):
103             mod_name = '{}.{}'.format(mod_name,
104                                       self.get('Daemon', 'dispatcher'))
105         module = importlib.import_module(mod_name)
106         return (module.ComputeNodeSetupActor,
107                 module.ComputeNodeShutdownActor,
108                 module.ComputeNodeUpdateActor,
109                 module.ComputeNodeMonitorActor)
110
111     def new_arvados_client(self):
112         if self.has_option('Daemon', 'certs_file'):
113             certs_file = self.get('Daemon', 'certs_file')
114         else:
115             certs_file = None
116         insecure = self.getboolean('Arvados', 'insecure')
117         http = httplib2.Http(timeout=self.getint('Arvados', 'timeout'),
118                              ca_certs=certs_file,
119                              disable_ssl_certificate_validation=insecure)
120         return arvados.api(version='v1',
121                            host=self.get('Arvados', 'host'),
122                            token=self.get('Arvados', 'token'),
123                            insecure=insecure,
124                            http=http)
125
126     def new_cloud_client(self):
127         module = importlib.import_module('arvnodeman.computenode.driver.' +
128                                          self.get('Cloud', 'provider'))
129         driver_class = module.ComputeNodeDriver.DEFAULT_DRIVER
130         if self.has_option('Cloud', 'driver_class'):
131             d = self.get('Cloud', 'driver_class').split('.')
132             mod = '.'.join(d[:-1])
133             cls = d[-1]
134             driver_class = importlib.import_module(mod).__dict__[cls]
135         auth_kwargs = self.get_section('Cloud Credentials')
136         if 'timeout' in auth_kwargs:
137             auth_kwargs['timeout'] = int(auth_kwargs['timeout'])
138         return module.ComputeNodeDriver(auth_kwargs,
139                                         self.get_section('Cloud List'),
140                                         self.get_section('Cloud Create'),
141                                         driver_class=driver_class)
142
143     def node_sizes(self):
144         """Finds all acceptable NodeSizes for our installation.
145
146         Returns a list of (NodeSize, kwargs) pairs for each NodeSize object
147         returned by libcloud that matches a size listed in our config file.
148         """
149         all_sizes = self.new_cloud_client().list_sizes()
150         size_kwargs = {}
151         section_types = {
152             'instance_type': str,
153             'price': float,
154             'preemptible': bool,
155         }
156         for sec_name in self.sections():
157             sec_words = sec_name.split(None, 2)
158             if sec_words[0] != 'Size':
159                 continue
160             size_spec = self.get_section(sec_name, section_types, int)
161             if 'preemptible' not in size_spec:
162                 size_spec['preemptible'] = False
163             if 'instance_type' not in size_spec:
164                 # Assume instance type is Size name if missing
165                 size_spec['instance_type'] = sec_words[1]
166             size_spec['id'] = sec_words[1]
167             size_kwargs[sec_words[1]] = size_spec
168         # EC2 node sizes are identified by id. GCE sizes are identified by name.
169         matching_sizes = []
170         for size in all_sizes:
171             matching_sizes += [
172                 (size, size_kwargs[s]) for s in size_kwargs
173                 if size_kwargs[s]['instance_type'] == size.id
174                 or size_kwargs[s]['instance_type'] == size.name
175             ]
176         return matching_sizes
177
178     def shutdown_windows(self):
179         return [float(n)
180                 for n in self.get('Cloud', 'shutdown_windows').split(',')]