#!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+"""Integration test framework for node manager.
+
+Runs full node manager with an API server (needs ARVADOS_API_HOST and
+ARVADOS_API_TOKEN). Stubs out the cloud driver and slurm commands to mock
+specific behaviors. Monitors the log output to verify an expected sequence of
+events or behaviors for each test.
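+
+Usage: integration_test.py [test_name]
+
+Runs every test in the table when no test name is given.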
+
+"""
+
import subprocess
import os
import sys
import re
import time
import logging
import stat
import tempfile
import shutil
+from functools import partial
+import arvados
+import StringIO
+
-logging.basicConfig(level=logging.INFO)
+formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+handler = logging.StreamHandler(sys.stderr)
+handler.setFormatter(formatter)
+logger = logging.getLogger("logger")
+logger.setLevel(logging.INFO)
+logger.addHandler(handler)
+
+detail = logging.getLogger("detail")
+detail.setLevel(logging.INFO)
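+# With ANMTEST_LOGLEVEL set in the environment, detailed logs stream straight
+# to stderr; otherwise they are buffered and dumped only when a test fails.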
+if os.environ.get("ANMTEST_LOGLEVEL"):
+ detail_content = sys.stderr
+else:
+ detail_content = StringIO.StringIO()
+handler = logging.StreamHandler(detail_content)
+handler.setFormatter(formatter)
+detail.addHandler(handler)
fake_slurm = None
compute_nodes = None
+all_jobs = None
+unsatisfiable_job_scancelled = None
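+
+# Atomically install a fake slurm command: write the script under a temporary
+# name, mark it executable, then rename it into place.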
def update_script(path, val):
with open(path+"_", "w") as f:
f.write(val)
os.chmod(path+"_", stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
os.rename(path+"_", path)
+ detail.info("Update script %s: %s", path, val)
-
-def set_squeue(actions, checks, k, g):
- update_script(os.path.join(fake_slurm, "squeue"), """#!/bin/sh
-echo '1|100|100|ReqNodeNotAvail|34t0i-dz642-h42bg3hq4bdfpf9'
-""")
+def set_squeue(g):
+ global all_jobs
+ update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
+ "\n".join("echo '1|100|100|%s|%s|(null)|1234567890'" % (v, k) for k,v in all_jobs.items()))
return 0
-def set_sinfo_alloc(actions, checks, k, g):
- update_script(os.path.join(fake_slurm, "sinfo"), """#!/bin/sh
-echo '%s alloc'
-""" % (g.group(3)))
- update_script(os.path.join(fake_slurm, "squeue"), """#!/bin/sh
-echo '1|100|100|Running|34t0i-dz642-h42bg3hq4bdfpf9'
-""")
+def set_queue_unsatisfiable(g):
+ global all_jobs, unsatisfiable_job_scancelled
+ # Simulate a job requesting a 99 core node.
+ update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
+ "\n".join("echo '99|100|100|%s|%s|(null)|1234567890'" % (v, k) for k,v in all_jobs.items()))
+ update_script(os.path.join(fake_slurm, "scancel"), "#!/bin/sh\n" +
+ "\ntouch %s" % unsatisfiable_job_scancelled)
+ return 0
+def job_cancelled(g):
+ global unsatisfiable_job_scancelled
+ cancelled_job = g.group(1)
+ api = arvados.api('v1')
+ # Check that 'scancel' was called
+ if not os.path.isfile(unsatisfiable_job_scancelled):
+ return 1
+ # Check for the log entry
+ log_entry = api.logs().list(
+ filters=[
+ ['object_uuid', '=', cancelled_job],
+ ['event_type', '=', 'stderr'],
+ ]).execute()['items'][0]
+ if not re.match(
+ r"Constraints cannot be satisfied",
+ log_entry['properties']['text']):
+ return 1
+ return 0
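+
+# A cloud node has paired with an Arvados node: record it, report it as
+# allocated in sinfo, and move one waiting job to Running.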
+def node_paired(g):
global compute_nodes
compute_nodes[g.group(1)] = g.group(3)
+
+ update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
+ "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
+
+ for k,v in all_jobs.items():
+ if v == "ReqNodeNotAvail":
+ all_jobs[k] = "Running"
+ break
+
+ set_squeue(g)
+
+ return 0
+
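+# Report the surviving nodes as allocated and every job as Running (used by
+# the quota tests after node creation starts failing).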
+def remaining_jobs(g):
+ update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
+ "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
+
+ for k,v in all_jobs.items():
+ all_jobs[k] = "Running"
+
+ set_squeue(g)
+
return 0
-def set_sinfo_idle(actions, checks, k, g):
- update_script(os.path.join(fake_slurm, "sinfo"), """#!/bin/sh
-echo '%s idle'
-""" % (compute_nodes[g.group(1)]))
+
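+# Fires when a node is reported busy; flip sinfo to idle so the daemon's next
+# status check suggests shutting the node down.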
+def node_busy(g):
+ update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
+ "\n".join("echo '%s|idle|(null)'" % (v) for k,v in compute_nodes.items()))
return 0
-def noop(actions, checks, k, g):
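+# A node finished shutting down: forget it so later sinfo updates no longer
+# report it.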
+def node_shutdown(g):
+ global compute_nodes
+ del compute_nodes[g.group(1)]
return 0
-def down_fail(actions, checks, k, g):
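+# Put every job back in the ReqNodeNotAvail state to create fresh demand for
+# compute nodes.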
+def jobs_req(g):
+ global all_jobs
+ for k,v in all_jobs.items():
+ all_jobs[k] = "ReqNodeNotAvail"
+ set_squeue(g)
+ return 0
+
+def noop(g):
+ return 0
+
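+# Negative check: this pattern must never appear in the log output.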
+def fail(checks, pattern, g):
return 1
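+
+# Allow 'pattern' to match at most 'count' times; one match beyond the limit
+# fails the test.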
+def expect_count(count, checks, pattern, g):
+ if count == 0:
+ return 1
+ else:
+ checks[pattern] = partial(expect_count, count-1)
+ return 0
-def run_test(actions, checks, driver_class):
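+
+# Run one test: start node manager with stubbed slurm commands and a fake
+# cloud driver, then drive it by matching its log output against the expected
+# sequence of actions and the negative checks.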
+def run_test(name, actions, checks, driver_class, jobs, provider):
code = 0
+ global unsatisfiable_job_scancelled
+ unsatisfiable_job_scancelled = os.path.join(tempfile.mkdtemp(),
+ "scancel_called")
+
+ # Delete any stale node records
+ api = arvados.api('v1')
+ for n in api.nodes().list().execute()['items']:
+ api.nodes().delete(uuid=n["uuid"]).execute()
+
+ logger.info("Start %s", name)
global fake_slurm
fake_slurm = tempfile.mkdtemp()
- logging.info("fake_slurm is %s", fake_slurm)
+ detail.info("fake_slurm is %s", fake_slurm)
global compute_nodes
compute_nodes = {}
+ global all_jobs
+ all_jobs = jobs
+
env = os.environ.copy()
env["PATH"] = fake_slurm + ":" + env["PATH"]
+ # Reset fake squeue/sinfo to empty
update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n")
update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n")
- with open("tests/fake.cfg.template") as f:
- with open(os.path.join(fake_slurm, "id_rsa.pub"), "w") as ssh:
- pass
+ # Write configuration file for test
+ with open("tests/fake_%s.cfg.template" % provider) as f:
+ open(os.path.join(fake_slurm, "id_rsa.pub"), "w").close()
with open(os.path.join(fake_slurm, "fake.cfg"), "w") as cfg:
cfg.write(f.read().format(host=os.environ["ARVADOS_API_HOST"],
token=os.environ["ARVADOS_API_TOKEN"],
driver_class=driver_class,
ssh_key=os.path.join(fake_slurm, "id_rsa.pub")))
- timeout = time.time() + 300
+ # Tests must complete in less than 3 minutes.
+ timeout = time.time() + 180
+ terminated = False
+ # Now start node manager
p = subprocess.Popen(["bin/arvados-node-manager", "--foreground", "--config", os.path.join(fake_slurm, "fake.cfg")],
- bufsize=1, stderr=subprocess.PIPE, env=env)
- for line in p.stderr:
- sys.stdout.write(line)
+ bufsize=0, stderr=subprocess.PIPE, env=env)
- if time.time() > timeout:
- logging.error("Exceeded timeout")
- code = 1
- p.terminate()
+ # Test main loop:
+ # - Read line
+ # - Apply negative checks (things that are not supposed to happen)
+ # - Check timeout
+ # - Check if the next action should trigger
+ # - If all actions are exhausted, terminate with test success
+ # - If it hits timeout with actions remaining, terminate with test failed
+ try:
+ # naive line iteration over pipes gets buffered, which isn't what we want,
+ # see https://bugs.python.org/issue3907
+ for line in iter(p.stderr.readline, ""):
+ detail_content.write(line)
- for k,v in actions.items():
- g = re.match(k, line)
- if g:
- logging.info("Triggered action %s", k)
- del actions[k]
- code = v(actions, checks, k, g)
- if code != 0:
- logging.error("Action failed")
+ for k,v in checks.items():
+ g = re.match(k, line)
+ if g:
+ detail.info("Matched check %s", k)
+ code += v(checks, k, g)
+ if code != 0:
+ detail.error("Check failed")
+ if not terminated:
+ p.terminate()
+ terminated = True
+
+ if terminated:
+ continue
+
+ if time.time() > timeout:
+ detail.error("Exceeded timeout with actions remaining: %s", actions)
+ code += 1
+ if not terminated:
p.terminate()
+ terminated = True
- for k,v in checks.items():
+ k, v = actions[0]
g = re.match(k, line)
if g:
- logging.info("Triggered check %s", k)
- code = v(actions, checks, k, g)
+ detail.info("Matched action %s", k)
+ actions.pop(0)
+ code += v(g)
if code != 0:
- logging.error("Check failed")
+ detail.error("Action failed")
p.terminate()
+ terminated = True
+
+ if not actions:
+ p.terminate()
+ terminated = True
+ except KeyboardInterrupt:
+ p.kill()
- if not actions:
- p.terminate()
+ if actions:
+ detail.error("Ended with remaining actions: %s", actions)
+ code = 1
- #shutil.rmtree(fake_slurm)
+ shutil.rmtree(fake_slurm)
+ shutil.rmtree(os.path.dirname(unsatisfiable_job_scancelled))
+
+ if code == 0:
+ logger.info("%s passed", name)
+ else:
+ if isinstance(detail_content, StringIO.StringIO):
+ sys.stderr.write(detail_content.getvalue())
+ logger.info("%s failed", name)
return code
def main():
- code = run_test({
- r".*Daemon started": set_squeue,
- r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": set_sinfo_alloc,
- r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)": set_sinfo_idle,
- r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)": noop,
- r".*Shutdown success": noop,
- }, {
- r".*Suggesting shutdown because node state is \('down', .*\)": down_fail
- },
- "arvnodeman.test.fake_driver.FakeDriver")
+ # Table of tests: name -> (actions, checks, driver class, jobs, provider).
+
+ tests = {
+ "test_unsatisfiable_jobs" : (
+ # Actions (pattern -> action)
+ [
+ (r".*Daemon started", set_queue_unsatisfiable),
+ (r".*Cancelled unsatisfiable job '(\S+)'", job_cancelled),
+ ],
+ # Checks (things that shouldn't happen)
+ {
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": fail,
+ r".*Trying to cancel job '(\S+)'": fail,
+ },
+ # Driver class
+ "arvnodeman.test.fake_driver.FakeDriver",
+ # Jobs
+ {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
+ "test_single_node_azure": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ ], {
+ r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
+ r".*Setting node quota.*": fail,
+ },
+ "arvnodeman.test.fake_driver.FakeDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ "azure"),
+ "test_multiple_nodes": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ ], {
+ r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 4),
+ r".*Setting node quota.*": fail,
+ },
+ "arvnodeman.test.fake_driver.FakeDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+ }, "azure"),
+ "test_hit_quota": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown)
+ ], {
+ r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 2),
+ r".*Sending create_node request.*": partial(expect_count, 5)
+ },
+ "arvnodeman.test.fake_driver.QuotaDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+ }, "azure"),
+ "test_probe_quota": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*sending request", jobs_req),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ ], {
+ r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 6),
+ r".*Sending create_node request.*": partial(expect_count, 9)
+ },
+ "arvnodeman.test.fake_driver.QuotaDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+ }, "azure"),
+ "test_no_hang_failing_node_create": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Client error: nope", noop),
+ (r".*Client error: nope", noop),
+ (r".*Client error: nope", noop),
+ (r".*Client error: nope", noop),
+ ],
+ {},
+ "arvnodeman.test.fake_driver.FailingDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+ }, "azure"),
+ "test_retry_create": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Rate limit exceeded - scheduling retry in 12 seconds", noop),
+ (r".*Rate limit exceeded - scheduling retry in 2 seconds", noop),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", noop),
+ ],
+ {},
+ "arvnodeman.test.fake_driver.RetryDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail"
+ }, "azure"),
+ "test_single_node_aws": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ ], {
+ r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
+ r".*Setting node quota.*": fail,
+ },
+ "arvnodeman.test.fake_driver.FakeAwsDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ "ec2"),
+ "test_single_node_gce": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ ], {
+ r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
+ r".*Setting node quota.*": fail,
+ },
+ "arvnodeman.test.fake_driver.FakeGceDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ "gce")
+ }
+
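+ # Run the single test named on the command line, or every test in sorted
+ # order.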
+ code = 0
+ if len(sys.argv) > 1:
+ code = run_test(sys.argv[1], *tests[sys.argv[1]])
+ else:
+ for t in sorted(tests.keys()):
+ code += run_test(t, *tests[t])
+
+ if code == 0:
+ logger.info("Tests passed")
+ else:
+ logger.info("Tests failed")
+
exit(code)
-main()
+if __name__ == '__main__':
+ main()