net-ssh-gateway (2.0.0)
net-ssh (>= 4.0.0)
nio4r (2.5.8)
- nokogiri (1.13.4)
+ nokogiri (1.13.6)
mini_portile2 (~> 2.8.0)
racc (~> 1.4)
npm-rails (0.2.1)
echo -n 'graphviz: '
dot -V || fatal "No graphviz. Try: apt-get install graphviz"
echo -n 'geckodriver: '
- geckodriver --version | grep ^geckodriver || echo "No geckodriver. Try: wget -O- https://github.com/mozilla/geckodriver/releases/download/v0.23.0/geckodriver-v0.23.0-linux64.tar.gz | sudo tar -C /usr/local/bin -xzf - geckodriver"
+ geckodriver --version | grep ^geckodriver || echo "No geckodriver. Try: arvados-server install"
+ echo -n 'singularity: '
+ singularity --version || fatal "No singularity. Try: arvados-server install"
+ echo -n 'docker client: '
+ docker --version || echo "No docker client. Try: arvados-server install"
+ echo -n 'docker server: '
+ docker info --format='{{.ServerVersion}}' || echo "No docker server. Try: arvados-server install"
if [[ "$NEED_SDK_R" = true ]]; then
# R SDK stuff
h := hmac.New(sha256.New, []byte(arvadostest.SystemRootToken))
fmt.Fprint(h, uuid)
authSecret := fmt.Sprintf("%x", h.Sum(nil))
- dcid := "theperthcountyconspiracy"
gw := crunchrun.Gateway{
- DockerContainerID: &dcid,
- ContainerUUID: uuid,
- Address: "0.0.0.0:0",
- AuthSecret: authSecret,
+ ContainerUUID: uuid,
+ Address: "0.0.0.0:0",
+ AuthSecret: authSecret,
// Just forward connections to localhost instead of a
// container, so we can test without running a
// container.
- ContainerIPAddress: func() (string, error) { return "0.0.0.0", nil },
+ Target: crunchrun.GatewayTargetStub{},
}
err := gw.Start()
c.Assert(err, check.IsNil)
cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
cmd.Stdout = &stdout
cmd.Stderr = &stderr
- c.Check(cmd.Run(), check.NotNil)
- c.Log(stderr.String())
- c.Check(stderr.String(), check.Matches, `(?ms).*(No such container: theperthcountyconspiracy|exec: \"docker\": executable file not found in \$PATH).*`)
+ c.Check(cmd.Run(), check.IsNil)
+ c.Check(stdout.String(), check.Equals, "ok\n")
// Set up an http server, and try using "arvados-client shell"
// to forward traffic to it.
|runtime_user_uuid|string|The user permission that will be granted to this container.||
|runtime_auth_scopes|array of string|The scopes associated with the auth token used to run this container.||
|output_storage_classes|array of strings|The storage classes that will be used for the log and output collections of this container request|default is ["default"]|
+|output_properties|hash|User metadata properties to set on the output collection. The output collection will also have default properties "type" ("intermediate" or "output") and "container_request" (the uuid of the container request that produced the collection).||
h2(#priority). Priority
|gateway_address|string|Address (host:port) of gateway server.|Internal use only.|
|interactive_session_started|boolean|Indicates whether @arvados-client shell@ has been used to run commands in the container, which may have altered the container's behavior and output.||
|output_storage_classes|array of strings|The storage classes that will be used for the log and output collections of this container||
+|output_properties|hash|User metadata properties to set on the output collection.||
h2(#container_states). Container states
property1: value1
property2: $(inputs.value2)
+ arv:OutputCollectionProperties:
+ outputProperties:
+ property1: value1
+ property2: $(inputs.value2)
+
cwltool:CUDARequirement:
cudaVersionMin: "11.0"
cudaComputeCapability: "9.0"
table(table table-bordered table-condensed).
|_. Field |_. Type |_. Description |
-|processProperties|key-value map, or list of objects with the fields {propertyName, propertyValue}|The properties that will be set on the container request. May include expressions that reference `$(inputs)` of the current workflow or tool.|
+|processProperties|key-value map, or list of objects with the fields {propertyName, propertyValue}|The properties that will be set on the container request. May include expressions that reference @$(inputs)@ of the current workflow or tool.|
+
+h2(#OutputCollectionProperties). arv:OutputCollectionProperties
+
+Specify custom "properties":{{site.baseurl}}/api/methods.html#subpropertyfilters that will be set on the output collection of the workflow step.
+
+table(table table-bordered table-condensed).
+|_. Field |_. Type |_. Description |
+|outputProperties|key-value map, or list of objects with the fields {propertyName, propertyValue}|The properties that will be set on the output collection. May include expressions that reference @$(inputs)@ of the current workflow or tool.|
h2(#CUDARequirement). cwltool:CUDARequirement
FreezeProjectRequiresDescription: false
# Project properties that must have non-empty values in order to
- # freeze a project. Example: {"property_name": true}
- FreezeProjectRequiresProperties: {}
+ # freeze a project. Example: "property_name": {}
+ FreezeProjectRequiresProperties:
+ SAMPLE: {}
# If true, only an admin user can un-freeze a project. If false,
# any user with "manage" permission can un-freeze.
"API.DisabledAPIs": false,
"API.FreezeProjectRequiresDescription": true,
"API.FreezeProjectRequiresProperties": true,
+ "API.FreezeProjectRequiresProperties.*": true,
"API.KeepServiceRequestTimeout": false,
"API.MaxConcurrentRequests": false,
"API.MaxIndexDatabaseRead": false,
ldr.checkEnum("Containers.LocalKeepLogsToContainerLog", cc.Containers.LocalKeepLogsToContainerLog, "none", "all", "errors"),
ldr.checkEmptyKeepstores(cc),
ldr.checkUnlistedKeepstores(cc),
+ ldr.checkLocalKeepBlobBuffers(cc),
ldr.checkStorageClasses(cc),
ldr.checkCUDAVersions(cc),
// TODO: check non-empty Rendezvous on
return nil
}
+func (ldr *Loader) checkLocalKeepBlobBuffers(cc arvados.Cluster) error {
+ kbb := cc.Containers.LocalKeepBlobBuffersPerVCPU
+ if kbb == 0 {
+ return nil
+ }
+ for uuid, vol := range cc.Volumes {
+ if len(vol.AccessViaHosts) > 0 {
+ ldr.Logger.Warnf("LocalKeepBlobBuffersPerVCPU is %d but will not be used because at least one volume (%s) uses AccessViaHosts -- suggest changing to 0", kbb, uuid)
+ return nil
+ }
+ if !vol.ReadOnly && vol.Replication < cc.Collections.DefaultReplication {
+ ldr.Logger.Warnf("LocalKeepBlobBuffersPerVCPU is %d but will not be used because at least one volume (%s) has lower replication than DefaultReplication (%d < %d) -- suggest changing to 0", kbb, uuid, vol.Replication, cc.Collections.DefaultReplication)
+ return nil
+ }
+ }
+ return nil
+}
+
func (ldr *Loader) checkStorageClasses(cc arvados.Cluster) error {
classOnVolume := map[string]bool{}
for volid, vol := range cc.Volumes {
InternalURLs:
"http://host.example:12345": {}
Volumes:
- zzzzz-nyw5e-aaaaaaaaaaaaaaa: {}
+ zzzzz-nyw5e-aaaaaaaaaaaaaaa: {Replication: 2}
`, &logbuf).Load()
c.Assert(err, check.IsNil)
c.Log(logbuf.String())
}
}
+func (s *LoadSuite) TestWarnUnusedLocalKeep(c *check.C) {
+ var logbuf bytes.Buffer
+ _, err := testLoader(c, `
+Clusters:
+ z1111:
+ Volumes:
+ z:
+ Replication: 1
+`, &logbuf).Load()
+ c.Assert(err, check.IsNil)
+ c.Check(logbuf.String(), check.Matches, `(?ms).*LocalKeepBlobBuffersPerVCPU is 1 but will not be used because at least one volume \(z\) has lower replication than DefaultReplication \(1 < 2\) -- suggest changing to 0.*`)
+
+ logbuf.Reset()
+ _, err = testLoader(c, `
+Clusters:
+ z1111:
+ Volumes:
+ z:
+ AccessViaHosts:
+ "http://0.0.0.0:12345": {}
+`, &logbuf).Load()
+ c.Assert(err, check.IsNil)
+ c.Check(logbuf.String(), check.Matches, `(?ms).*LocalKeepBlobBuffersPerVCPU is 1 but will not be used because at least one volume \(z\) uses AccessViaHosts -- suggest changing to 0.*`)
+}
+
func (s *LoadSuite) TestImplicitStorageClasses(c *check.C) {
// If StorageClasses and Volumes.*.StorageClasses are all
// empty, there is a default storage class named "default".
authKey := fmt.Sprintf("%x", h.Sum(nil))
s.gw = &crunchrun.Gateway{
- DockerContainerID: new(string),
- ContainerUUID: s.ctrUUID,
- AuthSecret: authKey,
- Address: "localhost:0",
- Log: ctxlog.TestLogger(c),
- ContainerIPAddress: func() (string, error) { return "localhost", nil },
+ ContainerUUID: s.ctrUUID,
+ AuthSecret: authKey,
+ Address: "localhost:0",
+ Log: ctxlog.TestLogger(c),
+ Target: crunchrun.GatewayTargetStub{},
}
c.Assert(s.gw.Start(), check.IsNil)
rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
"os"
"os/exec"
"sync"
- "sync/atomic"
"syscall"
- "time"
"git.arvados.org/arvados.git/lib/selfsigned"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
"github.com/creack/pty"
- dockerclient "github.com/docker/docker/client"
"github.com/google/shlex"
"golang.org/x/crypto/ssh"
"golang.org/x/net/context"
)
+type GatewayTarget interface {
+ // Command that will execute cmd inside the container
+ InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, cmd []string) (*exec.Cmd, error)
+
+ // IP address inside container
+ IPAddress() (string, error)
+}
+
+type GatewayTargetStub struct{}
+
+func (GatewayTargetStub) IPAddress() (string, error) { return "127.0.0.1", nil }
+func (GatewayTargetStub) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, cmd []string) (*exec.Cmd, error) {
+ return exec.CommandContext(ctx, cmd[0], cmd[1:]...), nil
+}
+
type Gateway struct {
- DockerContainerID *string
- ContainerUUID string
- Address string // listen host:port; if port=0, Start() will change it to the selected port
- AuthSecret string
- Log interface {
+ ContainerUUID string
+ Address string // listen host:port; if port=0, Start() will change it to the selected port
+ AuthSecret string
+ Target GatewayTarget
+ Log interface {
Printf(fmt string, args ...interface{})
}
- // return local ip address of running container, or "" if not available
- ContainerIPAddress func() (string, error)
sshConfig ssh.ServerConfig
requestAuth string
return
}
- var dstaddr string
- if gw.ContainerIPAddress != nil {
- dstaddr, err = gw.ContainerIPAddress()
- if err != nil {
- fmt.Fprintf(ch.Stderr(), "container has no IP address: %s\n", err)
- return
- }
- }
- if dstaddr == "" {
+ dstaddr, err := gw.Target.IPAddress()
+ if err != nil {
+ fmt.Fprintf(ch.Stderr(), "container has no IP address: %s\n", err)
+ return
+ } else if dstaddr == "" {
fmt.Fprintf(ch.Stderr(), "container has no IP address\n")
return
}
execargs = []string{"/bin/bash", "-login"}
}
go func() {
- cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
+ var resp struct {
+ Status uint32
+ }
+ defer func() {
+ ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
+ ch.Close()
+ }()
+
+ cmd, err := gw.Target.InjectCommand(ctx, detachKeys, username, tty0 != nil, execargs)
+ if err != nil {
+ fmt.Fprintln(ch.Stderr(), err)
+ ch.CloseWrite()
+ resp.Status = 1
+ return
+ }
cmd.Stdin = ch
cmd.Stdout = ch
cmd.Stderr = ch.Stderr()
if tty0 != nil {
- cmd.Args = append(cmd.Args, "-t")
cmd.Stdin = tty0
cmd.Stdout = tty0
cmd.Stderr = tty0
// Send our own debug messages to tty as well.
logw = tty0
}
- cmd.Args = append(cmd.Args, *gw.DockerContainerID)
- cmd.Args = append(cmd.Args, execargs...)
cmd.SysProcAttr = &syscall.SysProcAttr{
Setctty: tty0 != nil,
Setsid: true,
}
cmd.Env = append(os.Environ(), termEnv...)
- err := cmd.Run()
- var resp struct {
- Status uint32
- }
+ err = cmd.Run()
if exiterr, ok := err.(*exec.ExitError); ok {
if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
resp.Status = uint32(status.ExitStatus())
if resp.Status == 0 && (err != nil || errClose != nil) {
resp.Status = 1
}
- ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
- ch.Close()
}()
case "pty-req":
eol = "\r\n"
}
}
}
-
-func dockerContainerIPAddress(containerID *string) func() (string, error) {
- var saved atomic.Value
- return func() (string, error) {
- if ip, ok := saved.Load().(*string); ok {
- return *ip, nil
- }
- docker, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- if err != nil {
- return "", fmt.Errorf("cannot create docker client: %s", err)
- }
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
- defer cancel()
- ctr, err := docker.ContainerInspect(ctx, *containerID)
- if err != nil {
- return "", fmt.Errorf("cannot get docker container info: %s", err)
- }
- ip := ctr.NetworkSettings.IPAddress
- if ip == "" {
- // TODO: try to enable networking if it wasn't
- // already enabled when the container was
- // created.
- return "", fmt.Errorf("container has no IP address")
- }
- saved.Store(&ip)
- return ip, nil
- }
-}
"--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
- if runner.executor.Runtime() == "docker" {
+ if _, isdocker := runner.executor.(*dockerExecutor); isdocker {
arvMountCmd = append(arvMountCmd, "--allow-other")
}
func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
runner.CrunchLog.Printf("%s", currentUserAndGroups())
- runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime())
+ v, _ := exec.Command("arv-mount", "--version").CombinedOutput()
+ runner.CrunchLog.Printf("Using FUSE mount: %s", v)
+ runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
+ runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
hostname, hosterr := os.Hostname()
if hosterr != nil {
// dispatcher did not tell us which external IP
// address to advertise --> no gateway service
cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
- } else if de, ok := cr.executor.(*dockerExecutor); ok {
+ } else {
cr.gateway = Gateway{
- Address: gwListen,
- AuthSecret: gwAuthSecret,
- ContainerUUID: containerUUID,
- DockerContainerID: &de.containerID,
- Log: cr.CrunchLog,
- ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+ Address: gwListen,
+ AuthSecret: gwAuthSecret,
+ ContainerUUID: containerUUID,
+ Target: cr.executor,
+ Log: cr.CrunchLog,
}
err = cr.gateway.Start()
if err != nil {
"testing"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
return e.loadErr
}
func (e *stubExecutor) Runtime() string { return "stub" }
+func (e *stubExecutor) Version() string { return "stub " + cmd.Version.String() }
func (e *stubExecutor) Create(spec containerSpec) error { e.created = spec; return e.createErr }
func (e *stubExecutor) Start() error { e.exit = make(chan int, 1); go e.runFunc(); return e.startErr }
func (e *stubExecutor) CgroupID() string { return "cgroupid" }
func (e *stubExecutor) Wait(context.Context) (int, error) {
return <-e.exit, e.waitErr
}
+func (e *stubExecutor) InjectCommand(ctx context.Context, _, _ string, _ bool, _ []string) (*exec.Cmd, error) {
+ return nil, errors.New("unimplemented")
+}
+func (e *stubExecutor) IPAddress() (string, error) { return "", errors.New("unimplemented") }
const fakeInputCollectionPDH = "ffffffffaaaaaaaa88888888eeeeeeee+1234"
c.Assert(s.api.Logs["crunch-run"], NotNil)
c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container 'zzzzz-zzzzz-zzzzzzzzzzzzzzz' using stub runtime.*`)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
}
func (s *TestSuite) TestContainerRecordLog(c *C) {
"io"
"io/ioutil"
"os"
+ "os/exec"
"strings"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
watchdogInterval time.Duration
dockerclient *dockerclient.Client
containerID string
+ savedIPAddress atomic.Value
doneIO chan struct{}
errIO error
}
}, err
}
-func (e *dockerExecutor) Runtime() string { return "docker" }
+func (e *dockerExecutor) Runtime() string {
+ v, _ := e.dockerclient.ServerVersion(context.Background())
+ info := ""
+ for _, cv := range v.Components {
+ if info != "" {
+ info += ", "
+ }
+ info += cv.Name + " " + cv.Version
+ }
+ if info == "" {
+ info = "(unknown version)"
+ }
+ return "docker " + info
+}
func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
containerClient *arvados.Client) error {
func (e *dockerExecutor) Close() {
e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
}
+
+func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
+ cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
+ if usingTTY {
+ cmd.Args = append(cmd.Args, "-t")
+ }
+ cmd.Args = append(cmd.Args, e.containerID)
+ cmd.Args = append(cmd.Args, injectcmd...)
+ return cmd, nil
+}
+
+func (e *dockerExecutor) IPAddress() (string, error) {
+ if ip, ok := e.savedIPAddress.Load().(*string); ok {
+ return *ip, nil
+ }
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+ defer cancel()
+ ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
+ if err != nil {
+ return "", fmt.Errorf("cannot get docker container info: %s", err)
+ }
+ ip := ctr.NetworkSettings.IPAddress
+ if ip == "" {
+ // TODO: try to enable networking if it wasn't
+ // already enabled when the container was
+ // created.
+ return "", fmt.Errorf("container has no IP address")
+ }
+ e.savedIPAddress.Store(&ip)
+ return ip, nil
+}
// Release resources (temp dirs, stopped containers)
Close()
- // Name of runtime engine ("docker", "singularity")
+ // Name and version of runtime engine ("docker 20.10.16", "singularity-ce version 3.9.9")
Runtime() string
+
+ GatewayTarget
}
import (
"bytes"
+ "fmt"
"io"
"io/ioutil"
+ "net"
"net/http"
"os"
"strings"
}
func (s *executorSuite) TestExecTrivialContainer(c *C) {
+ c.Logf("Using container runtime: %s", s.executor.Runtime())
s.spec.Command = []string{"echo", "ok"}
s.checkRun(c, 0)
c.Check(s.stdout.String(), Equals, "ok\n")
c.Check(s.stderr.String(), Equals, "barwaz\n")
}
+func (s *executorSuite) TestIPAddress(c *C) {
+ // Listen on an available port on the host.
+ ln, err := net.Listen("tcp", net.JoinHostPort("0.0.0.0", "0"))
+ c.Assert(err, IsNil)
+ defer ln.Close()
+ _, port, err := net.SplitHostPort(ln.Addr().String())
+ c.Assert(err, IsNil)
+
+ // Start a container that listens on the same port number that
+ // is already in use on the host.
+ s.spec.Command = []string{"nc", "-l", "-p", port, "-e", "printf", `HTTP/1.1 418 I'm a teapot\r\n\r\n`}
+ s.spec.EnableNetwork = true
+ c.Assert(s.executor.Create(s.spec), IsNil)
+ c.Assert(s.executor.Start(), IsNil)
+ starttime := time.Now()
+
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+ defer cancel()
+
+ for ctx.Err() == nil {
+ time.Sleep(time.Second / 10)
+ _, err := s.executor.IPAddress()
+ if err == nil {
+ break
+ }
+ }
+ // When we connect to the port using s.executor.IPAddress(),
+ // we should reach the nc process running inside the
+ // container, not the net.Listen() running outside the
+ // container, even though both listen on the same port.
+ ip, err := s.executor.IPAddress()
+ if c.Check(err, IsNil) && c.Check(ip, Not(Equals), "") {
+ req, err := http.NewRequest("BREW", "http://"+net.JoinHostPort(ip, port), nil)
+ c.Assert(err, IsNil)
+ resp, err := http.DefaultClient.Do(req)
+ c.Assert(err, IsNil)
+ c.Check(resp.StatusCode, Equals, http.StatusTeapot)
+ }
+
+ s.executor.Stop()
+ code, _ := s.executor.Wait(ctx)
+ c.Logf("container ran for %v", time.Now().Sub(starttime))
+ c.Check(code, Equals, -1)
+
+ c.Logf("stdout:\n%s\n\n", s.stdout.String())
+ c.Logf("stderr:\n%s\n\n", s.stderr.String())
+}
+
+func (s *executorSuite) TestInject(c *C) {
+ hostdir := c.MkDir()
+ c.Assert(os.WriteFile(hostdir+"/testfile", []byte("first tube"), 0777), IsNil)
+ mountdir := fmt.Sprintf("/injecttest-%d", os.Getpid())
+ s.spec.Command = []string{"sleep", "10"}
+ s.spec.BindMounts = map[string]bindmount{mountdir: {HostPath: hostdir, ReadOnly: true}}
+ c.Assert(s.executor.Create(s.spec), IsNil)
+ c.Assert(s.executor.Start(), IsNil)
+ starttime := time.Now()
+
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
+ defer cancel()
+
+ // Allow InjectCommand to fail a few times while the container
+ // is starting
+ for ctx.Err() == nil {
+ _, err := s.executor.InjectCommand(ctx, "", "root", false, []string{"true"})
+ if err == nil {
+ break
+ }
+ time.Sleep(time.Second / 10)
+ }
+
+ injectcmd := []string{"cat", mountdir + "/testfile"}
+ cmd, err := s.executor.InjectCommand(ctx, "", "root", false, injectcmd)
+ c.Assert(err, IsNil)
+ out, err := cmd.CombinedOutput()
+ c.Logf("inject %s => %q", injectcmd, out)
+ c.Check(err, IsNil)
+ c.Check(string(out), Equals, "first tube")
+
+ s.executor.Stop()
+ code, _ := s.executor.Wait(ctx)
+ c.Logf("container ran for %v", time.Now().Sub(starttime))
+ c.Check(code, Equals, -1)
+}
+
func (s *executorSuite) checkRun(c *C, expectCode int) {
c.Assert(s.executor.Create(s.spec), IsNil)
c.Assert(s.executor.Start(), IsNil)
func (s *integrationSuite) TestRunTrivialContainerWithDocker(c *C) {
s.engine = "docker"
s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*Using container runtime: docker Engine \d+\.\d+.*`)
}
func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
s.engine = "singularity"
s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*Using container runtime: singularity.* version 3\.\d+.*`)
}
func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
package crunchrun
import (
+ "bytes"
+ "errors"
"fmt"
"io/ioutil"
+ "net"
"os"
"os/exec"
+ "os/user"
+ "regexp"
"sort"
+ "strconv"
+ "strings"
"syscall"
"time"
type singularityExecutor struct {
logf func(string, ...interface{})
+ fakeroot bool // use --fakeroot flag, allow --network=bridge when non-root (currently only used by tests)
spec containerSpec
tmpdir string
child *exec.Cmd
}, nil
}
-func (e *singularityExecutor) Runtime() string { return "singularity" }
+func (e *singularityExecutor) Runtime() string {
+ buf, err := exec.Command("singularity", "--version").CombinedOutput()
+ if err != nil {
+ return "singularity (unknown version)"
+ }
+ return strings.TrimSuffix(string(buf), "\n")
+}
func (e *singularityExecutor) getOrCreateProject(ownerUuid string, name string, containerClient *arvados.Client) (*arvados.Group, error) {
var gp arvados.GroupList
}
func (e *singularityExecutor) execCmd(path string) *exec.Cmd {
- args := []string{path, "exec", "--containall", "--cleanenv", "--pwd", e.spec.WorkingDir}
+ args := []string{path, "exec", "--containall", "--cleanenv", "--pwd=" + e.spec.WorkingDir}
+ if e.fakeroot {
+ args = append(args, "--fakeroot")
+ }
if !e.spec.EnableNetwork {
args = append(args, "--net", "--network=none")
+ } else if u, err := user.Current(); err == nil && u.Uid == "0" || e.fakeroot {
+ // Specifying --network=bridge fails unless (a) we are
+ // root, (b) we are using --fakeroot, or (c)
+ // singularity has been configured to allow our
+ // uid/gid to use it like so:
+ //
+ // singularity config global --set 'allow net networks' bridge
+ // singularity config global --set 'allow net groups' mygroup
+ args = append(args, "--net", "--network=bridge")
}
-
if e.spec.CUDADeviceCount != 0 {
args = append(args, "--nv")
}
for _, path := range binds {
mount := e.spec.BindMounts[path]
if path == e.spec.Env["HOME"] {
- // Singularity treates $HOME as special case
+ // Singularity treats $HOME as special case
args = append(args, "--home", mount.HostPath+":"+path)
} else {
args = append(args, "--bind", mount.HostPath+":"+path+":"+readonlyflag[mount.ReadOnly])
env := make([]string, 0, len(e.spec.Env))
for k, v := range e.spec.Env {
if k == "HOME" {
- // Singularity treates $HOME as special case, this is handled
- // with --home above
+ // Singularity treats $HOME as special case,
+ // this is handled with --home above
continue
}
env = append(env, "SINGULARITYENV_"+k+"="+v)
// us to select specific devices we need to propagate that.
env = append(env, "SINGULARITYENV_CUDA_VISIBLE_DEVICES="+cudaVisibleDevices)
}
+ // Singularity's default behavior is to evaluate each
+ // SINGULARITYENV_* env var with a shell as a double-quoted
+ // string and pass the result to the contained
+ // process. Singularity 3.10+ has an option to pass env vars
+ // through literally without evaluating, which is what we
+ // want. See https://github.com/sylabs/singularity/pull/704
+ // and https://dev.arvados.org/issues/19081
+ env = append(env, "SINGULARITY_NO_EVAL=1")
args = append(args, e.imageFilename)
args = append(args, e.spec.Command...)
e.logf("error removing temp dir: %s", err)
}
}
+
+func (e *singularityExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
+ target, err := e.containedProcess()
+ if err != nil {
+ return nil, err
+ }
+ return exec.CommandContext(ctx, "nsenter", append([]string{fmt.Sprintf("--target=%d", target), "--all"}, injectcmd...)...), nil
+}
+
+var (
+ errContainerHasNoIPAddress = errors.New("container has no IP address distinct from host")
+)
+
+func (e *singularityExecutor) IPAddress() (string, error) {
+ target, err := e.containedProcess()
+ if err != nil {
+ return "", err
+ }
+ targetIPs, err := processIPs(target)
+ if err != nil {
+ return "", err
+ }
+ selfIPs, err := processIPs(os.Getpid())
+ if err != nil {
+ return "", err
+ }
+ for ip := range targetIPs {
+ if !selfIPs[ip] {
+ return ip, nil
+ }
+ }
+ return "", errContainerHasNoIPAddress
+}
+
+func processIPs(pid int) (map[string]bool, error) {
+ fibtrie, err := os.ReadFile(fmt.Sprintf("/proc/%d/net/fib_trie", pid))
+ if err != nil {
+ return nil, err
+ }
+
+ addrs := map[string]bool{}
+ // When we see a pair of lines like this:
+ //
+ // |-- 10.1.2.3
+ // /32 host LOCAL
+ //
+ // ...we set addrs["10.1.2.3"] = true
+ lines := bytes.Split(fibtrie, []byte{'\n'})
+ for linenumber, line := range lines {
+ if !bytes.HasSuffix(line, []byte("/32 host LOCAL")) {
+ continue
+ }
+ if linenumber < 1 {
+ continue
+ }
+ i := bytes.LastIndexByte(lines[linenumber-1], ' ')
+ if i < 0 || i >= len(line)-7 {
+ continue
+ }
+ addr := string(lines[linenumber-1][i+1:])
+ if net.ParseIP(addr).To4() != nil {
+ addrs[addr] = true
+ }
+ }
+ return addrs, nil
+}
+
+var (
+ errContainerNotStarted = errors.New("container has not started yet")
+ errCannotFindChild = errors.New("failed to find any process inside the container")
+ reProcStatusPPid = regexp.MustCompile(`\nPPid:\t(\d+)\n`)
+)
+
+// Return the PID of a process that is inside the container (not
+// necessarily the topmost/pid=1 process in the container).
+func (e *singularityExecutor) containedProcess() (int, error) {
+ if e.child == nil || e.child.Process == nil {
+ return 0, errContainerNotStarted
+ }
+ lsns, err := exec.Command("lsns").CombinedOutput()
+ if err != nil {
+ return 0, fmt.Errorf("lsns: %w", err)
+ }
+ for _, line := range bytes.Split(lsns, []byte{'\n'}) {
+ fields := bytes.Fields(line)
+ if len(fields) < 4 {
+ continue
+ }
+ if !bytes.Equal(fields[1], []byte("pid")) {
+ continue
+ }
+ pid, err := strconv.ParseInt(string(fields[3]), 10, 64)
+ if err != nil {
+ return 0, fmt.Errorf("error parsing PID field in lsns output: %q", fields[3])
+ }
+ for parent := pid; ; {
+ procstatus, err := os.ReadFile(fmt.Sprintf("/proc/%d/status", parent))
+ if err != nil {
+ break
+ }
+ m := reProcStatusPPid.FindSubmatch(procstatus)
+ if m == nil {
+ break
+ }
+ parent, err = strconv.ParseInt(string(m[1]), 10, 64)
+ if err != nil {
+ break
+ }
+ if int(parent) == e.child.Process.Pid {
+ return int(pid), nil
+ }
+ }
+ }
+ return 0, errCannotFindChild
+}
package crunchrun
import (
+ "os"
"os/exec"
. "gopkg.in/check.v1"
+ check "gopkg.in/check.v1"
)
var _ = Suite(&singularitySuite{})
}
}
+func (s *singularitySuite) TearDownSuite(c *C) {
+ if s.executor != nil {
+ s.executor.Close()
+ }
+}
+
+func (s *singularitySuite) TestIPAddress(c *C) {
+ // In production, executor will choose --network=bridge
+ // because uid=0 under arvados-dispatch-cloud. But in test
+ // cases, uid!=0, which means --network=bridge is conditional
+ // on --fakeroot.
+ uuc, err := os.ReadFile("/proc/sys/kernel/unprivileged_userns_clone")
+ c.Check(err, check.IsNil)
+ if string(uuc) == "0\n" {
+ c.Skip("insufficient privileges to run this test case -- `singularity exec --fakeroot` requires /proc/sys/kernel/unprivileged_userns_clone = 1")
+ }
+ s.executor.(*singularityExecutor).fakeroot = true
+ s.executorSuite.TestIPAddress(c)
+}
+
+func (s *singularitySuite) TestInject(c *C) {
+ path, err := exec.LookPath("nsenter")
+ if err != nil || path != "/var/lib/arvados/bin/nsenter" {
+ c.Skip("looks like /var/lib/arvados/bin/nsenter is not installed -- re-run `arvados-server install`?")
+ }
+ s.executorSuite.TestInject(c)
+}
+
var _ = Suite(&singularityStubSuite{})
// singularityStubSuite tests don't really invoke singularity, so we
c.Check(err, IsNil)
e.imageFilename = "/fake/image.sif"
cmd := e.execCmd("./singularity")
- c.Check(cmd.Args, DeepEquals, []string{"./singularity", "exec", "--containall", "--cleanenv", "--pwd", "/WorkingDir", "--net", "--network=none", "--nv", "--bind", "/hostpath:/mnt:ro", "/fake/image.sif"})
- c.Check(cmd.Env, DeepEquals, []string{"SINGULARITYENV_FOO=bar"})
+ c.Check(cmd.Args, DeepEquals, []string{"./singularity", "exec", "--containall", "--cleanenv", "--pwd=/WorkingDir", "--net", "--network=none", "--nv", "--bind", "/hostpath:/mnt:ro", "/fake/image.sif"})
+ c.Check(cmd.Env, DeepEquals, []string{"SINGULARITYENV_FOO=bar", "SINGULARITY_NO_EVAL=1"})
}
}
if dev || test {
pkgs = append(pkgs, "squashfs-tools") // for singularity
+ pkgs = append(pkgs, "gnupg") // for docker install recipe
}
switch {
case osv.Debian && osv.Major >= 11:
}
}
+ if dev || test {
+ if havedockerversion, err := exec.Command("docker", "--version").CombinedOutput(); err == nil {
+ logger.Printf("%s installed, assuming that version is ok", bytes.TrimSuffix(havedockerversion, []byte("\n")))
+ } else if osv.Debian {
+ var codename string
+ switch osv.Major {
+ case 10:
+ codename = "buster"
+ case 11:
+ codename = "bullseye"
+ default:
+ err = fmt.Errorf("don't know how to install docker-ce for debian %d", osv.Major)
+ return 1
+ }
+ err = inst.runBash(`
+rm -f /usr/share/keyrings/docker-archive-keyring.gpg
+curl -fsSL https://download.docker.com/linux/debian/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
+echo 'deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian/ `+codename+` stable' | \
+ tee /etc/apt/sources.list.d/docker.list
+apt-get update
+DEBIAN_FRONTEND=noninteractive apt-get --yes --no-install-recommends install docker-ce
+`, stdout, stderr)
+ if err != nil {
+ return 1
+ }
+ } else {
+ err = fmt.Errorf("don't know how to install docker for osversion %v", osv)
+ return 1
+ }
+ }
+
os.Mkdir("/var/lib/arvados", 0755)
os.Mkdir("/var/lib/arvados/tmp", 0700)
if prod || pkg {
}
}
+ err = inst.runBash(`
+install /usr/bin/nsenter /var/lib/arvados/bin/nsenter
+setcap "cap_sys_admin+pei cap_sys_chroot+pei" /var/lib/arvados/bin/nsenter
+`, stdout, stderr)
+ if err != nil {
+ return 1
+ }
+
// The entry in /etc/locale.gen is "en_US.UTF-8"; once
// it's installed, locale -a reports it as
// "en_US.utf8".
"http://arvados.org/cwl#ProcessProperties",
"http://commonwl.org/cwltool#CUDARequirement",
"http://arvados.org/cwl#UsePreemptible",
+ "http://arvados.org/cwl#OutputCollectionProperties",
])
def exit_signal_handler(sigcode, frame):
- type: record
name: PropertyDef
doc: |
- Define a property that will be set on the submitted container
- request associated with this workflow or step.
+ Define an arvados metadata property that will be set on a
+ container request or output collection.
fields:
- name: propertyName
type: string
_id: "@type"
_type: "@vocab"
usePreemptible: boolean
+
+- name: OutputCollectionProperties
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify metadata properties that will be set on the output
+ collection associated with this workflow or step.
+ fields:
+ class:
+ type: string
+      doc: "Always 'arv:OutputCollectionProperties'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ outputProperties:
+ type: PropertyDef[]
+ jsonldPredicate:
+ mapSubject: propertyName
+ mapPredicate: propertyValue
_id: "@type"
_type: "@vocab"
usePreemptible: boolean
+
+- name: OutputCollectionProperties
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify metadata properties that will be set on the output
+ collection associated with this workflow or step.
+ fields:
+ class:
+ type: string
+      doc: "Always 'arv:OutputCollectionProperties'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ outputProperties:
+ type: PropertyDef[]
+ jsonldPredicate:
+ mapSubject: propertyName
+ mapPredicate: propertyValue
_id: "@type"
_type: "@vocab"
usePreemptible: boolean
+
+- name: OutputCollectionProperties
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify metadata properties that will be set on the output
+ collection associated with this workflow or step.
+ fields:
+ class:
+ type: string
+      doc: "Always 'arv:OutputCollectionProperties'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ outputProperties:
+ type: PropertyDef[]
+ jsonldPredicate:
+ mapSubject: propertyName
+ mapPredicate: propertyValue
mounts[targetdir]["path"] = path
prevdir = targetdir + "/"
+ intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+
with Perf(metrics, "generatefiles %s" % self.name):
if self.generatefiles["listing"]:
vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
if not runtimeContext.current_container:
runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
- info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
- vwd.save_new(name=info["name"],
+ vwd.save_new(name=intermediate_collection_info["name"],
owner_uuid=runtimeContext.project_uuid,
ensure_unique_name=True,
- trash_at=info["trash_at"],
- properties=info["properties"])
+ trash_at=intermediate_collection_info["trash_at"],
+ properties=intermediate_collection_info["properties"])
prev = None
for f, p in sorteditems:
if runtimeContext.submit_runner_cluster:
extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
- container_request["output_name"] = "Output for step %s" % (self.name)
+ container_request["output_name"] = "Output from step %s" % (self.name)
container_request["output_ttl"] = self.output_ttl
container_request["mounts"] = mounts
container_request["secret_mounts"] = secret_mounts
for pr in properties_req["processProperties"]:
container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+ output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
+ if output_properties_req:
+ if self.arvrunner.api._rootDesc["revision"] >= "20220510":
+ container_request["output_properties"] = {}
+ for pr in output_properties_req["outputProperties"]:
+ container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+ else:
+ logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
+ self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
+
if runtimeContext.runnerjob.startswith("arvwf:"):
wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
import arvados_cwl.util
from .arvcontainer import RunnerContainer
-from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
self.check_features(v, parentfield=parentfield)
- def make_output_collection(self, name, storage_classes, tagsString, outputObj):
+ def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
outputObj = copy.deepcopy(outputObj)
files = []
res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
f.write(res)
- final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
+
+ final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
+ ensure_unique_name=True, properties=output_properties)
logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
final.api_response()["name"],
self.api.containers().update(uuid=current['uuid'],
body={
'output': self.final_output_collection.portable_data_hash(),
+ 'output_properties': self.final_output_collection.get_properties(),
}).execute(num_retries=self.num_retries)
self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
body={
runtimeContext.tmpdir_prefix = "tmp"
runtimeContext.work_api = self.work_api
+ if not self.output_name:
+ self.output_name = "Output from workflow %s" % runtimeContext.name
+
if self.work_api == "containers":
if self.ignore_docker_for_reuse:
raise Exception("--ignore-docker-for-reuse not supported with containers API.")
if workbench2 or workbench1:
logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
else:
- if self.output_name is None:
- self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
if self.output_tags is None:
self.output_tags = ""
else:
storage_classes = runtimeContext.storage_classes.strip().split(",")
- self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
+ output_properties = {}
+ output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
+ if output_properties_req:
+ builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
+ for pr in output_properties_req["outputProperties"]:
+ output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
+
+ self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
+ self.output_tags, output_properties,
+ self.final_output)
self.set_crunch_output()
if runtimeContext.compute_checksum:
if intermediate_output_ttl > 0:
trash_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=intermediate_output_ttl)
container_uuid = None
+ props = {"type": "intermediate"}
if current_container:
- container_uuid = current_container['uuid']
- props = {"type": "intermediate", "container": container_uuid}
+ props["container"] = current_container['uuid']
return {"name" : name, "trash_at" : trash_time, "properties" : props}
arvbox start $config $tag
+# Copy the integration test suite from our local arvados clone instead
+# of using the one inside the container, so we can make changes to the
+# integration tests without necessarily having to rebuild the
+# container image.
+docker cp -L $(readlink -f $(dirname $0)/tests) $ARVBOX_CONTAINER:/usr/src/arvados/sdk/cwl
+
arvbox pipe <<EOF
set -eu -o pipefail
if test -n "$build" ; then
/usr/src/arvados/build/build-dev-docker-jobs-image.sh
-elif test "$tag" = "latest" ; then
- arv-keepdocker --pull arvados/jobs $tag
-else
- set +u
- export WORKSPACE=/usr/src/arvados
- . /usr/src/arvados/build/run-library.sh
- TMPHERE=\$(pwd)
- cd /usr/src/arvados
-
- # This defines python_sdk_version and cwl_runner_version with python-style
- # package suffixes (.dev/rc)
- calculate_python_sdk_cwl_package_versions
-
- cd \$TMPHERE
- set -u
-
- arv-keepdocker --pull arvados/jobs \$cwl_runner_version
- docker tag arvados/jobs:\$cwl_runner_version arvados/jobs:latest
- arv-keepdocker arvados/jobs latest
fi
EXTRA=--compute-checksum
fi
env
+
+# Skip docker_entrypoint test because it fails on singularity
+#
+# Skip timelimit_invalid_wf test because the timeout is very short
+# (5s) and singularity containers loading off an arv-mount take too long
+# to start and get incorrectly terminated
+#
+# Skip test 199 in the v1.1 suite because it has different output
+# depending on whether there is a pty associated with stdout (fixed in
+# the v1.2 suite)
+#
+# Skip test 307 in the v1.2 suite because the test relied on
+# secondary file behavior of cwltool that wasn't actually correct per the specification
+
if [[ "$suite" = "integration" ]] ; then
cd /usr/src/arvados/sdk/cwl/tests
exec ./arvados-tests.sh $@
elif [[ "$suite" = "conformance-v1.2" ]] ; then
exec cwltest --tool arvados-cwl-runner --test conformance_tests.yaml -Sdocker_entrypoint,timelimit_invalid_wf -N307 $@ -- \$EXTRA
-else
- exec cwltest --tool arvados-cwl-runner --test conformance_tests.yaml -Sdocker_entrypoint,timelimit_invalid_wf $@ -- \$EXTRA
+elif [[ "$suite" = "conformance-v1.1" ]] ; then
+ exec cwltest --tool arvados-cwl-runner --test conformance_tests.yaml -Sdocker_entrypoint,timelimit_invalid_wf -N199 $@ -- \$EXTRA
+elif [[ "$suite" = "conformance-v1.0" ]] ; then
+ exec cwltest --tool arvados-cwl-runner --test v1.0/conformance_test_v1.0.yaml -Sdocker_entrypoint $@ -- \$EXTRA
fi
EOF
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: Workflow
+cwlVersion: v1.2
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+hints:
+ arv:OutputCollectionProperties:
+ outputProperties:
+ foo: bar
+ baz: $(inputs.inp.basename)
+inputs:
+ inp: File
+steps:
+ cat:
+ in:
+ inp: inp
+ run: cat.cwl
+ out: []
+outputs: []
# conformance tests.
#
-set -e
+set -ex
if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then
arv-put --portable-data-hash testdir/*
fi
# Use the python executor associated with the installed OS package, if present.
-python=$(((ls /usr/share/python3*/dist/python3-arvados-cwl-runner/bin/python || echo python) | head -n1) 2>/dev/null)
+python=$(((ls /usr/share/python3*/dist/python3-arvados-cwl-runner/bin/python || echo python3) | head -n1) 2>/dev/null)
# Test for #18888
# This is a standalone test because the bug was observed with this
# integration test to check for the expected behavior.
$python test_copy_deps.py
+# Test for #17004
+# Checks that the final output collection has the expected properties.
+$python test_set_output_prop.py
+
# Run integration tests
exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum --api=containers
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_run_'+str(enable_reuse),
+ 'output_name': 'Output from step test_run_'+str(enable_reuse),
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 5242880000 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_resource_requirements',
+ 'output_name': 'Output from step test_resource_requirements',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 7200,
}
},
'state': 'Committed',
- 'output_name': 'Output for step test_initial_work_dir',
+ 'output_name': 'Output from step test_initial_work_dir',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
},
},
'state': 'Committed',
- "output_name": "Output for step test_run_redirect",
+ "output_name": "Output from step test_run_redirect",
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_run_mounts',
+ 'output_name': 'Output from step test_run_mounts',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_secrets',
+ 'output_name': 'Output from step test_secrets',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_run_True',
+ 'output_name': 'Output from step test_run_True',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_run_True',
+ 'output_name': 'Output from step test_run_True',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_run_True' + ("" if test_case == 0 else "_"+str(test_case+1)),
+ 'output_name': 'Output from step test_run_True' + ("" if test_case == 0 else "_"+str(test_case+1)),
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_run_True',
+ 'output_name': 'Output from step test_run_True',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
runtimeContext.match_local_docker = True
container_request['container_image'] = '99999999999999999999999999999993+99'
container_request['name'] = 'test_run_True_2'
- container_request['output_name'] = 'Output for step test_run_True_2'
+ container_request['output_name'] = 'Output from step test_run_True_2'
for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step '+runtimeContext.name,
+ 'output_name': 'Output from step '+runtimeContext.name,
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
}))
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ def test_output_properties(self, keepdocker):
+ arvados_cwl.add_arv_hints()
+ for rev in ["20210628", "20220510"]:
+ runner = mock.MagicMock()
+ runner.ignore_docker_for_reuse = False
+ runner.intermediate_output_ttl = 0
+ runner.secret_store = cwltool.secrets.SecretStore()
+ runner.api._rootDesc = {"revision": rev}
+
+ keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ runner.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
+ tool = cmap({
+ "inputs": [{
+ "id": "inp",
+ "type": "string"
+ }],
+ "outputs": [],
+ "baseCommand": "ls",
+ "arguments": [{"valueFrom": "$(runtime.outdir)"}],
+ "id": "",
+ "cwlVersion": "v1.2",
+ "class": "CommandLineTool",
+ "hints": [
+ {
+ "class": "http://arvados.org/cwl#OutputCollectionProperties",
+ "outputProperties": {
+ "foo": "bar",
+ "baz": "$(inputs.inp)"
+ }
+ }
+ ]
+ })
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_timelimit"
+
+ arvtool = cwltool.load_tool.load_tool(tool, loadingContext)
+ arvtool.formatgraph = None
+
+ for j in arvtool.job({"inp": "quux"}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
+
+ _, kwargs = runner.api.container_requests().create.call_args
+ if rev == "20220510":
+ self.assertEqual({"foo": "bar", "baz": "quux"}, kwargs['body'].get('output_properties'))
+ else:
+ self.assertEqual(None, kwargs['body'].get('output_properties'))
+
class TestWorkflow(unittest.TestCase):
def setUp(self):
}
},
"name": "scatterstep",
- "output_name": "Output for step scatterstep",
+ "output_name": "Output from step scatterstep",
"output_path": "/var/spool/cwl",
"output_ttl": 0,
"priority": 500,
u'cwl.input.yml'
],
'use_existing': True,
- 'output_name': u'Output for step echo-subwf',
+ 'output_name': u'Output from step echo-subwf',
'cwd': '/var/spool/cwl',
'output_storage_classes': ["default"]
}))
final.open.return_value = openmock
openmock.__enter__.return_value = cwlout
- _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "tag0,tag1,tag2", {
+ _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "tag0,tag1,tag2", {}, {
"foo": {
"class": "File",
"location": "keep:99999999999999999999999999999991+99/foo.txt",
final.copy.assert_has_calls([mock.call('bar.txt', 'baz.txt', overwrite=False, source_collection=readermock)])
final.copy.assert_has_calls([mock.call('foo.txt', 'foo.txt', overwrite=False, source_collection=readermock)])
- final.save_new.assert_has_calls([mock.call(ensure_unique_name=True, name='Test output', owner_uuid='zzzzz-j7d0g-zzzzzzzzzzzzzzz', storage_classes=['foo'])])
+ final.save_new.assert_has_calls([mock.call(ensure_unique_name=True, name='Test output', owner_uuid='zzzzz-j7d0g-zzzzzzzzzzzzzzz', properties={}, storage_classes=['foo'])])
self.assertEqual("""{
"bar": {
"basename": "baz.txt",
reader.return_value = readermock
# This output describes a single file listed in 2 different directories
- _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "", { 'out': [
+ _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "", {}, { 'out': [
{
'basename': 'testdir1',
'listing': [
reader.return_value = readermock
# This output describes two literals with the same basename
- _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "", [
+ _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "", {}, [
{
'lit':
{
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import subprocess
+
+api = arvados.api()
+
+def test_execute():
+ group = api.groups().create(body={"group": {"name": "test-17004-project", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "17004-output-props.cwl", "--inp", "scripts/download_all_data.sh"]
+ print(" ".join(cmd))
+ subprocess.check_output(cmd)
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+
+ found = False
+ for c in contents["items"]:
+ if (c["kind"] == "arvados#collection" and
+ c["properties"].get("type") == "output" and
+ c["properties"].get("foo") == "bar" and
+ c["properties"].get("baz") == "download_all_data.sh"):
+ found = True
+ if not found:
+ raise Exception("Didn't find collection with properties")
+
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+if __name__ == '__main__':
+ test_execute()
_rootDesc = None
-def stubs(func):
- @functools.wraps(func)
- @mock.patch("arvados_cwl.arvdocker.determine_image_id")
- @mock.patch("uuid.uuid4")
- @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
- @mock.patch("arvados.collection.KeepClient")
- @mock.patch("arvados.keep.KeepClient")
- @mock.patch("arvados.events.subscribe")
- def wrapped(self, events, keep_client1, keep_client2, keepdocker,
- uuid4, determine_image_id, *args, **kwargs):
- class Stubs(object):
- pass
- stubs = Stubs()
- stubs.events = events
- stubs.keepdocker = keepdocker
-
- uuid4.side_effect = ["df80736f-f14d-4b10-b2e3-03aa27f034bb", "df80736f-f14d-4b10-b2e3-03aa27f034b1",
- "df80736f-f14d-4b10-b2e3-03aa27f034b2", "df80736f-f14d-4b10-b2e3-03aa27f034b3",
- "df80736f-f14d-4b10-b2e3-03aa27f034b4", "df80736f-f14d-4b10-b2e3-03aa27f034b5"]
-
- determine_image_id.return_value = None
-
- def putstub(p, **kwargs):
- return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
- keep_client1().put.side_effect = putstub
- keep_client1.put.side_effect = putstub
- keep_client2().put.side_effect = putstub
- keep_client2.put.side_effect = putstub
-
- stubs.keep_client = keep_client2
- stubs.docker_images = {
- "arvados/jobs:"+arvados_cwl.__version__: [("zzzzz-4zz18-zzzzzzzzzzzzzd3", {})],
- "debian:buster-slim": [("zzzzz-4zz18-zzzzzzzzzzzzzd4", {})],
- "arvados/jobs:123": [("zzzzz-4zz18-zzzzzzzzzzzzzd5", {})],
- "arvados/jobs:latest": [("zzzzz-4zz18-zzzzzzzzzzzzzd6", {})],
- }
- def kd(a, b, image_name=None, image_tag=None, project_uuid=None):
- return stubs.docker_images.get("%s:%s" % (image_name, image_tag), [])
- stubs.keepdocker.side_effect = kd
+def stubs(wfname='submit_wf.cwl'):
+ def outer_wrapper(func, *rest):
+ @functools.wraps(func)
+ @mock.patch("arvados_cwl.arvdocker.determine_image_id")
+ @mock.patch("uuid.uuid4")
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ @mock.patch("arvados.collection.KeepClient")
+ @mock.patch("arvados.keep.KeepClient")
+ @mock.patch("arvados.events.subscribe")
+ def wrapped(self, events, keep_client1, keep_client2, keepdocker,
+ uuid4, determine_image_id, *args, **kwargs):
+ class Stubs(object):
+ pass
+ stubs = Stubs()
+ stubs.events = events
+ stubs.keepdocker = keepdocker
+
+ uuid4.side_effect = ["df80736f-f14d-4b10-b2e3-03aa27f034bb", "df80736f-f14d-4b10-b2e3-03aa27f034b1",
+ "df80736f-f14d-4b10-b2e3-03aa27f034b2", "df80736f-f14d-4b10-b2e3-03aa27f034b3",
+ "df80736f-f14d-4b10-b2e3-03aa27f034b4", "df80736f-f14d-4b10-b2e3-03aa27f034b5"]
+
+ determine_image_id.return_value = None
+
+ def putstub(p, **kwargs):
+ return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
+ keep_client1().put.side_effect = putstub
+ keep_client1.put.side_effect = putstub
+ keep_client2().put.side_effect = putstub
+ keep_client2.put.side_effect = putstub
+
+ stubs.keep_client = keep_client2
+ stubs.docker_images = {
+ "arvados/jobs:"+arvados_cwl.__version__: [("zzzzz-4zz18-zzzzzzzzzzzzzd3", {})],
+ "debian:buster-slim": [("zzzzz-4zz18-zzzzzzzzzzzzzd4", {})],
+ "arvados/jobs:123": [("zzzzz-4zz18-zzzzzzzzzzzzzd5", {})],
+ "arvados/jobs:latest": [("zzzzz-4zz18-zzzzzzzzzzzzzd6", {})],
+ }
+ def kd(a, b, image_name=None, image_tag=None, project_uuid=None):
+ return stubs.docker_images.get("%s:%s" % (image_name, image_tag), [])
+ stubs.keepdocker.side_effect = kd
- stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
- stubs.fake_container_uuid = "zzzzz-dz642-zzzzzzzzzzzzzzz"
+ stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
+ stubs.fake_container_uuid = "zzzzz-dz642-zzzzzzzzzzzzzzz"
- if sys.version_info[0] < 3:
- stubs.capture_stdout = BytesIO()
- else:
- stubs.capture_stdout = StringIO()
+ if sys.version_info[0] < 3:
+ stubs.capture_stdout = BytesIO()
+ else:
+ stubs.capture_stdout = StringIO()
- stubs.api = mock.MagicMock()
- stubs.api._rootDesc = get_rootDesc()
- stubs.api._rootDesc["uuidPrefix"] = "zzzzz"
- stubs.api._rootDesc["revision"] = "20210628"
+ stubs.api = mock.MagicMock()
+ stubs.api._rootDesc = get_rootDesc()
+ stubs.api._rootDesc["uuidPrefix"] = "zzzzz"
+ stubs.api._rootDesc["revision"] = "20210628"
- stubs.api.users().current().execute.return_value = {
- "uuid": stubs.fake_user_uuid,
- }
- stubs.api.collections().list().execute.return_value = {"items": []}
- stubs.api.containers().current().execute.return_value = {
- "uuid": stubs.fake_container_uuid,
- }
- stubs.api.config()["StorageClasses"].items.return_value = {
- "default": {
- "Default": True
- }
- }.items()
-
- class CollectionExecute(object):
- def __init__(self, exe):
- self.exe = exe
- def execute(self, num_retries=None):
- return self.exe
-
- def collection_createstub(created_collections, body, ensure_unique_name=None):
- mt = body["manifest_text"].encode('utf-8')
- uuid = "zzzzz-4zz18-zzzzzzzzzzzzzx%d" % len(created_collections)
- pdh = "%s+%i" % (hashlib.md5(mt).hexdigest(), len(mt))
- created_collections[uuid] = {
- "uuid": uuid,
- "portable_data_hash": pdh,
- "manifest_text": mt.decode('utf-8')
+ stubs.api.users().current().execute.return_value = {
+ "uuid": stubs.fake_user_uuid,
}
- return CollectionExecute(created_collections[uuid])
-
- def collection_getstub(created_collections, uuid):
- for v in viewvalues(created_collections):
- if uuid in (v["uuid"], v["portable_data_hash"]):
- return CollectionExecute(v)
-
- created_collections = {
- "99999999999999999999999999999998+99": {
- "uuid": "",
- "portable_data_hash": "99999999999999999999999999999998+99",
- "manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
- },
- "99999999999999999999999999999997+99": {
- "uuid": "",
- "portable_data_hash": "99999999999999999999999999999997+99",
- "manifest_text": ". 99999999999999999999999999999997+99 0:0:file1.txt"
- },
- "99999999999999999999999999999994+99": {
- "uuid": "",
- "portable_data_hash": "99999999999999999999999999999994+99",
- "manifest_text": ". 99999999999999999999999999999994+99 0:0:expect_arvworkflow.cwl"
- },
- "zzzzz-4zz18-zzzzzzzzzzzzzd3": {
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd3",
- "portable_data_hash": "999999999999999999999999999999d3+99",
- "manifest_text": ""
- },
- "zzzzz-4zz18-zzzzzzzzzzzzzd4": {
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd4",
- "portable_data_hash": "999999999999999999999999999999d4+99",
- "manifest_text": ""
- },
- "zzzzz-4zz18-zzzzzzzzzzzzzd5": {
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd5",
- "portable_data_hash": "999999999999999999999999999999d5+99",
- "manifest_text": ""
- },
- "zzzzz-4zz18-zzzzzzzzzzzzzd6": {
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd6",
- "portable_data_hash": "999999999999999999999999999999d6+99",
- "manifest_text": ""
+ stubs.api.collections().list().execute.return_value = {"items": []}
+ stubs.api.containers().current().execute.return_value = {
+ "uuid": stubs.fake_container_uuid,
}
- }
- stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
- stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections)
-
- stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- stubs.api.jobs().create().execute.return_value = {
- "uuid": stubs.expect_job_uuid,
- "state": "Queued",
- }
-
- stubs.expect_container_request_uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzzz"
- stubs.api.container_requests().create().execute.return_value = {
- "uuid": stubs.expect_container_request_uuid,
- "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
- "state": "Queued"
- }
-
- stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
- stubs.api.pipeline_templates().create().execute.return_value = {
- "uuid": stubs.expect_pipeline_template_uuid,
- }
- stubs.expect_job_spec = {
- 'runtime_constraints': {
- 'docker_image': '999999999999999999999999999999d3+99',
- 'min_ram_mb_per_node': 1024
- },
- 'script_parameters': {
- 'x': {
- 'basename': 'blorp.txt',
- 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- 'class': 'File'
+ stubs.api.config()["StorageClasses"].items.return_value = {
+ "default": {
+ "Default": True
+ }
+ }.items()
+
+ class CollectionExecute(object):
+ def __init__(self, exe):
+ self.exe = exe
+ def execute(self, num_retries=None):
+ return self.exe
+
+ def collection_createstub(created_collections, body, ensure_unique_name=None):
+ mt = body["manifest_text"].encode('utf-8')
+ uuid = "zzzzz-4zz18-zzzzzzzzzzzzzx%d" % len(created_collections)
+ pdh = "%s+%i" % (hashlib.md5(mt).hexdigest(), len(mt))
+ created_collections[uuid] = {
+ "uuid": uuid,
+ "portable_data_hash": pdh,
+ "manifest_text": mt.decode('utf-8')
+ }
+ return CollectionExecute(created_collections[uuid])
+
+ def collection_getstub(created_collections, uuid):
+ for v in viewvalues(created_collections):
+ if uuid in (v["uuid"], v["portable_data_hash"]):
+ return CollectionExecute(v)
+
+ created_collections = {
+ "99999999999999999999999999999998+99": {
+ "uuid": "",
+ "portable_data_hash": "99999999999999999999999999999998+99",
+ "manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
},
- 'y': {
- 'basename': '99999999999999999999999999999998+99',
- 'location': 'keep:99999999999999999999999999999998+99',
- 'class': 'Directory'
+ "99999999999999999999999999999997+99": {
+ "uuid": "",
+ "portable_data_hash": "99999999999999999999999999999997+99",
+ "manifest_text": ". 99999999999999999999999999999997+99 0:0:file1.txt"
},
- 'z': {
- 'basename': 'anonymous',
- "listing": [{
- "basename": "renamed.txt",
- "class": "File",
- "location": "keep:99999999999999999999999999999998+99/file1.txt",
- "size": 0
- }],
- 'class': 'Directory'
+ "99999999999999999999999999999994+99": {
+ "uuid": "",
+ "portable_data_hash": "99999999999999999999999999999994+99",
+ "manifest_text": ". 99999999999999999999999999999994+99 0:0:expect_arvworkflow.cwl"
},
- 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main'
- },
- 'repository': 'arvados',
- 'script_version': 'master',
- 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
- 'script': 'cwl-runner'
- }
- stubs.pipeline_component = stubs.expect_job_spec.copy()
- stubs.expect_pipeline_instance = {
- 'name': 'submit_wf.cwl',
- 'state': 'RunningOnServer',
- 'owner_uuid': None,
- "components": {
- "cwl-runner": {
- 'runtime_constraints': {'docker_image': '999999999999999999999999999999d3+99', 'min_ram_mb_per_node': 1024},
- 'script_parameters': {
- 'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
- 'x': {"value": {
- 'basename': 'blorp.txt',
- 'class': 'File',
- 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- "size": 16
- }},
- 'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
- 'listing': [
- {
- 'basename': 'renamed.txt',
- 'class': 'File', 'location':
- 'keep:99999999999999999999999999999998+99/file1.txt',
- 'size': 0
- }
- ]}},
- 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main',
- 'arv:debug': True,
- 'arv:enable_reuse': True,
- 'arv:on_error': 'continue'
- },
- 'repository': 'arvados',
- 'script_version': 'master',
- 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
- 'script': 'cwl-runner',
- 'job': {'state': 'Queued', 'uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}
+ "zzzzz-4zz18-zzzzzzzzzzzzzd3": {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd3",
+ "portable_data_hash": "999999999999999999999999999999d3+99",
+ "manifest_text": ""
+ },
+ "zzzzz-4zz18-zzzzzzzzzzzzzd4": {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd4",
+ "portable_data_hash": "999999999999999999999999999999d4+99",
+ "manifest_text": ""
+ },
+ "zzzzz-4zz18-zzzzzzzzzzzzzd5": {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd5",
+ "portable_data_hash": "999999999999999999999999999999d5+99",
+ "manifest_text": ""
+ },
+ "zzzzz-4zz18-zzzzzzzzzzzzzd6": {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd6",
+ "portable_data_hash": "999999999999999999999999999999d6+99",
+ "manifest_text": ""
}
}
- }
- stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
- stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
- stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid
- stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
- stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
- "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
- "state": "Queued"
- }
- stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
- stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
+ stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
+ stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections)
- with open("tests/wf/submit_wf_packed.cwl") as f:
- expect_packed_workflow = yaml.round_trip_load(f)
+ stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ stubs.api.jobs().create().execute.return_value = {
+ "uuid": stubs.expect_job_uuid,
+ "state": "Queued",
+ }
- stubs.expect_container_spec = {
- 'priority': 500,
- 'mounts': {
- '/var/spool/cwl': {
- 'writable': True,
- 'kind': 'collection'
- },
- '/var/lib/cwl/workflow.json': {
- 'content': expect_packed_workflow,
- 'kind': 'json'
+ stubs.expect_container_request_uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzzz"
+ stubs.api.container_requests().create().execute.return_value = {
+ "uuid": stubs.expect_container_request_uuid,
+ "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
+ "state": "Queued"
+ }
+
+ stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+ stubs.api.pipeline_templates().create().execute.return_value = {
+ "uuid": stubs.expect_pipeline_template_uuid,
+ }
+ stubs.expect_job_spec = {
+ 'runtime_constraints': {
+ 'docker_image': '999999999999999999999999999999d3+99',
+ 'min_ram_mb_per_node': 1024
},
- 'stdout': {
- 'path': '/var/spool/cwl/cwl.output.json',
- 'kind': 'file'
+ 'script_parameters': {
+ 'x': {
+ 'basename': 'blorp.txt',
+ 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
+ 'class': 'File'
+ },
+ 'y': {
+ 'basename': '99999999999999999999999999999998+99',
+ 'location': 'keep:99999999999999999999999999999998+99',
+ 'class': 'Directory'
+ },
+ 'z': {
+ 'basename': 'anonymous',
+ "listing": [{
+ "basename": "renamed.txt",
+ "class": "File",
+ "location": "keep:99999999999999999999999999999998+99/file1.txt",
+ "size": 0
+ }],
+ 'class': 'Directory'
+ },
+ 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main'
},
- '/var/lib/cwl/cwl.input.json': {
- 'kind': 'json',
- 'content': {
- 'y': {
- 'basename': '99999999999999999999999999999998+99',
- 'location': 'keep:99999999999999999999999999999998+99',
- 'class': 'Directory'},
- 'x': {
- 'basename': u'blorp.txt',
- 'class': 'File',
- 'location': u'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- "size": 16
+ 'repository': 'arvados',
+ 'script_version': 'master',
+ 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
+ 'script': 'cwl-runner'
+ }
+ stubs.pipeline_component = stubs.expect_job_spec.copy()
+ stubs.expect_pipeline_instance = {
+ 'name': 'submit_wf.cwl',
+ 'state': 'RunningOnServer',
+ 'owner_uuid': None,
+ "components": {
+ "cwl-runner": {
+ 'runtime_constraints': {'docker_image': '999999999999999999999999999999d3+99', 'min_ram_mb_per_node': 1024},
+ 'script_parameters': {
+ 'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
+ 'x': {"value": {
+ 'basename': 'blorp.txt',
+ 'class': 'File',
+ 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
+ "size": 16
+ }},
+ 'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
+ 'listing': [
+ {
+ 'basename': 'renamed.txt',
+ 'class': 'File', 'location':
+ 'keep:99999999999999999999999999999998+99/file1.txt',
+ 'size': 0
+ }
+ ]}},
+ 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main',
+ 'arv:debug': True,
+ 'arv:enable_reuse': True,
+ 'arv:on_error': 'continue'
},
- 'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
- {'basename': 'renamed.txt',
- 'class': 'File',
- 'location': 'keep:99999999999999999999999999999998+99/file1.txt',
- 'size': 0
- }
- ]}
- },
- 'kind': 'json'
+ 'repository': 'arvados',
+ 'script_version': 'master',
+ 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
+ 'script': 'cwl-runner',
+ 'job': {'state': 'Queued', 'uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}
+ }
}
- },
- 'secret_mounts': {},
- 'state': 'Committed',
- 'command': ['arvados-cwl-runner', '--local', '--api=containers',
- '--no-log-timestamps', '--disable-validate', '--disable-color',
- '--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
- '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
- 'name': 'submit_wf.cwl',
- 'container_image': '999999999999999999999999999999d3+99',
- 'output_path': '/var/spool/cwl',
- 'cwd': '/var/spool/cwl',
- 'runtime_constraints': {
- 'API': True,
- 'vcpus': 1,
- 'ram': (1024+256)*1024*1024
- },
- 'use_existing': False,
- 'properties': {},
- 'secret_mounts': {}
- }
-
- stubs.expect_workflow_uuid = "zzzzz-7fd4e-zzzzzzzzzzzzzzz"
- stubs.api.workflows().create().execute.return_value = {
- "uuid": stubs.expect_workflow_uuid,
- }
- def update_mock(**kwargs):
- stubs.updated_uuid = kwargs.get('uuid')
- return mock.DEFAULT
- stubs.api.workflows().update.side_effect = update_mock
- stubs.api.workflows().update().execute.side_effect = lambda **kwargs: {
- "uuid": stubs.updated_uuid,
- }
+ }
+ stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
+ stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+ stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid
+ stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
+ stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
+ "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+ "state": "Queued"
+ }
+ stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
+ stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
+
+ with open("tests/wf/submit_wf_packed.cwl") as f:
+ expect_packed_workflow = yaml.round_trip_load(f)
+
+ stubs.expect_container_spec = {
+ 'priority': 500,
+ 'mounts': {
+ '/var/spool/cwl': {
+ 'writable': True,
+ 'kind': 'collection'
+ },
+ '/var/lib/cwl/workflow.json': {
+ 'content': expect_packed_workflow,
+ 'kind': 'json'
+ },
+ 'stdout': {
+ 'path': '/var/spool/cwl/cwl.output.json',
+ 'kind': 'file'
+ },
+ '/var/lib/cwl/cwl.input.json': {
+ 'kind': 'json',
+ 'content': {
+ 'y': {
+ 'basename': '99999999999999999999999999999998+99',
+ 'location': 'keep:99999999999999999999999999999998+99',
+ 'class': 'Directory'},
+ 'x': {
+ 'basename': u'blorp.txt',
+ 'class': 'File',
+ 'location': u'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
+ "size": 16
+ },
+ 'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
+ {'basename': 'renamed.txt',
+ 'class': 'File',
+ 'location': 'keep:99999999999999999999999999999998+99/file1.txt',
+ 'size': 0
+ }
+ ]}
+ },
+ 'kind': 'json'
+ }
+ },
+ 'secret_mounts': {},
+ 'state': 'Committed',
+ 'command': ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate', '--disable-color',
+ '--eval-timeout=20', '--thread-count=0',
+ '--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow '+wfname,
+ '--debug', '--on-error=continue',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
+ 'name': wfname,
+ 'container_image': '999999999999999999999999999999d3+99',
+ 'output_name': 'Output from workflow '+wfname,
+ 'output_path': '/var/spool/cwl',
+ 'cwd': '/var/spool/cwl',
+ 'runtime_constraints': {
+ 'API': True,
+ 'vcpus': 1,
+ 'ram': (1024+256)*1024*1024
+ },
+ 'use_existing': False,
+ 'properties': {},
+ 'secret_mounts': {}
+ }
- return func(self, stubs, *args, **kwargs)
- return wrapped
+ stubs.expect_workflow_uuid = "zzzzz-7fd4e-zzzzzzzzzzzzzzz"
+ stubs.api.workflows().create().execute.return_value = {
+ "uuid": stubs.expect_workflow_uuid,
+ }
+ def update_mock(**kwargs):
+ stubs.updated_uuid = kwargs.get('uuid')
+ return mock.DEFAULT
+ stubs.api.workflows().update.side_effect = update_mock
+ stubs.api.workflows().update().execute.side_effect = lambda **kwargs: {
+ "uuid": stubs.updated_uuid,
+ }
+ return func(self, stubs, *args, **kwargs)
+ return wrapped
+ return outer_wrapper
class TestSubmit(unittest.TestCase):
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--disable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow submit_wf.cwl",
'--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs('submit_wf_no_reuse.cwl')
def test_submit_container_reuse_disabled_by_workflow(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
'arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--disable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
+ '--disable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow submit_wf_no_reuse.cwl', '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
- expect_container["name"] = "submit_wf_no_reuse.cwl"
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
{
"class": "http://arvados.org/cwl#ReuseRequirement",
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow submit_wf.cwl",
'--debug', '--on-error=stop',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", "--debug",
+ '--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow submit_wf.cwl',
+ "--debug",
"--storage-classes=foo", '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", "--debug",
+ '--enable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow submit_wf.cwl",
+ "--debug",
"--storage-classes=foo,bar", "--intermediate-storage-classes=baz", '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
make_output.return_value = ({},final_output_c)
def set_final_output(job_order, output_callback, runtimeContext):
- output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
+ output_callback({"out": "zzzzz"}, "success")
return []
job.side_effect = set_final_output
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
- make_output.assert_called_with(u'Output of submit_wf.cwl', ['foo'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
+ make_output.assert_called_with(u'Output of submit_wf.cwl', ['foo'], '', {}, {"out": "zzzzz"})
self.assertEqual(exited, 0)
@mock.patch("cwltool.task_queue.TaskQueue")
stubs.api.config().get.return_value = {"default": {"Default": True}}
def set_final_output(job_order, output_callback, runtimeContext):
- output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
+ output_callback({"out": "zzzzz"}, "success")
return []
job.side_effect = set_final_output
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
- make_output.assert_called_with(u'Output of submit_wf.cwl', ['default'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
+ make_output.assert_called_with(u'Output of submit_wf.cwl', ['default'], '', {}, {"out": "zzzzz"})
self.assertEqual(exited, 0)
@mock.patch("cwltool.task_queue.TaskQueue")
make_output.return_value = ({},final_output_c)
def set_final_output(job_order, output_callback, runtimeContext):
- output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
+ output_callback({"out": "zzzzz"}, "success")
return []
job.side_effect = set_final_output
"tests/wf/submit_storage_class_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
- make_output.assert_called_with(u'Output of submit_storage_class_wf.cwl', ['foo', 'bar'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
+ make_output.assert_called_with(u'Output of submit_storage_class_wf.cwl', ['foo', 'bar'], '', {}, {"out": "zzzzz"})
self.assertEqual(exited, 0)
@stubs
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", '--debug',
+ '--enable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow submit_wf.cwl", '--debug',
'--on-error=continue',
"--intermediate-output-ttl=3600",
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow submit_wf.cwl",
"--output-tags="+output_tags, '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
}, 'state': 'Committed',
'output_path': '/var/spool/cwl',
'name': 'expect_arvworkflow.cwl#main',
+ 'output_name': 'Output from workflow expect_arvworkflow.cwl#main',
'container_image': '999999999999999999999999999999d3+99',
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow expect_arvworkflow.cwl#main',
+ '--debug', '--on-error=continue',
'/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'runtime_constraints': {
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs('hello container 123')
def test_submit_container_name(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--name=hello container 123",
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["name"] = "hello container 123"
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
"--eval-timeout=20", "--thread-count=0",
- '--enable-reuse', "--collection-cache-size=256", '--debug',
+ '--enable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow submit_wf.cwl", '--debug',
'--on-error=continue',
'--project-uuid='+project_uuid,
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs('submit_wf_runner_resources.cwl')
def test_submit_wf_runner_resources(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
"vcpus": 2,
"ram": (2000+512) * 2**20
}
- expect_container["name"] = "submit_wf_runner_resources.cwl"
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
{
"class": "http://arvados.org/cwl#WorkflowRunnerResources",
expect_container['command'] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=512", '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=512",
+ '--output-name=Output from workflow submit_wf_runner_resources.cwl',
+ '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
'--thread-count=0',
"--enable-reuse",
"--collection-cache-size=256",
+ '--output-name=Output from workflow secret_wf.cwl',
'--debug',
"--on-error=continue",
"/var/lib/cwl/workflow.json#main",
}
},
"name": "secret_wf.cwl",
+ "output_name": "Output from workflow secret_wf.cwl",
"output_path": "/var/spool/cwl",
"priority": 500,
"properties": {},
finally:
cwltool_logger.removeHandler(stderr_logger)
- @stubs
+ @stubs('submit_wf_process_properties.cwl')
def test_submit_set_process_properties(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["name"] = "submit_wf_process_properties.cwl"
+
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
{
"class": "http://arvados.org/cwl#ProcessProperties",
Filters []Filter `json:"filters"`
ContainerCount int `json:"container_count"`
OutputStorageClasses []string `json:"output_storage_classes"`
+ OutputProperties map[string]interface{} `json:"output_properties"`
}
// Mount is special behavior to attach to a filesystem path or device.
cluster.SystemRootToken = arvadostest.SystemRootToken
cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
cluster.Volumes["z"] = arvados.Volume{StorageClasses: map[string]bool{"default": true}}
+ cluster.Containers.LocalKeepBlobBuffersPerVCPU = 0
s.handler = &Aggregator{Cluster: cluster}
s.req = httptest.NewRequest("GET", "/_health/all", nil)
s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
http._request_id = util.new_request_id
return http
+def _close_connections(self):
+ for conn in self._http.connections.values():
+ conn.close()
+
# Monkey patch discovery._cast() so objects and arrays get serialized
# with json.dumps() instead of str().
_cast_orig = apiclient_discovery._cast
svc.request_id = request_id
svc.config = lambda: util.get_config_once(svc)
svc.vocabulary = lambda: util.get_vocabulary_once(svc)
+ svc.close_connections = types.MethodType(_close_connections, svc)
kwargs['http'].max_request_size = svc._rootDesc.get('maxRequestSize', 0)
kwargs['http'].cache = None
kwargs['http']._request_id = lambda: svc.request_id or util.new_request_id()
from arvados._version import __version__
-api_client = None
logger = logging.getLogger('arvados.arv-get')
parser = argparse.ArgumentParser(
return args
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
- global api_client
-
if stdout is sys.stdout and hasattr(stdout, 'buffer'):
# in Python 3, write to stdout as binary
stdout = stdout.buffer
request_id = arvados.util.new_request_id()
logger.info('X-Request-Id: '+request_id)
- if api_client is None:
- api_client = arvados.api('v1', request_id=request_id)
+ api_client = arvados.api('v1', request_id=request_id)
r = re.search(r'^(.*?)(/.*)?$', args.locator)
col_loc = r.group(1)
'bar.txt' : 'bar',
'subdir/baz.txt' : 'baz',
}):
- c = collection.Collection()
+ api = arvados.api()
+ c = collection.Collection(api_client=api)
for path, data in listitems(contents):
with c.open(path, 'wb') as f:
f.write(data)
c.save_new()
+ api.close_connections()
+
return (c.manifest_locator(),
c.portable_data_hash(),
c.manifest_text(strip=strip_manifest))
multi_json (1.15.0)
multipart-post (2.1.1)
nio4r (2.5.8)
- nokogiri (1.13.4)
+ nokogiri (1.13.6)
mini_portile2 (~> 2.8.0)
racc (~> 1.4)
oj (3.9.2)
# format is YYYYMMDD, must be fixed width (needs to be lexically
# sortable), updated manually, may be used by clients to
# determine availability of API server features.
- revision: "20220222",
+ revision: "20220510",
source_version: AppVersion.hash,
sourceVersion: AppVersion.hash, # source_version should be deprecated in the future
packageVersion: AppVersion.package_version,
attribute :runtime_status, :jsonbHash, default: {}
attribute :runtime_auth_scopes, :jsonbArray, default: []
attribute :output_storage_classes, :jsonbArray, default: lambda { Rails.configuration.DefaultStorageClasses }
+ attribute :output_properties, :jsonbHash, default: {}
serialize :environment, Hash
serialize :mounts, Hash
t.add :gateway_address
t.add :interactive_session_started
t.add :output_storage_classes
+ t.add :output_properties
end
# Supported states for a container
def validate_change
permitted = [:state]
- progress_attrs = [:progress, :runtime_status, :log, :output]
+ progress_attrs = [:progress, :runtime_status, :log, :output, :output_properties]
final_attrs = [:exit_code, :finished_at]
if self.new_record?
permitted.push :priority
when Running
- permitted.push :priority, *progress_attrs
+ permitted.push :priority, :output_properties, *progress_attrs
if self.state_changed?
permitted.push :started_at, :gateway_address
end
attribute :properties, :jsonbHash, default: {}
attribute :secret_mounts, :jsonbHash, default: {}
attribute :output_storage_classes, :jsonbArray, default: lambda { Rails.configuration.DefaultStorageClasses }
+ attribute :output_properties, :jsonbHash, default: {}
serialize :environment, Hash
serialize :mounts, Hash
t.add :state
t.add :use_existing
t.add :output_storage_classes
+ t.add :output_properties
end
# Supported states for a container request
:output_path, :priority, :runtime_token,
:runtime_constraints, :state, :container_uuid, :use_existing,
:scheduling_parameters, :secret_mounts, :output_name, :output_ttl,
- :output_storage_classes]
+ :output_storage_classes, :output_properties]
def self.any_preemptible_instances?
Rails.configuration.InstanceTypes.any? do |k, v|
owner_uuid: self.owner_uuid,
name: coll_name,
manifest_text: "",
- storage_classes_desired: self.output_storage_classes,
- properties: {
- 'type' => out_type,
- 'container_request' => uuid,
- })
+ storage_classes_desired: self.output_storage_classes)
end
if out_type == "log"
manifest = dst.manifest_text
end
+ merged_properties = {}
+ merged_properties['container_request'] = uuid
+
+ if out_type == 'output' and !requesting_container_uuid.nil?
+ # output of a child process, give it "intermediate" type by
+ # default.
+ merged_properties['type'] = 'intermediate'
+ else
+ merged_properties['type'] = out_type
+ end
+
+ if out_type == "output"
+ merged_properties.update(container.output_properties)
+ merged_properties.update(self.output_properties)
+ end
+
coll.assign_attributes(
portable_data_hash: Digest::MD5.hexdigest(manifest) + '+' + manifest.bytesize.to_s,
manifest_text: manifest,
trash_at: trash_at,
- delete_at: trash_at)
+ delete_at: trash_at,
+ properties: merged_properties)
coll.save_with_unique_name!
self.send(out_type + '_uuid=', coll.uuid)
end
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class AddOutputProperties < ActiveRecord::Migration[5.2]
+ def trgm_indexes
+ {
+ "container_requests" => "container_requests_trgm_text_search_idx",
+ }
+ end
+
+ def up
+ add_column :container_requests, :output_properties, :jsonb, default: {}
+ add_column :containers, :output_properties, :jsonb, default: {}
+
+ trgm_indexes.each do |model, indx|
+ execute "DROP INDEX IF EXISTS #{indx}"
+ execute "CREATE INDEX #{indx} ON #{model} USING gin((#{model.classify.constantize.full_text_trgm}) gin_trgm_ops)"
+ end
+ end
+
+ def down
+ remove_column :container_requests, :output_properties
+ remove_column :containers, :output_properties
+
+ trgm_indexes.each do |model, indx|
+ execute "DROP INDEX IF EXISTS #{indx}"
+ execute "CREATE INDEX #{indx} ON #{model} USING gin((#{model.classify.constantize.full_text_trgm}) gin_trgm_ops)"
+ end
+ end
+end
output_ttl integer DEFAULT 0 NOT NULL,
secret_mounts jsonb DEFAULT '{}'::jsonb,
runtime_token text,
- output_storage_classes jsonb DEFAULT '["default"]'::jsonb
+ output_storage_classes jsonb DEFAULT '["default"]'::jsonb,
+ output_properties jsonb DEFAULT '{}'::jsonb
);
lock_count integer DEFAULT 0 NOT NULL,
gateway_address character varying,
interactive_session_started boolean DEFAULT false NOT NULL,
- output_storage_classes jsonb DEFAULT '["default"]'::jsonb
+ output_storage_classes jsonb DEFAULT '["default"]'::jsonb,
+ output_properties jsonb DEFAULT '{}'::jsonb
);
-- Name: container_requests_trgm_text_search_idx; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX container_requests_trgm_text_search_idx ON public.container_requests USING gin (((((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE((properties)::text, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text)) || ' '::text) || COALESCE(scheduling_parameters, ''::text)) || ' '::text) || (COALESCE(output_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output_name, ''::character varying))::text)) public.gin_trgm_ops);
+CREATE INDEX container_requests_trgm_text_search_idx ON public.container_requests USING gin (((((((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE((properties)::text, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text)) || ' '::text) || COALESCE(scheduling_parameters, ''::text)) || ' '::text) || (COALESCE(output_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output_name, ''::character varying))::text) || ' '::text) || COALESCE((output_properties)::text, ''::text))) public.gin_trgm_ops);
--
('20220224203102'),
('20220301155729'),
('20220303204419'),
-('20220401153101');
+('20220401153101'),
+('20220505112900');
].each do |token, expected, expected_priority|
test "create as #{token} and expect requesting_container_uuid to be #{expected}" do
set_user_from_auth token
- cr = ContainerRequest.create(container_image: "img", output_path: "/tmp", command: ["echo", "foo"])
+ cr = create_minimal_req!
assert_not_nil cr.uuid, 'uuid should be set for newly created container_request'
assert_equal expected, cr.requesting_container_uuid
assert_equal expected_priority, cr.priority
end
end
+ [
+ ['running_container_auth', 'zzzzz-dz642-runningcontainr', 501],
+ ].each do |token, expected, expected_priority|
+ test "create as #{token} with requesting_container_uuid set and expect output to be intermediate" do
+ set_user_from_auth token
+ cr = create_minimal_req!
+ assert_not_nil cr.uuid, 'uuid should be set for newly created container_request'
+ assert_equal expected, cr.requesting_container_uuid
+ assert_equal expected_priority, cr.priority
+
+ cr.state = ContainerRequest::Committed
+ cr.save!
+
+ run_container(cr)
+ cr.reload
+ output = Collection.find_by_uuid(cr.output_uuid)
+ props = {"type": "intermediate", "container_request": cr.uuid}
+ assert_equal props.symbolize_keys, output.properties.symbolize_keys
+ end
+ end
+
test "create as container_runtime_token and expect requesting_container_uuid to be zzzzz-dz642-20isqbkl8xwnsao" do
set_user_from_auth :container_runtime_token
Thread.current[:token] = "#{Thread.current[:token]}/zzzzz-dz642-20isqbkl8xwnsao"
assert_equal ["foo_storage_class"], output1.storage_classes_desired
assert_equal ["bar_storage_class"], output2.storage_classes_desired
end
+
+ [
+ [{}, {}, {"type": "output"}],
+ [{"a1": "b1"}, {}, {"type": "output", "a1": "b1"}],
+ [{}, {"a1": "b1"}, {"type": "output", "a1": "b1"}],
+ [{"a1": "b1"}, {"a1": "c1"}, {"type": "output", "a1": "b1"}],
+ [{"a1": "b1"}, {"a2": "c2"}, {"type": "output", "a1": "b1", "a2": "c2"}],
+ [{"type": "blah"}, {}, {"type": "blah"}],
+ ].each do |cr_prop, container_prop, expect_prop|
+ test "setting output_properties #{cr_prop} #{container_prop} on current container" do
+ act_as_user users(:active) do
+ cr = create_minimal_req!(priority: 1,
+ state: ContainerRequest::Committed,
+ output_name: 'foo',
+ output_properties: cr_prop)
+
+ act_as_system_user do
+ logc = Collection.new(owner_uuid: system_user_uuid,
+ manifest_text: ". ef772b2f28e2c8ca84de45466ed19ee9+7815 0:0:arv-mount.txt\n")
+ logc.save!
+
+ c = Container.find_by_uuid(cr.container_uuid)
+ c.update_attributes!(state: Container::Locked)
+ c.update_attributes!(state: Container::Running)
+
+ c.update_attributes!(output_properties: container_prop)
+
+ c.update_attributes!(state: Container::Complete,
+ exit_code: 0,
+ output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
+ log: logc.portable_data_hash)
+ logc.destroy
+ end
+
+ cr.reload
+ expect_prop["container_request"] = cr.uuid
+ output = Collection.find_by_uuid(cr.output_uuid)
+ assert_equal expect_prop.symbolize_keys, output.properties.symbolize_keys
+ end
+ end
+ end
+
end
RUN apt-key add --no-tty /tmp/8D81803C0EBFCD88.asc && \
rm -f /tmp/8D81803C0EBFCD88.asc
-RUN mkdir -p /etc/apt/sources.list.d && \
- echo deb https://download.docker.com/linux/debian/ buster stable > /etc/apt/sources.list.d/docker.list && \
- apt-get update && \
- apt-get -yq --no-install-recommends install docker-ce=5:20.10.6~3-0~debian-buster && \
- apt-get clean
+# docker is now installed by arvados-server install
+# RUN mkdir -p /etc/apt/sources.list.d && \
+# echo deb https://download.docker.com/linux/debian/ buster stable > /etc/apt/sources.list.d/docker.list && \
+# apt-get update && \
+# apt-get -yq --no-install-recommends install docker-ce=5:20.10.6~3-0~debian-buster && \
+# apt-get clean
# Set UTF-8 locale
RUN echo en_US.UTF-8 UTF-8 > /etc/locale.gen && locale-gen
exit
fi
+API_HOST=${localip}:${services[controller-ssl]}
+
+if test -f /usr/src/workbench2/public/API_HOST ; then
+ API_HOST=$(cat /usr/src/workbench2/public/API_HOST)
+fi
+
cat <<EOF > /usr/src/workbench2/public/config.json
{
- "API_HOST": "${localip}:${services[controller-ssl]}",
- "VOCABULARY_URL": "/vocabulary-example.json",
- "FILE_VIEWERS_CONFIG_URL": "/file-viewers-example.json"
+ "API_HOST": "$API_HOST"
}
EOF