#!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+"""Integration test framework for node manager.
+
+Runs full node manager with an API server (needs ARVADOS_API_HOST and
+ARVADOS_API_TOKEN). Stubs out the cloud driver and slurm commands to mock
+specific behaviors. Monitors the log output to verify an expected sequence of
+events or behaviors for each test.
+
+"""
+
import subprocess
import os
import sys
import stat
import tempfile
import shutil
+import errno
from functools import partial
import arvados
-
-logging.basicConfig(level=logging.INFO)
+import StringIO
+
# One log line format shared by both loggers below.
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')

# "logger" carries the per-test summary lines (start / passed / failed) and
# always writes to stderr.
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(formatter)
logger = logging.getLogger("logger")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# "detail" carries the verbose trace of a test run.  When ANMTEST_LOGLEVEL is
# set in the environment the trace goes straight to stderr; otherwise it is
# buffered in memory so run_test() can replay it only when a test fails.
detail = logging.getLogger("detail")
detail.setLevel(logging.INFO)
if os.environ.get("ANMTEST_LOGLEVEL"):
    detail_content = sys.stderr
else:
    detail_content = StringIO.StringIO()
handler = logging.StreamHandler(detail_content)
handler.setFormatter(formatter)
detail.addHandler(handler)

# Per-test state, (re)initialised by run_test() and read by the callbacks.
fake_slurm = None                    # temp dir holding the fake slurm scripts
compute_nodes = None                 # dict filled in by node_paired()
all_jobs = None                      # dict: job uuid -> slurm state string
unsatisfiable_job_scancelled = None  # marker file touched by fake scancel

def update_script(path, val):
    """Install an executable script at ``path`` with content ``val``.

    The content is first written to ``path + "_"``, that file is made
    readable, writable and executable by its owner only, and it is then
    renamed over ``path`` so the target is replaced in a single step.

    Args:
        path: Destination path of the script.
        val: Full text of the script.
    """
    staging = path + "_"
    with open(staging, "w") as f:
        f.write(val)
    # S_IRWXU is S_IRUSR | S_IWUSR | S_IXUSR.
    os.chmod(staging, stat.S_IRWXU)
    os.rename(staging, path)
    detail.info("Update script %s: %s", path, val)

def set_squeue(g):
    """Rewrite the fake ``squeue`` script to list every job in ``all_jobs``.

    Each job is emitted as one ``echo`` line with a fixed "1" in the first
    field, followed by the job's state and uuid.

    Args:
        g: Regex match passed by the action dispatcher; unused.

    Returns:
        0, so it can be used directly as an action callback.
    """
    echo_lines = [
        "echo '1|100|100|%s|%s|(null)|1234567890'" % (state, uuid)
        for uuid, state in all_jobs.items()
    ]
    update_script(os.path.join(fake_slurm, "squeue"),
                  "#!/bin/sh\n" + "\n".join(echo_lines))
    return 0

def set_queue_unsatisfiable(g):
    """Install fake ``squeue`` and ``scancel`` scripts for the unsatisfiable-job test.

    ``squeue`` lists every job in ``all_jobs`` with "99" in the first field,
    and ``scancel`` touches the marker file named by
    ``unsatisfiable_job_scancelled`` so job_cancelled() can tell it ran.

    Args:
        g: Regex match passed by the action dispatcher; unused.

    Returns:
        0, so it can be used directly as an action callback.
    """
    # Simulate a job requesting a 99 core node.
    echo_lines = [
        "echo '99|100|100|%s|%s|(null)|1234567890'" % (state, uuid)
        for uuid, state in all_jobs.items()
    ]
    update_script(os.path.join(fake_slurm, "squeue"),
                  "#!/bin/sh\n" + "\n".join(echo_lines))
    # The fake scancel only records that it was invoked.
    update_script(os.path.join(fake_slurm, "scancel"),
                  "#!/bin/sh\n" + "\ntouch %s" % unsatisfiable_job_scancelled)
    return 0

def job_cancelled(g):
    """Verify that an unsatisfiable job was cancelled and logged.

    Two things are checked: the fake ``scancel`` script left its marker file,
    and the API server holds a ``stderr`` log entry for the job whose text
    starts with "Constraints cannot be satisfied".

    Args:
        g: Regex match whose group 1 is the uuid of the cancelled job.

    Returns:
        0 if both checks pass, 1 otherwise (including when no log entry
        exists for the job).
    """
    cancelled_job = g.group(1)
    # Check that 'scancel' was called.  Done first because it is a cheap
    # local check that needs no API round trip.
    if not os.path.isfile(unsatisfiable_job_scancelled):
        return 1
    # Check for the log entry
    api = arvados.api('v1')
    log_entries = api.logs().list(
        filters=[
            ['object_uuid', '=', cancelled_job],
            ['event_type', '=', 'stderr'],
        ]).execute()['items']
    if not log_entries:
        # A missing log entry is a failed check, not an IndexError.
        return 1
    if not re.match(
            r"Constraints cannot be satisfied",
            log_entries[0]['properties']['text']):
        return 1
    return 0

def node_paired(g):
global compute_nodes
compute_nodes[g.group(1)] = g.group(3)
update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
- "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+ "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
for k,v in all_jobs.items():
if v == "ReqNodeNotAvail":
return 0
-def remaining_jobs(g):
- update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
- "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
-
- for k,v in all_jobs.items():
- all_jobs[k] = "Running"
-
- set_squeue(g)
-
- return 0
-
-
def node_busy(g):
    """Rewrite the fake ``sinfo`` script so every known node reports ``idle``.

    One ``echo`` line is emitted per value stored in ``compute_nodes``.

    Args:
        g: Regex match passed by the action dispatcher; unused.

    Returns:
        0, so it can be used directly as an action callback.
    """
    echo_lines = [
        "echo '%s|idle|(null)'" % (hostname)
        for hostname in compute_nodes.values()
    ]
    update_script(os.path.join(fake_slurm, "sinfo"),
                  "#!/bin/sh\n" + "\n".join(echo_lines))
    return 0

def node_shutdown(g):
    """Forget a node after its shutdown has been reported.

    Args:
        g: Regex match whose group 1 is the key to drop from
            ``compute_nodes``.

    Returns:
        0 always; a key that is not tracked is silently ignored.
    """
    # pop() with a default tolerates keys that were never recorded.
    compute_nodes.pop(g.group(1), None)
    return 0

def jobs_req(g):
checks[pattern] = partial(expect_count, count-1)
return 0
def _dump_detail_log():
    """Replay the buffered detail log on stderr, retrying writes that hit EAGAIN."""
    if not isinstance(detail_content, StringIO.StringIO):
        # Detail output was not buffered, so there is nothing to replay.
        return
    detail_content.seek(0)
    chunk = detail_content.read(4096)
    while chunk:
        try:
            sys.stderr.write(chunk)
            chunk = detail_content.read(4096)
        except IOError as e:
            if e.errno != errno.EAGAIN:
                raise
            # try again (probably pipe buffer full)


def run_test(name, actions, checks, driver_class, jobs, provider):
    """Run one integration test against a live node manager process.

    Starts ``bin/arvados-node-manager`` with a generated configuration and a
    PATH that resolves slurm commands to scripts in a temporary directory,
    then reads the process's stderr line by line and drives the test from
    what it logs.

    Args:
        name: Test name, used only in log messages.
        actions: Ordered list of ``(regex, callback)`` pairs.  Only the first
            remaining pair is tried against each line; on a match it is
            removed and ``callback(match)`` is called.
        checks: Dict mapping regex to ``callback(checks, regex, match)``;
            every entry is tried against every line.
        driver_class: Value substituted for ``driver_class`` in the
            configuration template.
        jobs: Dict of job uuid to slurm state, published as ``all_jobs``.
        provider: Selects the template ``tests/fake_<provider>.cfg.template``.

    Returns:
        0 if every action matched and no callback reported a failure,
        non-zero otherwise.
    """
    global unsatisfiable_job_scancelled, fake_slurm, compute_nodes, all_jobs

    code = 0
    # Work on copies: actions are popped as they match and check callbacks
    # may rewrite entries, which must not alter the caller's test definition.
    actions = list(actions)
    checks = dict(checks)

    scancel_dir = tempfile.mkdtemp()
    unsatisfiable_job_scancelled = os.path.join(scancel_dir, "scancel_called")
    fake_slurm = tempfile.mkdtemp()
    compute_nodes = {}
    # The squeue callbacks read all_jobs, so it must be set before they run.
    all_jobs = dict(jobs)
    p = None

    try:
        # Delete any stale node records
        api = arvados.api('v1')
        for n in api.nodes().list().execute()['items']:
            api.nodes().delete(uuid=n["uuid"]).execute()

        logger.info("Start %s", name)
        detail.info("fake_slurm is %s", fake_slurm)

        env = os.environ.copy()
        env["PATH"] = fake_slurm + ":" + env["PATH"]

        # Reset fake squeue/sinfo to empty
        update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n")
        update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n")

        # Write configuration file for test
        ssh_key = os.path.join(fake_slurm, "id_rsa.pub")
        config_path = os.path.join(fake_slurm, "fake.cfg")
        with open("tests/fake_%s.cfg.template" % provider) as f:
            template = f.read()
        open(ssh_key, "w").close()
        with open(config_path, "w") as cfg:
            cfg.write(template.format(host=os.environ["ARVADOS_API_HOST"],
                                      token=os.environ["ARVADOS_API_TOKEN"],
                                      driver_class=driver_class,
                                      ssh_key=ssh_key))

        # Tests must complete in less than 30 seconds.  The deadline is only
        # evaluated when a line of output arrives.
        timeout = time.time() + 30
        terminated = False

        # Now start node manager
        p = subprocess.Popen(["bin/arvados-node-manager", "--foreground", "--config", config_path],
                             bufsize=0, stderr=subprocess.PIPE, env=env)

        # Test main loop:
        # - Read line
        # - Apply negative checks (things that are not supposed to happen)
        # - Check timeout
        # - Check if the next action should trigger
        # - If all actions are exhausted, terminate with test success
        # - If it hits timeout with actions remaining, terminate with test failed
        try:
            # naive line iteration over pipes gets buffered, which isn't what we want,
            # see https://bugs.python.org/issue3907
            for line in iter(p.stderr.readline, ""):
                detail_content.write(line)

                for k, v in checks.items():
                    g = re.match(k, line)
                    if g:
                        detail.info("Matched check %s", k)
                        code += v(checks, k, g)
                        if code != 0:
                            detail.error("Check failed")
                            if not terminated:
                                p.kill()
                                terminated = True

                if terminated:
                    # Keep draining output until the pipe closes.
                    continue

                if time.time() > timeout:
                    detail.error("Exceeded timeout with actions remaining: %s", actions)
                    code += 1
                    if not terminated:
                        p.kill()
                        terminated = True

                k, v = actions[0]
                g = re.match(k, line)
                if g:
                    detail.info("Matched action %s", k)
                    actions.pop(0)
                    code += v(g)
                    if code != 0:
                        detail.error("Action failed")
                        p.kill()
                        terminated = True

                if not actions:
                    p.kill()
                    terminated = True
        except KeyboardInterrupt:
            p.kill()
    finally:
        # Always reap the child and remove the temp dirs, even on error.
        if p is not None:
            if p.poll() is None:
                p.kill()
            p.stderr.close()
            p.wait()
        shutil.rmtree(fake_slurm)
        shutil.rmtree(scancel_dir)

    if actions:
        detail.error("Ended with remaining actions: %s", actions)
        code = 1

    if code == 0:
        logger.info("%s passed", name)
    else:
        _dump_detail_log()
        logger.info("%s failed", name)

    return code

# Test lifecycle.
tests = {
- "test1": (
+ "test_unsatisfiable_jobs" : (
+ # Actions (pattern -> action)
+ [
+ (r".*Daemon started", set_queue_unsatisfiable),
+ (r".*Cancelled unsatisfiable job '(\S+)'", job_cancelled),
+ ],
+ # Checks (things that shouldn't happen)
+ {
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": fail,
+ r".*Trying to cancel job '(\S+)'": fail,
+ },
+ # Driver class
+ "arvnodeman.test.fake_driver.FakeDriver",
+ # Jobs
+ {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
+ "test_single_node_azure": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
r".*Setting node quota.*": fail,
},
+ # Driver class
"arvnodeman.test.fake_driver.FakeDriver",
- {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"}),
- "test2": (
+ # Jobs
+ {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
+ "test_multiple_nodes": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 4),
r".*Setting node quota.*": fail,
},
+ # Driver class
"arvnodeman.test.fake_driver.FakeDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
- "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
- }),
- "test3": (
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
+ "test_hit_quota": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
- (r".*setting node quota to 3", noop),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
- (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
- (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown)
- ], {
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 2),
r".*Sending create_node request.*": partial(expect_count, 5)
},
+ # Driver class
"arvnodeman.test.fake_driver.QuotaDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
- "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
- }),
- "test4": (
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
+ "test_probe_quota": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
- (r".*setting node quota to 3", noop),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
- (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
- (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*sending request", jobs_req),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 6),
r".*Sending create_node request.*": partial(expect_count, 9)
},
+ # Driver class
"arvnodeman.test.fake_driver.QuotaDriver",
+ # Jobs
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
+ "test_no_hang_failing_node_create": (
+ # Actions (pattern -> action)
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Client error: nope", noop),
+ (r".*Client error: nope", noop),
+ (r".*Client error: nope", noop),
+ (r".*Client error: nope", noop),
+ ],
+ # Checks (things that shouldn't happen)
+ {},
+ # Driver class
+ "arvnodeman.test.fake_driver.FailingDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
- "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
- })
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
+ "test_retry_create": (
+ # Actions (pattern -> action)
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Rate limit exceeded - scheduling retry in 2 seconds", noop),
+ (r".*Rate limit exceeded - scheduling retry in 1 seconds", noop),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", noop),
+ ],
+ # Checks (things that shouldn't happen)
+ {},
+ # Driver class
+ "arvnodeman.test.fake_driver.RetryDriver",
+ # Jobs
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
+ "test_single_node_aws": (
+ # Actions (pattern -> action)
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ ],
+ # Checks (things that shouldn't happen)
+ {
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
+ r".*Setting node quota.*": fail,
+ },
+ # Driver class
+ "arvnodeman.test.fake_driver.FakeAwsDriver",
+ # Jobs
+ {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
+ "ec2"),
+ "test_single_node_gce": (
+ # Actions (pattern -> action)
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ ],
+ # Checks (things that shouldn't happen)
+ {
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
+ r".*Setting node quota.*": fail,
+ },
+ # Driver class
+ "arvnodeman.test.fake_driver.FakeGceDriver",
+ # Jobs
+ {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
+ "gce")
}
code = 0
code += run_test(t, *tests[t])
if code == 0:
- logging.info("Tests passed")
+ logger.info("Tests passed")
else:
- logging.info("Tests failed")
+ logger.info("Tests failed")
exit(code)