WORKSPACE=path Path to the Arvados source tree to build packages from
CWLTOOL=path (optional) Path to cwltool git repository.
+SALAD=path (optional) Path to schema_salad git repository.
EOF
(cd sdk/cwl && python setup.py sdist)
runner=$(cd sdk/cwl/dist && ls -t arvados-cwl-runner-*.tar.gz | head -n1)
+rm -rf sdk/cwl/salad_dist
+mkdir -p sdk/cwl/salad_dist
+if [[ -n "$SALAD" ]] ; then
+    # Build a schema_salad sdist from the local checkout and stage it where
+    # dev-jobs.dockerfile picks it up (via the "salad" build arg).
+    (cd "$SALAD" && python setup.py sdist)
+    salad=$(cd "$SALAD/dist" && ls -t schema-salad-*.tar.gz | head -n1)
+    cp "$SALAD/dist/$salad" "$WORKSPACE/sdk/cwl/salad_dist"
+fi
+
rm -rf sdk/cwl/cwltool_dist
mkdir -p sdk/cwl/cwltool_dist
if [[ -n "$CWLTOOL" ]] ; then
gittag=$(git log --first-parent --max-count=1 --format=format:%H sdk/cwl)
fi
-docker build --build-arg sdk=$sdk --build-arg runner=$runner --build-arg cwltool=$cwltool -f "$WORKSPACE/sdk/dev-jobs.dockerfile" -t arvados/jobs:$gittag "$WORKSPACE/sdk"
+docker build --build-arg sdk=$sdk --build-arg runner=$runner --build-arg salad=$salad --build-arg cwltool=$cwltool -f "$WORKSPACE/sdk/dev-jobs.dockerfile" -t arvados/jobs:$gittag "$WORKSPACE/sdk"
echo arv-keepdocker arvados/jobs $gittag
arv-keepdocker arvados/jobs $gittag
self.check_features(v)
elif isinstance(obj, list):
for i,v in enumerate(obj):
- with SourceLine(obj, i, UnsupportedRequirement):
+ with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
self.check_features(v)
def make_output_collection(self, name, tagsString, outputObj):
def rewrite(fileobj):
fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
- for k in ("basename", "listing", "contents"):
+ for k in ("basename", "listing", "contents", "nameext", "nameroot", "dirname"):
if k in fileobj:
del fileobj[k]
if dockerRequirement["dockerImageId"] in cached_lookups:
return dockerRequirement["dockerImageId"]
- with SourceLine(dockerRequirement, "dockerImageId", WorkflowException):
+ with SourceLine(dockerRequirement, "dockerImageId", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
sp = dockerRequirement["dockerImageId"].split(":")
image_name = sp[0]
image_tag = sp[1] if len(sp) > 1 else "latest"
if self.on_error:
self.job_order["arv:on_error"] = self.on_error
+ if kwargs.get("debug"):
+ self.job_order["arv:debug"] = True
+
return {
"script": "cwl-runner",
"script_version": "master",
kwargs["work_api"] = self.work_api
req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
if req:
- with SourceLine(self.tool, None, WorkflowException):
+ with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
if "id" not in self.tool:
raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
def keepmount(obj):
remove_redundant_fields(obj)
- with SourceLine(obj, None, WorkflowException):
+ with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
if "location" not in obj:
raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
- with SourceLine(obj, "location", WorkflowException):
+ with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
if obj["location"].startswith("keep:"):
obj["location"] = "/keep/" + obj["location"][5:]
if "listing" in obj:
output_tags = None
enable_reuse = True
on_error = "continue"
+ debug = False
+
if "arv:output_name" in job_order_object:
output_name = job_order_object["arv:output_name"]
del job_order_object["arv:output_name"]
on_error = job_order_object["arv:on_error"]
del job_order_object["arv:on_error"]
+ if "arv:debug" in job_order_object:
+ debug = job_order_object["arv:debug"]
+ del job_order_object["arv:debug"]
+
runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
output_name=output_name, output_tags=output_tags)
fs_access=make_fs_access(""),
num_retries=runner.num_retries))
+ if debug:
+ logger.setLevel(logging.DEBUG)
+ logging.getLogger('arvados').setLevel(logging.DEBUG)
+ logging.getLogger("cwltool").setLevel(logging.DEBUG)
+
args = argparse.Namespace()
args.project_uuid = arvados.current_job()["owner_uuid"]
args.enable_reuse = enable_reuse
args.on_error = on_error
args.submit = False
- args.debug = False
+ args.debug = debug
args.quiet = False
args.ignore_docker_for_reuse = False
args.basedir = os.getcwd()
if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
+ debug = logger.isEnabledFor(logging.DEBUG)
+
if src not in self._pathmap:
if src.startswith("file:"):
# Local FS ref, may need to be uploaded or may be on keep
fnPattern="keep:%s/%s",
dirPattern="keep:%s/%s",
raiseOSError=True)
- with SourceLine(srcobj, "location", WorkflowException):
+ with SourceLine(srcobj, "location", WorkflowException, debug):
if isinstance(st, arvados.commands.run.UploadFile):
uploadfiles.add((src, ab, st))
elif isinstance(st, arvados.commands.run.ArvFile):
else:
self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
- with SourceLine(srcobj, "secondaryFiles", WorkflowException):
+ with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
for l in srcobj.get("secondaryFiles", []):
self.visit(l, uploadfiles)
- with SourceLine(srcobj, "listing", WorkflowException):
+ with SourceLine(srcobj, "listing", WorkflowException, debug):
for l in srcobj.get("listing", []):
self.visit(l, uploadfiles)
- def addentry(self, obj, c, path, subdirs):
+ def addentry(self, obj, c, path, remap):
if obj["location"] in self._pathmap:
src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
if srcpath == "":
srcpath = "."
c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
+ remap.append((obj["location"], path + "/" + obj["basename"]))
for l in obj.get("secondaryFiles", []):
- self.addentry(l, c, path, subdirs)
+ self.addentry(l, c, path, remap)
elif obj["class"] == "Directory":
for l in obj.get("listing", []):
- self.addentry(l, c, path + "/" + obj["basename"], subdirs)
- subdirs.append((obj["location"], path + "/" + obj["basename"]))
+ self.addentry(l, c, path + "/" + obj["basename"], remap)
+ remap.append((obj["location"], path + "/" + obj["basename"]))
elif obj["location"].startswith("_:") and "contents" in obj:
with c.open(path + "/" + obj["basename"], "w") as f:
f.write(obj["contents"].encode("utf-8"))
self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)
for srcobj in referenced_files:
- subdirs = []
+ remap = []
if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
c = arvados.collection.Collection(api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
for l in srcobj.get("listing", []):
- self.addentry(l, c, ".", subdirs)
+ self.addentry(l, c, ".", remap)
check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
if not check["items"]:
c = arvados.collection.Collection(api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries )
- self.addentry(srcobj, c, ".", subdirs)
+ self.addentry(srcobj, c, ".", remap)
check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
if not check["items"]:
ab = self.collection_pattern % c.portable_data_hash()
self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
- if subdirs:
- for loc, sub in subdirs:
- # subdirs will all start with "./", strip it off
- ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
+ if remap:
+ for loc, sub in remap:
+ # subdirs start with "./", strip it off
+ if sub.startswith("./"):
+ ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
+ else:
+ ab = self.file_pattern % (c.portable_data_hash(), sub)
self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
ab, "Directory", True)
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20170828135420',
- 'schema-salad==2.6.20170712194300',
+ 'cwltool==1.0.20170928192020',
+ 'schema-salad==2.6.20170927145003',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
'arvados-python-client>=0.1.20170526013812',
--- /dev/null
+cwlVersion: v1.0
+class: ExpressionTool
+requirements:
+ InlineJavascriptRequirement: {}
+inputs:
+ dir: Directory
+outputs:
+ out: Directory[]
+expression: |
+ ${
+ var samples = {};
+ var pattern = /^(.+)(_S[0-9]{1,3}_)(.+)$/;
+ for (var i = 0; i < inputs.dir.listing.length; i++) {
+ var file = inputs.dir.listing[i];
+ var groups = file.basename.match(pattern);
+ if (groups) {
+ var sampleid = groups[1];
+ if (!samples[sampleid]) {
+ samples[sampleid] = [];
+ }
+ samples[sampleid].push(file);
+ }
+ }
+ var dirs = [];
+ Object.keys(samples).sort().forEach(function(sampleid, _) {
+ dirs.push({"class": "Directory",
+ "basename": sampleid,
+ "listing": samples[sampleid]});
+ });
+ return {"out": dirs};
+ }
\ No newline at end of file
--- /dev/null
+dir:
+ class: Directory
+ location: samples
\ No newline at end of file
--- /dev/null
+cwlVersion: v1.0
+class: CommandLineTool
+requirements:
+ InlineJavascriptRequirement: {}
+inputs:
+ fastqsdir: Directory
+outputs:
+ out: stdout
+baseCommand: [zcat]
+stdout: $(inputs.fastqsdir.listing[0].nameroot).txt
+arguments:
+ - $(inputs.fastqsdir.listing[0].path)
+ - $(inputs.fastqsdir.listing[1].path)
--- /dev/null
+cwlVersion: v1.0
+class: Workflow
+requirements:
+ ScatterFeatureRequirement: {}
+inputs:
+ dir: Directory
+outputs:
+ out:
+ type: File[]
+ outputSource: tool/out
+steps:
+ ex:
+ in:
+ dir: dir
+ out: [out]
+ run: 12213-keepref-expr.cwl
+ tool:
+ in:
+ fastqsdir: ex/out
+ out: [out]
+ scatter: fastqsdir
+ run: 12213-keepref-tool.cwl
\ No newline at end of file
if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then
arv-put --portable-data-hash testdir
fi
-exec cwltest --test arvados-tests.yml --tool $PWD/runner.sh $@
+exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum
output: {}
tool: noreuse.cwl
doc: "Test arv:ReuseRequirement"
+
+- job: 12213-keepref-job.yml
+ output: {
+ "out": [
+ {
+ "checksum": "sha1$1c78028c0d69163391eef89316b44a57bde3fead",
+ "location": "sample1_S01_R1_001.fastq.txt",
+ "class": "File",
+ "size": 32
+ },
+ {
+ "checksum": "sha1$83483b9c65d99967aecc794c14f9f4743314d186",
+ "location": "sample2_S01_R3_001.fastq.txt",
+ "class": "File",
+ "size": 32
+ }
+ ]
+ }
+ tool: 12213-keepref-wf.cwl
+ doc: "Test manipulating keep references with expression tools"
+++ /dev/null
-#!/bin/sh
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-exec arvados-cwl-runner --disable-reuse --compute-checksum "$@"
ARG sdk
ARG runner
+ARG salad
ARG cwltool
ADD python/dist/$sdk /tmp/
+ADD cwl/salad_dist/$salad /tmp/
ADD cwl/cwltool_dist/$cwltool /tmp/
ADD cwl/dist/$runner /tmp/
RUN cd /tmp/arvados-python-client-* && python setup.py install
+RUN if test -d /tmp/schema-salad-* ; then cd /tmp/schema-salad-* && python setup.py install ; fi
RUN if test -d /tmp/cwltool-* ; then cd /tmp/cwltool-* && python setup.py install ; fi
RUN cd /tmp/arvados-cwl-runner-* && python setup.py install
"crypto/md5"
"errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
"log"
"os"
"path/filepath"
"sort"
"strings"
+
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
)
type Block struct {
}
func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
- if info.IsDir() {
+ if err != nil {
+ return err
+ }
+
+ targetPath, targetInfo := path, info
+ if info.Mode()&os.ModeSymlink != 0 {
+ // Update targetpath/info to reflect the symlink
+ // target, not the symlink itself
+ targetPath, err = filepath.EvalSymlinks(path)
+ if err != nil {
+ return err
+ }
+ targetInfo, err = os.Stat(targetPath)
+ if err != nil {
+ return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
+ }
+ }
+
+ if targetInfo.Mode()&os.ModeType != 0 {
+ // Skip directories, pipes, other non-regular files
return nil
}
"crypto/md5"
"errors"
"fmt"
- . "gopkg.in/check.v1"
"io/ioutil"
"os"
+ "syscall"
+
+ . "gopkg.in/check.v1"
)
type UploadTestSuite struct{}
c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
}
-func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+func (s *TestSuite) TestSimpleUploadThreeFiles(c *C) {
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
}()
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
- ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+ for _, err := range []error{
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600),
+ ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600),
+ os.Symlink("./file2.txt", tmpdir+"/file3.txt"),
+ syscall.Mkfifo(tmpdir+"/ignore.fifo", 0600),
+ } {
+ c.Assert(err, IsNil)
+ }
str, err := WriteTree(KeepTestClient{}, tmpdir)
c.Check(err, IsNil)
- c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+ c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
}
func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
networkMode string // passed through to HostConfig.NetworkMode
}
-// SetupSignals sets up signal handling to gracefully terminate the underlying
+// setupSignals sets up signal handling to gracefully terminate the underlying
// Docker container and update state when receiving a TERM, INT or QUIT signal.
-func (runner *ContainerRunner) SetupSignals() {
+func (runner *ContainerRunner) setupSignals() {
runner.SigChan = make(chan os.Signal, 1)
signal.Notify(runner.SigChan, syscall.SIGTERM)
signal.Notify(runner.SigChan, syscall.SIGINT)
go func(sig chan os.Signal) {
<-sig
runner.stop()
- signal.Stop(sig)
}(runner.SigChan)
}
}
}
+// teardown undoes setupSignals(): it stops signal delivery to SigChan and
+// closes the channel so the handler goroutine can exit. Safe to call when
+// setupSignals() was never run (SigChan nil).
+func (runner *ContainerRunner) teardown() {
+	if runner.SigChan != nil {
+		signal.Stop(runner.SigChan)
+		close(runner.SigChan)
+	}
+}
+
// LoadImage determines the docker image id from the container record and
// checks if it is available in the local Docker image store. If not, it loads
// the image from Keep.
// a new one in case we needed to log anything while
// finalizing.
runner.CrunchLog.Close()
+
+ runner.teardown()
}()
err = runner.fetchContainerRecord()
}
// setup signal handling
- runner.SetupSignals()
+ runner.setupSignals()
// check for and/or load image
err = runner.LoadImage()
t := &TestDockerClient{}
t.logReader, t.logWriter = io.Pipe()
t.finish = exitCode
- t.stop = make(chan bool)
+ t.stop = make(chan bool, 1)
t.cwd = "/"
return t
}
"crypto/md5"
"errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
)
// Block is a data block in a manifest stream
// WalkFunc walks a directory tree, uploads each file found and adds it to the
// CollectionWriter.
func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+
+ targetPath, targetInfo := path, info
+ if info.Mode()&os.ModeSymlink != 0 {
+ // Update targetpath/info to reflect the symlink
+ // target, not the symlink itself
+ targetPath, err = filepath.EvalSymlinks(path)
+ if err != nil {
+ return err
+ }
+ targetInfo, err = os.Stat(targetPath)
+ if err != nil {
+ return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
+ }
+ }
- if info.IsDir() {
+ if targetInfo.Mode()&os.ModeType != 0 {
+ // Skip directories, pipes, other non-regular files
return nil
}
package main
import (
- . "gopkg.in/check.v1"
"io/ioutil"
"log"
"os"
"sync"
+ "syscall"
+
+ . "gopkg.in/check.v1"
)
type UploadTestSuite struct{}
c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
}
-func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+func (s *TestSuite) TestSimpleUploadThreefiles(c *C) {
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
}()
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
- ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+ for _, err := range []error{
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600),
+ ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600),
+ os.Symlink("./file2.txt", tmpdir+"/file3.txt"),
+ syscall.Mkfifo(tmpdir+"/ignore.fifo", 0600),
+ } {
+ c.Assert(err, IsNil)
+ }
cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
- c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+ c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
}
func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
def find_stale_node(self, stale_time):
- for record in self.nodes.itervalues():
+        # Try to select a stale node record that has an assigned slot first
+ for record in sorted(self.nodes.itervalues(),
+ key=lambda r: r.arvados_node['slot_number'],
+ reverse=True):
node = record.arvados_node
if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
stale_time) and
except pykka.ActorDeadError:
return
cloud_node_id = cloud_node.id
- record = self.cloud_nodes[cloud_node_id]
- shutdown_actor.stop()
+
+ try:
+ shutdown_actor.stop()
+ except pykka.ActorDeadError:
+ pass
+
+ try:
+ record = self.cloud_nodes[cloud_node_id]
+ except KeyError:
+ # Cloud node was already removed from the cloud node list
+ # supposedly while the destroy_node call was finishing its
+ # job.
+ return
record.shutdown_actor = None
if not success:
self.arv_factory = mock.MagicMock(name='arvados_mock')
api_client = mock.MagicMock(name='api_client')
- api_client.nodes().create().execute.side_effect = [testutil.arvados_node_mock(1),
- testutil.arvados_node_mock(2)]
+ api_client.nodes().create().execute.side_effect = \
+ [testutil.arvados_node_mock(1),
+ testutil.arvados_node_mock(2)]
self.arv_factory.return_value = api_client
self.cloud_factory = mock.MagicMock(name='cloud_mock')
want_sizes=[testutil.MockSize(1)])
self.busywait(lambda: not self.node_setup.start.called)
+ def test_select_stale_node_records_with_slot_numbers_first(self):
+ """
+ Stale node records with slot_number assigned can exist when
+ clean_arvados_node() isn't executed after a node shutdown, for
+ various reasons.
+ NodeManagerDaemonActor should use these stale node records first, so
+ that they don't accumulate unused, reducing the slots available.
+ """
+ size = testutil.MockSize(1)
+ a_long_time_ago = '1970-01-01T01:02:03.04050607Z'
+ arvados_nodes = []
+ for n in range(9):
+ # Add several stale node records without slot_number assigned
+ arvados_nodes.append(
+ testutil.arvados_node_mock(
+ n+1,
+ slot_number=None,
+ modified_at=a_long_time_ago))
+ # Add one record with stale_node assigned, it should be the
+ # first one selected
+ arv_node = testutil.arvados_node_mock(
+ 123,
+ modified_at=a_long_time_ago)
+ arvados_nodes.append(arv_node)
+ cloud_node = testutil.cloud_node_mock(125, size=size)
+ self.make_daemon(cloud_nodes=[cloud_node],
+ arvados_nodes=arvados_nodes)
+ arvados_nodes_tracker = self.daemon.arvados_nodes.get()
+ # Here, find_stale_node() should return the node record with
+ # the slot_number assigned.
+ self.assertEqual(arv_node,
+ arvados_nodes_tracker.find_stale_node(3601))
+
def test_dont_count_missing_as_busy(self):
size = testutil.MockSize(1)
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
self.assertTrue(self.node_setup.start.called,
"second node not started after booted node stopped")
+ def test_node_disappearing_during_shutdown(self):
+ cloud_node = testutil.cloud_node_mock(6)
+ setup = self.start_node_boot(cloud_node, id_num=6)
+ self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+ monitor = self.monitor_list()[0].proxy()
+ self.daemon.update_server_wishlist([])
+ self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+ self.assertShutdownCancellable(True)
+ shutdown = self.node_shutdown.start().proxy()
+ shutdown.cloud_node.get.return_value = cloud_node
+ # Simulate a successful but slow node destroy call: the cloud node
+ # list gets updated before the ShutdownActor finishes.
+ record = self.daemon.cloud_nodes.get().nodes.values()[0]
+ self.assertTrue(record.shutdown_actor is not None)
+ self.daemon.cloud_nodes.get().nodes.clear()
+ self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
+ self.assertTrue(
+ record.shutdown_actor is not None,
+ "test was ineffective -- failed to simulate the race condition")
+
def test_booted_node_shut_down_when_never_listed(self):
setup = self.start_node_boot()
self.cloud_factory().node_start_time.return_value = time.time() - 3601