#!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
from __future__ import absolute_import, print_function
import ConfigParser
import importlib
import logging
-import ssl
import sys
import arvados
import pykka
from apiclient import errors as apierror
-# IOError is the base class for socket.error and friends.
+from .baseactor import BaseNodeManagerActor
+
+from functools import partial
+from libcloud.common.types import LibcloudError
+from libcloud.common.exceptions import BaseHTTPError
+
# IOError is the base class for socket.error, ssl.SSLError, and friends.
# It seems like it hits the sweet spot for operations we want to retry:
# it's low-level, but unlikely to catch code bugs.
NETWORK_ERRORS = (IOError,)
# Network errors plus the errors raised by the API client library.
ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
# Network errors plus libcloud's own error types.
CLOUD_ERRORS = NETWORK_ERRORS + (LibcloudError, BaseHTTPError)

# NOTE(review): presumably the base class other modules use when defining
# actors -- confirm against the callers.
actor_class = BaseNodeManagerActor
class NodeManagerConfig(ConfigParser.SafeConfigParser):
"""Node Manager Configuration class.
ConfigParser.SafeConfigParser.__init__(self, *args, **kwargs)
for sec_name, settings in {
'Arvados': {'insecure': 'no',
- 'timeout': '15'},
+ 'timeout': '15',
+ 'jobs_queue': 'yes',
+ 'slurm_queue': 'yes'
+ },
'Daemon': {'min_nodes': '0',
'max_nodes': '1',
'poll_time': '60',
'max_poll_time': '300',
'poll_stale_after': '600',
+ 'max_total_price': '0',
'boot_fail_after': str(sys.maxint),
- 'node_stale_after': str(60 * 60 * 2)},
+ 'node_stale_after': str(60 * 60 * 2),
+ 'watchdog': '600',
+ 'node_mem_scaling': '0.95'},
+ 'Manage': {'address': '127.0.0.1',
+ 'port': '-1',
+ 'ManagementToken': ''},
'Logging': {'file': '/dev/stderr',
- 'level': 'WARNING'},
+ 'level': 'WARNING'}
}.iteritems():
if not self.has_section(sec_name):
self.add_section(sec_name)
if not self.has_option(sec_name, opt_name):
self.set(sec_name, opt_name, value)
def get_section(self, section, transformers=None, default_transformer=None):
    """Return the options of `section` as a dictionary, optionally typed.

    Arguments:

    * section: Name of the configuration section to read.
    * transformers: Optional mapping of option name to the type (str, int,
      bool or float) that the option's value should be converted to.
    * default_transformer: Optional type (str, int, bool or float) used
      for options that have no supported entry in `transformers`.

    A value that cannot be converted to the requested type is returned
    unconverted, exactly as `self.items(section)` provided it.
    """
    # Avoid a shared mutable default argument.
    if transformers is None:
        transformers = {}
    # Each supported type maps to the parser getter that produces it.
    transformer_map = {
        str: self.get,
        int: self.getint,
        bool: self.getboolean,
        float: self.getfloat,
    }
    result = self._dict()
    for key, value in self.items(section):
        # A per-option type wins; a missing or unsupported one falls back
        # to the default type.
        getter = transformer_map.get(transformers.get(key))
        if getter is None:
            getter = transformer_map.get(default_transformer)
        if getter is not None:
            try:
                value = getter(section, key)
            except (TypeError, ValueError):
                # Best effort: keep the raw value when it does not parse.
                pass
        result[key] = value
    return result
http = httplib2.Http(timeout=self.getint('Arvados', 'timeout'),
ca_certs=certs_file,
disable_ssl_certificate_validation=insecure)
- return arvados.api('v1',
- cache=False, # Don't reuse an existing client.
+ return arvados.api(version='v1',
host=self.get('Arvados', 'host'),
token=self.get('Arvados', 'token'),
insecure=insecure,
def new_cloud_client(self):
    """Build the compute node driver for the configured cloud provider.

    The driver module is 'arvnodeman.computenode.driver.<provider>', where
    <provider> is the 'provider' option of the 'Cloud' section.  Its
    ComputeNodeDriver is instantiated with the 'Cloud Credentials',
    'Cloud List' and 'Cloud Create' sections.  The driver class handed to
    it is the module's DEFAULT_DRIVER unless the 'Cloud' section names a
    dotted 'driver_class' to import instead.
    """
    module = importlib.import_module('arvnodeman.computenode.driver.' +
                                     self.get('Cloud', 'provider'))
    driver_class = module.ComputeNodeDriver.DEFAULT_DRIVER
    if self.has_option('Cloud', 'driver_class'):
        # Everything before the last dot is the module to import; the
        # final component is the class name inside it.
        mod_name, _, cls_name = self.get(
            'Cloud', 'driver_class').rpartition('.')
        driver_class = getattr(importlib.import_module(mod_name), cls_name)
    auth_kwargs = self.get_section('Cloud Credentials')
    if 'timeout' in auth_kwargs:
        # Section values are read without type conversion here, so the
        # timeout has to be converted to an integer explicitly.
        auth_kwargs['timeout'] = int(auth_kwargs['timeout'])
    return module.ComputeNodeDriver(auth_kwargs,
                                    self.get_section('Cloud List'),
                                    self.get_section('Cloud Create'),
                                    driver_class=driver_class)
+
def node_sizes(self):
    """Finds all acceptable NodeSizes for our installation.

    Returns a list of (NodeSize, kwargs) pairs for each NodeSize object
    returned by libcloud that matches a size listed in our config file.

    Each "Size <name>" section of the configuration describes one size.
    Its kwargs always carry 'id' (the <name> from the section header),
    'preemptible' (False unless configured) and 'instance_type' (the
    <name> unless configured).  A cloud size matches when its id or its
    name equals the configured 'instance_type'.
    """
    all_sizes = self.new_cloud_client().list_sizes()
    # Options with a fixed type; every other option is read as an integer
    # where possible.
    section_types = {
        'instance_type': str,
        'price': float,
        'preemptible': bool,
    }
    size_kwargs = {}
    for sec_name in self.sections():
        sec_words = sec_name.split(None, 2)
        # Only "Size <name>" sections describe node sizes; a section with
        # no name after "Size" has nothing to key the size on.
        if len(sec_words) < 2 or sec_words[0] != 'Size':
            continue
        size_name = sec_words[1]
        size_spec = self.get_section(sec_name, section_types, int)
        size_spec.setdefault('preemptible', False)
        # Assume instance type is Size name if missing
        size_spec.setdefault('instance_type', size_name)
        size_spec['id'] = size_name
        size_kwargs[size_name] = size_spec
    # EC2 node sizes are identified by id. GCE sizes are identified by name.
    matching_sizes = []
    for size in all_sizes:
        matching_sizes.extend(
            (size, spec) for spec in size_kwargs.values()
            if spec['instance_type'] in (size.id, size.name))
    return matching_sizes
def shutdown_windows(self):
    """Return the configured shutdown windows as a list of floats.

    The 'shutdown_windows' option of the 'Cloud' section is read as a
    comma-separated list of numbers.  Raises ValueError if an entry is not
    a valid number.
    """
    windows = self.get('Cloud', 'shutdown_windows')
    return [float(n) for n in windows.split(',')]