"""
-import subprocess
+import subprocess32 as subprocess
import os
import sys
import re
import stat
import tempfile
import shutil
+import errno
from functools import partial
import arvados
import StringIO
+formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+
+handler = logging.StreamHandler(sys.stderr)
+handler.setFormatter(formatter)
logger = logging.getLogger("logger")
logger.setLevel(logging.INFO)
-logger.addHandler(logging.StreamHandler(sys.stderr))
+logger.addHandler(handler)
detail = logging.getLogger("detail")
detail.setLevel(logging.INFO)
detail_content = sys.stderr
else:
detail_content = StringIO.StringIO()
-detail.addHandler(logging.StreamHandler(detail_content))
+handler = logging.StreamHandler(detail_content)
+handler.setFormatter(formatter)
+detail.addHandler(handler)
fake_slurm = None
compute_nodes = None
all_jobs = None
+unsatisfiable_job_scancelled = None
def update_script(path, val):
with open(path+"_", "w") as f:
def set_squeue(g):
    """Regenerate the fake ``squeue`` script from the current job table.

    The script echoes one pipe-separated record per entry of the
    module-level ``all_jobs`` mapping (key -> value), laid out as
    ``1|100|100|<value>|<key>|(null)|1234567890``.
    NOTE(review): the leading ``1`` is presumably the requested core
    count and the value the SLURM job state -- confirm against the
    node manager's squeue parser.

    :param g: regex match that triggered this action (unused here).
    :returns: 0, the "action succeeded" code.
    """
    global all_jobs
    squeue_lines = [
        "echo '1|100|100|%s|%s|(null)|1234567890'" % (state, job)
        for job, state in all_jobs.items()
    ]
    update_script(os.path.join(fake_slurm, "squeue"),
                  "#!/bin/sh\n" + "\n".join(squeue_lines))
    return 0
+
def set_queue_unsatisfiable(g):
    """Install fake ``squeue`` and ``scancel`` scripts for an impossible job.

    The ``squeue`` script echoes one record per entry of ``all_jobs`` with
    ``99`` in the first field, and the ``scancel`` script touches the file
    named by ``unsatisfiable_job_scancelled`` so a later step can tell
    whether ``scancel`` was invoked.

    :param g: regex match that triggered this action (unused here).
    :returns: 0, the "action succeeded" code.
    """
    global all_jobs, unsatisfiable_job_scancelled
    # Simulate a job requesting a 99 core node.
    squeue_lines = [
        "echo '99|100|100|%s|%s|(null)|1234567890'" % (state, job)
        for job, state in all_jobs.items()
    ]
    update_script(os.path.join(fake_slurm, "squeue"),
                  "#!/bin/sh\n" + "\n".join(squeue_lines))
    # The scancel stand-in only leaves a marker file behind.
    touch_marker = "\ntouch %s" % unsatisfiable_job_scancelled
    update_script(os.path.join(fake_slurm, "scancel"),
                  "#!/bin/sh\n" + touch_marker)
    return 0
def job_cancelled(g):
    """Check that an unsatisfiable job was cancelled and that this was logged.

    Two conditions must hold:

    * the marker file named by ``unsatisfiable_job_scancelled`` exists
      (the fake ``scancel`` script touches it when run), and
    * the first ``stderr`` log record returned by the Arvados API for the
      job has text starting with "Constraints cannot be satisfied".

    :param g: regex match whose group 1 is the cancelled job's UUID.
    :returns: 0 when both conditions hold, 1 otherwise.
    """
    global unsatisfiable_job_scancelled
    # Check that 'scancel' was called.  This is a purely local test, so do
    # it before paying for an API client.
    if not os.path.isfile(unsatisfiable_job_scancelled):
        return 1
    cancelled_job = g.group(1)
    api = arvados.api('v1')
    # Check for the log entry
    entries = api.logs().list(
        filters=[
            ['object_uuid', '=', cancelled_job],
            ['event_type', '=', 'stderr'],
        ]).execute()['items']
    if not entries:
        # No matching log record: report a failed action instead of
        # letting an IndexError escape into the test driver.
        return 1
    if not re.match(
            r"Constraints cannot be satisfied",
            entries[0]['properties']['text']):
        return 1
    return 0
def node_paired(g):
global compute_nodes
compute_nodes[g.group(1)] = g.group(3)
update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
- "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+ "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
for k,v in all_jobs.items():
if v == "ReqNodeNotAvail":
return 0
-def remaining_jobs(g):
- update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
- "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
-
- for k,v in all_jobs.items():
- all_jobs[k] = "Running"
-
- set_squeue(g)
-
- return 0
-
-
def node_busy(g):
    """Rewrite the fake ``sinfo`` script so every known node reports idle.

    One ``<hostname>|idle|(null)`` record is echoed for each value stored
    in the module-level ``compute_nodes`` mapping.

    :param g: regex match that triggered this action (unused here).
    :returns: 0, the "action succeeded" code.
    """
    # Only the values are needed; the keys are not part of the output.
    sinfo_lines = [
        "echo '%s|idle|(null)'" % (hostname)
        for hostname in compute_nodes.values()
    ]
    update_script(os.path.join(fake_slurm, "sinfo"),
                  "#!/bin/sh\n" + "\n".join(sinfo_lines))
    return 0
def node_shutdown(g):
    """Forget a node that has been shut down.

    Removes the entry keyed by ``g.group(1)`` from the module-level
    ``compute_nodes`` mapping.

    :param g: regex match whose group 1 identifies the node.
    :returns: 0 if the node was known and has been removed, 1 if it was
        not in ``compute_nodes`` (for example, a repeated shutdown report).
    """
    global compute_nodes
    node = g.group(1)
    if node not in compute_nodes:
        # Unknown node: signal failure instead of raising KeyError.
        return 1
    del compute_nodes[node]
    return 0
+
def jobs_req(g):
global all_jobs
def run_test(name, actions, checks, driver_class, jobs, provider):
code = 0
+ global unsatisfiable_job_scancelled
+ unsatisfiable_job_scancelled = os.path.join(tempfile.mkdtemp(),
+ "scancel_called")
# Delete any stale node records
api = arvados.api('v1')
driver_class=driver_class,
ssh_key=os.path.join(fake_slurm, "id_rsa.pub")))
- # Tests must complete in less than 3 minutes.
- timeout = time.time() + 180
+ # Tests must complete in less than 30 seconds.
+ timeout = time.time() + 30
terminated = False
# Now start node manager
# Test main loop:
# - Read line
- # - Apply negative checks (thinks that are not supposed to happen)
+ # - Apply negative checks (things that are not supposed to happen)
# - Check timeout
# - Check if the next action should trigger
# - If all actions are exhausted, terminate with test success
if code != 0:
detail.error("Check failed")
if not terminated:
- p.terminate()
+ p.kill()
terminated = True
if terminated:
detail.error("Exceeded timeout with actions remaining: %s", actions)
code += 1
if not terminated:
- p.terminate()
+ p.kill()
terminated = True
k, v = actions[0]
code += v(g)
if code != 0:
detail.error("Action failed")
- p.terminate()
+ p.kill()
terminated = True
if not actions:
- p.terminate()
+ p.kill()
terminated = True
except KeyboardInterrupt:
p.kill()
code = 1
shutil.rmtree(fake_slurm)
+ shutil.rmtree(os.path.dirname(unsatisfiable_job_scancelled))
if code == 0:
logger.info("%s passed", name)
else:
if isinstance(detail_content, StringIO.StringIO):
- sys.stderr.write(detail_content.getvalue())
+ detail_content.seek(0)
+ chunk = detail_content.read(4096)
+ while chunk:
+ try:
+ sys.stderr.write(chunk)
+ chunk = detail_content.read(4096)
+ except IOError as e:
+ if e.errno == errno.EAGAIN:
+ # try again (probably pipe buffer full)
+ pass
+ else:
+ raise
logger.info("%s failed", name)
return code
# Test lifecycle.
tests = {
+ "test_unsatisfiable_jobs" : (
+ # Actions (pattern -> action)
+ [
+ (r".*Daemon started", set_queue_unsatisfiable),
+ (r".*Cancelled unsatisfiable job '(\S+)'", job_cancelled),
+ ],
+ # Checks (things that shouldn't happen)
+ {
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": fail,
+ r".*Trying to cancel job '(\S+)'": fail,
+ },
+ # Driver class
+ "arvnodeman.test.fake_driver.FakeDriver",
+ # Jobs
+ {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_single_node_azure": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
r".*Setting node quota.*": fail,
},
+ # Driver class
"arvnodeman.test.fake_driver.FakeDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
"azure"),
"test_multiple_nodes": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 4),
r".*Setting node quota.*": fail,
},
+ # Driver class
"arvnodeman.test.fake_driver.FakeDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
- "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
- }, "azure"),
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_hit_quota": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
- (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
- (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown)
- ], {
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 2),
r".*Sending create_node request.*": partial(expect_count, 5)
},
+ # Driver class
"arvnodeman.test.fake_driver.QuotaDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
- "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
- }, "azure"),
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_probe_quota": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
- (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
- (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*sending request", jobs_req),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 6),
r".*Sending create_node request.*": partial(expect_count, 9)
},
+ # Driver class
"arvnodeman.test.fake_driver.QuotaDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
- "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
- }, "azure"),
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_no_hang_failing_node_create": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Client error: nope", noop),
(r".*Client error: nope", noop),
(r".*Client error: nope", noop),
],
+ # Checks (things that shouldn't happen)
{},
+ # Driver class
"arvnodeman.test.fake_driver.FailingDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
- "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
- }, "azure"),
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_retry_create": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
- (r".*Rate limit exceeded - scheduling retry in 12 seconds", noop),
+ (r".*Rate limit exceeded - scheduling retry in 2 seconds", noop),
+ (r".*Rate limit exceeded - scheduling retry in 1 seconds", noop),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", noop),
],
+ # Checks (things that shouldn't happen)
{},
+ # Driver class
"arvnodeman.test.fake_driver.RetryDriver",
- {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail"
- }, "azure"),
+ # Jobs
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_single_node_aws": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
r".*Setting node quota.*": fail,
},
+ # Driver class
"arvnodeman.test.fake_driver.FakeAwsDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
"ec2"),
"test_single_node_gce": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
r".*Setting node quota.*": fail,
},
+ # Driver class
"arvnodeman.test.fake_driver.FakeGceDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
"gce")
}