"ram": 1024*1024 * self.submit_runner_ram,
"API": True
},
+ "use_existing": self.enable_reuse,
"properties": {}
}
if self.arvrunner.pipeline:
self.arvrunner.pipeline["components"][self.name] = {"job": record}
with Perf(metrics, "update_pipeline_component %s" % self.name):
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
- body={
- "components": self.arvrunner.pipeline["components"]
- }).execute(num_retries=self.arvrunner.num_retries)
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
+ uuid=self.arvrunner.pipeline["uuid"],
+ body={
+ "components": self.arvrunner.pipeline["components"]
+ }).execute(num_retries=self.arvrunner.num_retries)
if self.arvrunner.uuid:
try:
job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
if job:
components = job["components"]
components[self.name] = record["uuid"]
- self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
+ self.arvrunner.api.jobs().update(
+ uuid=self.arvrunner.uuid,
body={
"components": components
}).execute(num_retries=self.arvrunner.num_retries)
del job_spec["owner_uuid"]
job_spec["job"] = job
+
+ instance_spec = {
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": self.name,
+ "components": {
+ "cwl-runner": job_spec,
+ },
+ "state": "RunningOnServer",
+ }
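+        # When reuse is disabled, record it in the pipeline instance properties.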
+ if not self.enable_reuse:
+ instance_spec["properties"] = {"run_options": {"enable_job_reuse": False}}
+
self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": self.name,
- "components": {"cwl-runner": job_spec },
- "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
+ body=instance_spec).execute(num_retries=self.arvrunner.num_retries)
logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
if kwargs.get("wait") is False:
self.tool = tool
self.job_order = job_order
self.running = False
+ if enable_reuse:
+ # If reuse is permitted by command line arguments but
+ # disabled by the workflow itself, disable it.
+ reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
+ if reuse_req:
+ enable_reuse = reuse_req["enableReuse"]
self.enable_reuse = enable_reuse
self.uuid = None
self.final_output = None
'vcpus': 1,
'ram': 1024*1024*1024
},
- "properties": {}
+ 'use_existing': True,
+ 'properties': {}
}
stubs.expect_workflow_uuid = "zzzzz-7fd4e-zzzzzzzzzzzzzzz"
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
+ expect_pipeline["properties"] = {"run_options": {"enable_job_reuse": False}}
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
logging.exception("")
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--disable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+ expect_container["command"] = [
+ 'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ '--disable-reuse', '--on-error=continue',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
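+        # With --disable-reuse, the runner's own container request is not reused.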
+ expect_container["use_existing"] = False
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+
+ @stubs
+ def test_submit_container_reuse_disabled_by_workflow(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug",
+ "tests/wf/submit_wf_no_reuse.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = [
+ 'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ '--disable-reuse', '--on-error=continue',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+ expect_container["use_existing"] = False
+ expect_container["name"] = "submit_wf_no_reuse.cwl"
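+        # The ReuseRequirement hint is expected in the packed workflow document.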
+ expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
+ {
+ "class": "http://arvados.org/cwl#ReuseRequirement",
+ "enableReuse": False,
+ },
+ ]
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
'vcpus': 1,
'ram': 1073741824
},
- "properties": {}
+ 'use_existing': True,
+ 'properties': {}
}
stubs.api.container_requests().create.assert_called_with(
'vcpus': 1,
'ram': 1073741824
},
- "properties": {
+ 'use_existing': True,
+ 'properties': {
"template_uuid": "962eh-7fd4e-gkbzl62qqtfig37"
}
}
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+# Test case for arvados-cwl-runner. Disables job/container reuse.
+
+class: Workflow
+cwlVersion: v1.0
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
+inputs:
+ - id: x
+ type: File
+ - id: y
+ type: Directory
+ - id: z
+ type: Directory
+outputs: []
+steps:
+ - id: step1
+ in:
+ - { id: x, source: "#x" }
+ out: []
+ run: ../tool/submit_tool.cwl
+hints:
+ arv:ReuseRequirement:
+ enableReuse: false
"crypto/md5"
"errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
"log"
"os"
"path/filepath"
"sort"
"strings"
+
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
)
type Block struct {
}
func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
- if info.IsDir() {
+ if err != nil {
+ return err
+ }
+
+ targetPath, targetInfo := path, info
+ if info.Mode()&os.ModeSymlink != 0 {
+ // Update targetpath/info to reflect the symlink
+ // target, not the symlink itself
+ targetPath, err = filepath.EvalSymlinks(path)
+ if err != nil {
+ return err
+ }
+ targetInfo, err = os.Stat(targetPath)
+ if err != nil {
+ return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
+ }
+ }
+
+ if targetInfo.Mode()&os.ModeType != 0 {
+ // Skip directories, pipes, other non-regular files
return nil
}
"crypto/md5"
"errors"
"fmt"
- . "gopkg.in/check.v1"
"io/ioutil"
"os"
+ "syscall"
+
+ . "gopkg.in/check.v1"
)
type UploadTestSuite struct{}
c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
}
-func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+func (s *TestSuite) TestSimpleUploadThreeFiles(c *C) {
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
}()
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
- ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+ for _, err := range []error{
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600),
+ ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600),
+ os.Symlink("./file2.txt", tmpdir+"/file3.txt"),
+ syscall.Mkfifo(tmpdir+"/ignore.fifo", 0600),
+ } {
+ c.Assert(err, IsNil)
+ }
str, err := WriteTree(KeepTestClient{}, tmpdir)
c.Check(err, IsNil)
- c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+ c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
}
func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
networkMode string // passed through to HostConfig.NetworkMode
}
-// SetupSignals sets up signal handling to gracefully terminate the underlying
+// setupSignals sets up signal handling to gracefully terminate the underlying
// Docker container and update state when receiving a TERM, INT or QUIT signal.
-func (runner *ContainerRunner) SetupSignals() {
+func (runner *ContainerRunner) setupSignals() {
runner.SigChan = make(chan os.Signal, 1)
signal.Notify(runner.SigChan, syscall.SIGTERM)
signal.Notify(runner.SigChan, syscall.SIGINT)
go func(sig chan os.Signal) {
<-sig
runner.stop()
- signal.Stop(sig)
}(runner.SigChan)
}
}
}
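+// teardown stops signal notifications and closes the channel created by
+// setupSignals.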
+func (runner *ContainerRunner) teardown() {
+ if runner.SigChan != nil {
+ signal.Stop(runner.SigChan)
+ close(runner.SigChan)
+ }
+}
+
// LoadImage determines the docker image id from the container record and
// checks if it is available in the local Docker image store. If not, it loads
// the image from Keep.
// a new one in case we needed to log anything while
// finalizing.
runner.CrunchLog.Close()
+
+ runner.teardown()
}()
err = runner.fetchContainerRecord()
}
// setup signal handling
- runner.SetupSignals()
+ runner.setupSignals()
// check for and/or load image
err = runner.LoadImage()
t := &TestDockerClient{}
t.logReader, t.logWriter = io.Pipe()
t.finish = exitCode
- t.stop = make(chan bool)
+ t.stop = make(chan bool, 1)
t.cwd = "/"
return t
}
"crypto/md5"
"errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
)
// Block is a data block in a manifest stream
// WalkFunc walks a directory tree, uploads each file found and adds it to the
// CollectionWriter.
func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+
+ targetPath, targetInfo := path, info
+ if info.Mode()&os.ModeSymlink != 0 {
+ // Update targetpath/info to reflect the symlink
+ // target, not the symlink itself
+ targetPath, err = filepath.EvalSymlinks(path)
+ if err != nil {
+ return err
+ }
+ targetInfo, err = os.Stat(targetPath)
+ if err != nil {
+ return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
+ }
+ }
- if info.IsDir() {
+ if targetInfo.Mode()&os.ModeType != 0 {
+ // Skip directories, pipes, other non-regular files
return nil
}
package main
import (
- . "gopkg.in/check.v1"
"io/ioutil"
"log"
"os"
"sync"
+ "syscall"
+
+ . "gopkg.in/check.v1"
)
type UploadTestSuite struct{}
c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
}
-func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+func (s *TestSuite) TestSimpleUploadThreeFiles(c *C) {
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
}()
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
- ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+ for _, err := range []error{
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600),
+ ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600),
+ os.Symlink("./file2.txt", tmpdir+"/file3.txt"),
+ syscall.Mkfifo(tmpdir+"/ignore.fifo", 0600),
+ } {
+ c.Assert(err, IsNil)
+ }
cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
- c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+ c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
}
func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
except pykka.ActorDeadError:
return
cloud_node_id = cloud_node.id
- record = self.cloud_nodes[cloud_node_id]
- shutdown_actor.stop()
+
+ try:
+ shutdown_actor.stop()
+ except pykka.ActorDeadError:
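+            # The shutdown actor has already exited; there is nothing to stop.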
+ pass
+
+ try:
+ record = self.cloud_nodes[cloud_node_id]
+ except KeyError:
+            # The cloud node was already removed from the cloud node list,
+            # presumably while the destroy_node call was still in progress.
+ return
record.shutdown_actor = None
if not success:
self.arv_factory = mock.MagicMock(name='arvados_mock')
api_client = mock.MagicMock(name='api_client')
- api_client.nodes().create().execute.side_effect = [testutil.arvados_node_mock(1),
- testutil.arvados_node_mock(2)]
+        api_client.nodes().create().execute.side_effect = [
+            testutil.arvados_node_mock(1),
+            testutil.arvados_node_mock(2)]
self.arv_factory.return_value = api_client
self.cloud_factory = mock.MagicMock(name='cloud_mock')
self.assertTrue(self.node_setup.start.called,
"second node not started after booted node stopped")
+ def test_node_disappearing_during_shutdown(self):
+ cloud_node = testutil.cloud_node_mock(6)
+ setup = self.start_node_boot(cloud_node, id_num=6)
+ self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+ monitor = self.monitor_list()[0].proxy()
+ self.daemon.update_server_wishlist([])
+ self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+ self.assertShutdownCancellable(True)
+ shutdown = self.node_shutdown.start().proxy()
+ shutdown.cloud_node.get.return_value = cloud_node
+ # Simulate a successful but slow node destroy call: the cloud node
+ # list gets updated before the ShutdownActor finishes.
+ record = self.daemon.cloud_nodes.get().nodes.values()[0]
+ self.assertTrue(record.shutdown_actor is not None)
+ self.daemon.cloud_nodes.get().nodes.clear()
+ self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
+ self.assertTrue(
+ record.shutdown_actor is not None,
+ "test was ineffective -- failed to simulate the race condition")
+
def test_booted_node_shut_down_when_never_listed(self):
setup = self.start_node_boot()
self.cloud_factory().node_start_time.return_value = time.time() - 3601