import (
"bytes"
+ "context"
"crypto/md5"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
+ "log"
+ "math/rand"
+ "net/http"
+ "net/http/httptest"
+ "net/http/httputil"
+ "net/url"
"os"
"os/exec"
+ "path"
"regexp"
"runtime/pprof"
"strings"
"sync"
+ "sync/atomic"
"syscall"
"testing"
"time"
+ "git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/manifest"
- "golang.org/x/net/context"
. "gopkg.in/check.v1"
+ git_client "gopkg.in/src-d/go-git.v4/plumbing/transport/client"
+ git_http "gopkg.in/src-d/go-git.v4/plumbing/transport/http"
)
// Gocheck boilerplate
TestingT(t)
}
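+// logLineStart consumes any number of complete log lines plus the timestamp
+// prefix that starts the next line; tests append a pattern to it so gocheck's
+// implicitly anchored Matches can hit any single log line.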
+const logLineStart = `(?m)(.*\n)*\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z `
+
var _ = Suite(&TestSuite{})
type TestSuite struct {
keepmountTmp []string
testDispatcherKeepClient KeepTestClient
testContainerKeepClient KeepTestClient
+ debian12MemoryCurrent int64
+ debian12SwapCurrent int64
+}
+
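+// SetUpSuite reads the expected memory and swap figures from the crunchstat
+// debian12 fixture tree so tests can assert exact byte counts.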
+func (s *TestSuite) SetUpSuite(c *C) {
+ buf, err := os.ReadFile("../crunchstat/testdata/debian12/sys/fs/cgroup/user.slice/user-1000.slice/session-4.scope/memory.current")
+ c.Assert(err, IsNil)
+ _, err = fmt.Sscanf(string(buf), "%d", &s.debian12MemoryCurrent)
+ c.Assert(err, IsNil)
+
+ buf, err = os.ReadFile("../crunchstat/testdata/debian12/sys/fs/cgroup/user.slice/user-1000.slice/session-4.scope/memory.swap.current")
+ c.Assert(err, IsNil)
+ _, err = fmt.Sscanf(string(buf), "%d", &s.debian12SwapCurrent)
+ c.Assert(err, IsNil)
}
func (s *TestSuite) SetUpTest(c *C) {
stopErr error
stopped bool
closed bool
- runFunc func()
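+ // runFunc simulates the container process; its return value is
+ // delivered on the exit channel as the container's exit code.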
+ runFunc func() int
exit chan int
}
func (e *stubExecutor) Runtime() string { return "stub" }
func (e *stubExecutor) Version() string { return "stub " + cmd.Version.String() }
func (e *stubExecutor) Create(spec containerSpec) error { e.created = spec; return e.createErr }
-func (e *stubExecutor) Start() error { e.exit = make(chan int, 1); go e.runFunc(); return e.startErr }
-func (e *stubExecutor) CgroupID() string { return "cgroupid" }
-func (e *stubExecutor) Stop() error { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr }
-func (e *stubExecutor) Close() { e.closed = true }
+func (e *stubExecutor) Start() error {
+ e.exit = make(chan int, 1)
+ go func() { e.exit <- e.runFunc() }()
+ return e.startErr
+}
+func (e *stubExecutor) Pid() int { return 1115883 } // matches pid in ../crunchstat/testdata/debian12/proc/
+func (e *stubExecutor) Stop() error { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr }
+func (e *stubExecutor) Close() { e.closed = true }
func (e *stubExecutor) Wait(context.Context) (int, error) {
return <-e.exit, e.waitErr
}
if resourceType == "collections" && output != nil {
mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
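+ // Derive a deterministic fake PDH and UUID from the manifest text so
+ // tests can predict both values for identical content.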
+ md5sum := md5.Sum([]byte(mt))
outmap := output.(*arvados.Collection)
- outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
- outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%15.15x", md5.Sum([]byte(mt)))
+ outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5sum, len(mt))
+ outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%015x", md5sum[:7])
}
return nil
return nil, nil
}
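+// apiStubServer is an httptest-backed stand-in for the Arvados API: requests
+// it recognizes (log creation, a few fixture collections, the stubbed
+// container record) are answered locally, and everything else is
+// reverse-proxied to the real API host taken from the environment.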
+type apiStubServer struct {
+ server *httptest.Server
+ proxy *httputil.ReverseProxy
+ intercept func(http.ResponseWriter, *http.Request) bool
+
+ container arvados.Container
+ logs map[string]string
+}
+
+func apiStub() (*arvados.Client, *apiStubServer) {
+ client := arvados.NewClientFromEnv()
+ apistub := &apiStubServer{logs: map[string]string{}} // logs must be non-nil: ServeHTTP appends to it
+ apistub.server = httptest.NewTLSServer(apistub)
+ apistub.proxy = httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "https", Host: client.APIHost})
+ if client.Insecure {
+ apistub.proxy.Transport = arvados.InsecureHTTPClient.Transport
+ }
+ client.APIHost = apistub.server.Listener.Addr().String()
+ return client, apistub
+}
+
+func (apistub *apiStubServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if apistub.intercept != nil && apistub.intercept(w, r) {
+ return
+ }
+ if r.Method == "POST" && r.URL.Path == "/arvados/v1/logs" {
+ var body struct {
+ Log struct {
+ EventType string `json:"event_type"`
+ Properties struct {
+ Text string
+ }
+ }
+ }
+ json.NewDecoder(r.Body).Decode(&body)
+ apistub.logs[body.Log.EventType] += body.Log.Properties.Text
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+hwPDH {
+ json.NewEncoder(w).Encode(arvados.Collection{ManifestText: hwManifest})
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+otherPDH {
+ json.NewEncoder(w).Encode(arvados.Collection{ManifestText: otherManifest})
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+normalizedWithSubdirsPDH {
+ json.NewEncoder(w).Encode(arvados.Collection{ManifestText: normalizedManifestWithSubdirs})
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+denormalizedWithSubdirsPDH {
+ json.NewEncoder(w).Encode(arvados.Collection{ManifestText: denormalizedManifestWithSubdirs})
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/containers/"+apistub.container.UUID {
+ json.NewEncoder(w).Encode(apistub.container)
+ return
+ }
+ apistub.proxy.ServeHTTP(w, r)
+}
+
func (s *TestSuite) TestLoadImage(c *C) {
s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
s.runner.Container.Mounts = map[string]arvados.Mount{
}
func (s *TestSuite) TestRunContainer(c *C) {
- s.executor.runFunc = func() {
+ s.executor.runFunc = func() int {
fmt.Fprintf(s.executor.created.Stdout, "Hello world\n")
- s.executor.exit <- 0
+ return 0
}
var logs TestLogs
cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
- err = cr.UpdateContainerRunning()
+ err = cr.UpdateContainerRunning("")
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running")
// Used by the TestFullRun*() tests below to DRY up the boilerplate setup for a
// full dress rehearsal of the Run() function, starting from a JSON container record.
-func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func()) (*ArvTestClient, *ContainerRunner, string) {
+func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, fn func() int) (*ArvTestClient, *ContainerRunner, string) {
err := json.Unmarshal([]byte(record), &s.api.Container)
c.Assert(err, IsNil)
initialState := s.api.Container.State
c.Assert(err, IsNil)
c.Logf("SecretMounts decoded %v json %q", sm, secretMounts)
- s.executor.runFunc = func() {
- fn()
- s.executor.exit <- exitCode
- }
+ s.executor.runFunc = fn
s.runner.statInterval = 100 * time.Millisecond
s.runner.containerWatchdogInterval = time.Second
}
return d, err
}
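+ // Hand the container-facing code an arvados.Client wired to the API
+ // stub, so collection and container requests during the test are
+ // served by apiStubServer.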
+ client, _ := apiStub()
s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
- return &ArvTestClient{secretMounts: secretMounts}, &s.testContainerKeepClient, nil, nil
+ return &ArvTestClient{secretMounts: secretMounts}, &s.testContainerKeepClient, client, nil
}
if extraMounts != nil && len(extraMounts) > 0 {
"runtime_constraints": {"vcpus":1,"ram":1000000},
"state": "Locked",
"output_storage_classes": ["default"]
-}`, nil, 0, func() {
+}`, nil, func() int {
c.Check(s.executor.created.Command, DeepEquals, []string{"echo", "hello world"})
c.Check(s.executor.created.Image, Equals, "sha256:d8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678")
c.Check(s.executor.created.Env, DeepEquals, map[string]string{"foo": "bar", "baz": "waz"})
c.Check(s.executor.created.EnableNetwork, Equals, false)
c.Check(s.executor.created.CUDADeviceCount, Equals, 0)
fmt.Fprintln(s.executor.created.Stdout, "hello world")
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
"runtime_constraints": {},
"scheduling_parameters":{"max_run_time": 1},
"state": "Running"
-}`, nil, 2, func() {
+}`, nil, func() int {
ran = true
+ return 2
})
c.Check(s.api.CalledWith("container.state", "Cancelled"), IsNil)
c.Check(s.api.CalledWith("container.state", "Complete"), IsNil)
c.Check(ran, Equals, false)
}
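+// ec2MetadataServerStub mimics the IMDSv2 endpoints used by the spot
+// interruption checker. The first request always fails with 503, and later
+// requests fail with probability failureRate, to exercise retry handling.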
+func ec2MetadataServerStub(c *C, token *string, failureRate float64, stoptime *atomic.Value) *httptest.Server {
+ failedOnce := false
+ return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if !failedOnce || rand.Float64() < failureRate {
+ w.WriteHeader(http.StatusServiceUnavailable)
+ failedOnce = true
+ return
+ }
+ switch r.URL.Path {
+ case "/latest/api/token":
+ fmt.Fprintln(w, *token)
+ case "/latest/meta-data/spot/instance-action":
+ if r.Header.Get("X-aws-ec2-metadata-token") != *token {
+ w.WriteHeader(http.StatusUnauthorized)
+ } else if t, _ := stoptime.Load().(time.Time); t.IsZero() {
+ w.WriteHeader(http.StatusNotFound)
+ } else {
+ fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, t.Format(time.RFC3339))
+ }
+ default:
+ w.WriteHeader(http.StatusNotFound)
+ }
+ }))
+}
+
+func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
+ s.testSpotInterruptionNotice(c, 0.1)
+}
+
+func (s *TestSuite) TestSpotInterruptionNoticeNotAvailable(c *C) {
+ s.testSpotInterruptionNotice(c, 1)
+}
+
+func (s *TestSuite) testSpotInterruptionNotice(c *C, failureRate float64) {
+ var stoptime atomic.Value
+ token := "fake-ec2-metadata-token"
+ stub := ec2MetadataServerStub(c, &token, failureRate, &stoptime)
+ defer stub.Close()
+
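+ // Poll quickly and point the checker at the stub; the deferred func
+ // below restores the package-level defaults afterwards.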
+ defer func(i time.Duration, u string) {
+ spotInterruptionCheckInterval = i
+ ec2MetadataBaseURL = u
+ }(spotInterruptionCheckInterval, ec2MetadataBaseURL)
+ spotInterruptionCheckInterval = time.Second / 8
+ ec2MetadataBaseURL = stub.URL
+
+ go s.runner.checkSpotInterruptionNotices()
+ s.fullRunHelper(c, `{
+ "command": ["sleep", "3"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {},
+ "state": "Locked"
+}`, nil, func() int {
+ time.Sleep(time.Second)
+ stoptime.Store(time.Now().Add(time.Minute).UTC())
+ token = "different-fake-ec2-metadata-token"
+ time.Sleep(time.Second)
+ return 0
+ })
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
+ if failureRate == 1 {
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
+ } else {
+ text := `Cloud provider scheduled instance stop at ` + stoptime.Load().(time.Time).Format(time.RFC3339)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*`+text+`.*`)
+ c.Check(s.api.CalledWith("container.runtime_status.warning", "preemption notice"), NotNil)
+ c.Check(s.api.CalledWith("container.runtime_status.warningDetail", text), NotNil)
+ c.Check(s.api.CalledWith("container.runtime_status.preemptionNotice", text), NotNil)
+ }
+}
+
func (s *TestSuite) TestRunTimeExceeded(c *C) {
s.fullRunHelper(c, `{
"command": ["sleep", "3"],
"runtime_constraints": {},
"scheduling_parameters":{"max_run_time": 1},
"state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
time.Sleep(3 * time.Second)
+ return 0
})
c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
"output_path": "/tmp",
"priority": 1,
"state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
s.executor.waitErr = errors.New("Container is not running")
+ return 0
})
c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
}
func (s *TestSuite) TestCrunchstat(c *C) {
+ s.runner.crunchstatFakeFS = os.DirFS("../crunchstat/testdata/debian12")
s.fullRunHelper(c, `{
"command": ["sleep", "1"],
"container_image": "`+arvadostest.DockerImage112PDH+`",
"priority": 1,
"runtime_constraints": {},
"state": "Locked"
- }`, nil, 0, func() {
+ }`, nil, func() int {
time.Sleep(time.Second)
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- // We didn't actually start a container, so crunchstat didn't
- // find accounting files and therefore didn't log any stats.
- // It should have logged a "can't find accounting files"
- // message after one poll interval, though, so we can confirm
- // it's alive:
c.Assert(s.api.Logs["crunchstat"], NotNil)
- c.Check(s.api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files have not appeared after 100ms.*`)
+ c.Check(s.api.Logs["crunchstat"].String(), Matches, `(?ms).*mem \d+ swap \d+ pgmajfault \d+ rss.*`)
- // The "files never appeared" log assures us that we called
- // (*crunchstat.Reporter)Stop(), and that we set it up with
- // the correct container ID "abcde":
- c.Check(s.api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files never appeared for cgroupid\n`)
+ // Check that we called (*crunchstat.Reporter)Stop().
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Maximum crunch-run memory rss usage was \d+ bytes\n.*`)
}
func (s *TestSuite) TestNodeInfoLog(c *C) {
"priority": 1,
"runtime_constraints": {},
"state": "Locked"
- }`, nil, 0,
- func() {
- time.Sleep(time.Second)
- })
+ }`, nil, func() int {
+ time.Sleep(time.Second)
+ return 0
+ })
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
"priority": 1,
"runtime_constraints": {},
"state": "Locked"
- }`, nil, 0,
- func() {
- })
+ }`, nil, func() int {
+ return 0
+ })
c.Assert(s.api.Logs["crunch-run"], NotNil)
c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
}
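+// testLogRSSThresholds runs a container whose fake cgroup stats come from the
+// crunchstat debian12 fixture, then checks which "over N% of memory" warnings
+// were (and were not) logged for the given RAM constraint.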
+func (s *TestSuite) testLogRSSThresholds(c *C, ram int64, expected []int, notExpected int) {
+ s.runner.crunchstatFakeFS = os.DirFS("../crunchstat/testdata/debian12")
+ s.fullRunHelper(c, `{
+ "command": ["true"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {"ram": `+fmt.Sprintf("%d", ram)+`},
+ "state": "Locked"
+ }`, nil, func() int { return 0 })
+ c.Logf("=== crunchstat logs\n%s\n", s.api.Logs["crunchstat"].String())
+ logs := s.api.Logs["crunch-run"].String()
+ pattern := logLineStart + `Container using over %d%% of memory \(rss %d/%d bytes\)`
+ var threshold int
+ for _, threshold = range expected {
+ c.Check(logs, Matches, fmt.Sprintf(pattern, threshold, s.debian12MemoryCurrent, ram))
+ }
+ if notExpected > threshold {
+ c.Check(logs, Not(Matches), fmt.Sprintf(pattern, notExpected, s.debian12MemoryCurrent, ram))
+ }
+}
+
+func (s *TestSuite) TestLogNoRSSThresholds(c *C) {
+ s.testLogRSSThresholds(c, s.debian12MemoryCurrent*10, []int{}, 90)
+}
+
+func (s *TestSuite) TestLogSomeRSSThresholds(c *C) {
+ onePercentRSS := s.debian12MemoryCurrent / 100
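+ // A RAM constraint of 102% of the fixture's rss puts usage at roughly
+ // 98%: past the 90% and 95% thresholds but short of 99%.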
+ s.testLogRSSThresholds(c, 102*onePercentRSS, []int{90, 95}, 99)
+}
+
+func (s *TestSuite) TestLogAllRSSThresholds(c *C) {
+ s.testLogRSSThresholds(c, s.debian12MemoryCurrent, []int{90, 95, 99}, 0)
+}
+
+func (s *TestSuite) TestLogMaximaAfterRun(c *C) {
+ s.runner.crunchstatFakeFS = os.DirFS("../crunchstat/testdata/debian12")
+ s.runner.parentTemp = c.MkDir()
+ s.fullRunHelper(c, `{
+ "command": ["true"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {"ram": `+fmt.Sprintf("%d", s.debian12MemoryCurrent*10)+`},
+ "state": "Locked"
+ }`, nil, func() int { return 0 })
+ logs := s.api.Logs["crunch-run"].String()
+ for _, expected := range []string{
+ `Maximum disk usage was \d+%, \d+/\d+ bytes`,
+ fmt.Sprintf(`Maximum container memory swap usage was %d bytes`, s.debian12SwapCurrent),
+ `Maximum container memory pgmajfault usage was \d+ faults`,
+ fmt.Sprintf(`Maximum container memory rss usage was 10%%, %d/%d bytes`, s.debian12MemoryCurrent, s.debian12MemoryCurrent*10),
+ `Maximum crunch-run memory rss usage was \d+ bytes`,
+ } {
+ c.Check(logs, Matches, logLineStart+expected)
+ }
+}
+
+func (s *TestSuite) TestCommitNodeInfoBeforeStart(c *C) {
+ var collection_create, container_update arvadosclient.Dict
+ s.fullRunHelper(c, `{
+ "command": ["true"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {},
+ "state": "Locked",
+ "uuid": "zzzzz-dz642-202301121543210"
+ }`, nil, func() int {
+ collection_create = s.api.CalledWith("ensure_unique_name", true)
+ container_update = s.api.CalledWith("container.state", "Running")
+ return 0
+ })
+
+ c.Assert(collection_create, NotNil)
+ log_collection := collection_create["collection"].(arvadosclient.Dict)
+ c.Check(log_collection["name"], Equals, "logs for zzzzz-dz642-202301121543210")
+ manifest_text := log_collection["manifest_text"].(string)
+ // We check that the file size is at least two digits as an easy way to
+ // check the file isn't empty.
+ c.Check(manifest_text, Matches, `\. .+ \d+:\d{2,}:node-info\.txt( .+)?\n`)
+ c.Check(manifest_text, Matches, `\. .+ \d+:\d{2,}:node\.json( .+)?\n`)
+
+ c.Assert(container_update, NotNil)
+ // As of Arvados 2.5.0, the container update must specify its log in PDH
+ // format for the API server to propagate it to container requests, which
+ // is what we care about for this test.
+ expect_pdh := fmt.Sprintf("%x+%d", md5.Sum([]byte(manifest_text)), len(manifest_text))
+ c.Check(container_update["container"].(arvadosclient.Dict)["log"], Equals, expect_pdh)
+}
+
func (s *TestSuite) TestContainerRecordLog(c *C) {
s.fullRunHelper(c, `{
"command": ["sleep", "1"],
"priority": 1,
"runtime_constraints": {},
"state": "Locked"
- }`, nil, 0,
- func() {
+ }`, nil,
+ func() int {
time.Sleep(time.Second)
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
"priority": 1,
"runtime_constraints": {},
"state": "Locked"
-}`, nil, 1, func() {
+}`, nil, func() int {
fmt.Fprintln(s.executor.created.Stdout, "hello")
fmt.Fprintln(s.executor.created.Stderr, "world")
+ return 1
})
final := s.api.CalledWith("container.state", "Complete")
"priority": 1,
"runtime_constraints": {},
"state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
fmt.Fprintf(s.executor.created.Stdout, "workdir=%q", s.executor.created.WorkingDir)
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
"priority": 1,
"runtime_constraints": {},
"state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
fmt.Fprintln(s.executor.created.Stdout, s.executor.created.WorkingDir)
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
"runtime_constraints": {},
"state": "Locked",
"output_storage_classes": ["foo", "bar"]
-}`, nil, 0, func() {
+}`, nil, func() int {
fmt.Fprintln(s.executor.created.Stdout, s.executor.created.WorkingDir)
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
"runtime_constraints": {"cuda": {"device_count": 2}},
"state": "Locked",
"output_storage_classes": ["foo", "bar"]
-}`, nil, 0, func() {
+}`, nil, func() int {
fmt.Fprintln(s.executor.created.Stdout, "ok")
+ return 0
})
c.Check(s.executor.created.CUDADeviceCount, Equals, 2)
}
"runtime_constraints": {"cuda": {"hardware_capability": "foo"}},
"state": "Locked",
"output_storage_classes": ["foo", "bar"]
-}`, nil, 0, func() {
+}`, nil, func() int {
fmt.Fprintln(s.executor.created.Stdout, "ok")
+ return 0
})
c.Check(s.executor.created.CUDADeviceCount, Equals, 0)
}
func (s *TestSuite) TestStopOnSignal(c *C) {
- s.executor.runFunc = func() {
+ s.executor.runFunc = func() int {
s.executor.created.Stdout.Write([]byte("foo\n"))
s.runner.SigChan <- syscall.SIGINT
+ time.Sleep(10 * time.Second)
+ return 0
}
s.testStopContainer(c)
}
func (s *TestSuite) TestStopOnArvMountDeath(c *C) {
- s.executor.runFunc = func() {
+ s.executor.runFunc = func() int {
s.executor.created.Stdout.Write([]byte("foo\n"))
s.runner.ArvMountExit <- nil
close(s.runner.ArvMountExit)
+ time.Sleep(10 * time.Second)
+ return 0
}
s.runner.ArvMountExit = make(chan error)
s.testStopContainer(c)
"priority": 1,
"runtime_constraints": {},
"state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
fmt.Fprintf(s.executor.created.Stdout, "%v", s.executor.created.Env)
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
return nil, nil
}
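+// stubCert writes an empty CA bundle and points SSL_CERT_FILE at it, so the
+// code under test sees a predictable certificate path instead of whatever the
+// host system provides.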
-func stubCert(temp string) string {
+func stubCert(c *C, temp string) string {
path := temp + "/ca-certificates.crt"
- crt, _ := os.Create(path)
- crt.Close()
- arvadosclient.CertFiles = []string{path}
+ err := os.WriteFile(path, []byte{}, 0666)
+ c.Assert(err, IsNil)
+ os.Setenv("SSL_CERT_FILE", path)
return path
}
cr := s.runner
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
+ cr.containerClient, _ = apiStub()
cr.ContainerArvClient = &ArvTestClient{}
cr.ContainerKeepClient = &KeepTestClient{}
cr.Container.OutputStorageClasses = []string{"default"}
realTemp := c.MkDir()
certTemp := c.MkDir()
- stubCertPath := stubCert(certTemp)
+ stubCertPath := stubCert(c, certTemp)
cr.parentTemp = realTemp
i := 0
bindmounts, err := cr.SetupMounts()
c.Check(err, IsNil)
c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
- "--read-write", "--storage-classes", "default", "--crunchstat-interval=5",
+ "--read-write", "--storage-classes", "default", "--crunchstat-interval=5", "--ram-cache",
"--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
c.Check(bindmounts, DeepEquals, map[string]bindmount{
"/keepinp": {realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", true},
bindmounts, err := cr.SetupMounts()
c.Check(err, IsNil)
c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
- "--read-write", "--storage-classes", "default", "--crunchstat-interval=5",
+ "--read-write", "--storage-classes", "default", "--crunchstat-interval=5", "--ram-cache",
"--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
c.Check(bindmounts, DeepEquals, map[string]bindmount{
"/tmp": {realTemp + "/tmp2", false},
{
i = 0
cr.ArvMountPoint = ""
- (*GitMountSuite)(nil).useTestGitServer(c)
+ git_client.InstallProtocol("https", git_http.NewClient(arvados.InsecureHTTPClient))
cr.token = arvadostest.ActiveToken
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts = map[string]arvados.Mount{
"state": "Locked"
}`
- s.fullRunHelper(c, helperRecord, nil, 0, func() {
+ s.fullRunHelper(c, helperRecord, nil, func() int {
fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
}
// Used by the TestStdoutWithWrongPath*() tests.
-func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func()) (*ArvTestClient, *ContainerRunner, error) {
+func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func() int) (*ArvTestClient, *ContainerRunner, error) {
err := json.Unmarshal([]byte(record), &s.api.Container)
c.Assert(err, IsNil)
s.executor.runFunc = fn
"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} },
"output_path": "/tmp",
"state": "Locked"
-}`, func() {})
+}`, func() int { return 0 })
c.Check(err, ErrorMatches, ".*Stdout path does not start with OutputPath.*")
}
"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} },
"output_path": "/tmp",
"state": "Locked"
-}`, func() {})
+}`, func() int { return 0 })
c.Check(err, ErrorMatches, ".*unsupported mount kind 'tmp' for stdout.*")
}
"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} },
"output_path": "/tmp",
"state": "Locked"
-}`, func() {})
+}`, func() int { return 0 })
c.Check(err, ErrorMatches, ".*unsupported mount kind 'collection' for stdout.*")
}
"priority": 1,
"runtime_constraints": {"API": true},
"state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
c.Check(s.executor.created.Env["ARVADOS_API_HOST"], Equals, os.Getenv("ARVADOS_API_HOST"))
- s.executor.exit <- 3
+ return 3
})
c.Check(s.api.CalledWith("container.exit_code", 3), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
"priority": 1,
"runtime_constraints": {"API": true},
"state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
s.api.Container.Output = arvadostest.DockerImage112PDH
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
ioutil.WriteFile(s.runner.ArvMountPoint+"/by_id/README", nil, 0666)
return s.runner.ArvMountCmd([]string{"bash", "-c", "echo >&2 Test: Keep write error: I am a teapot; sleep 3"}, "")
}
- s.executor.runFunc = func() {
+ s.executor.runFunc = func() int {
time.Sleep(time.Second)
- s.executor.exit <- 137
+ return 137
}
record := `{
"command": ["sleep", "1"],
extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
- s.fullRunHelper(c, helperRecord, extraMounts, 0, func() {
+ s.fullRunHelper(c, helperRecord, extraMounts, func() int {
fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {},
- "state": "Locked"
+ "state": "Locked",
+ "uuid": "zzzzz-dz642-202301130848001"
}`
extraMounts := []string{
"a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt",
}
- api, _, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func() {
+ api, _, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, func() int {
fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
+ return 0
})
c.Check(s.executor.created.BindMounts, DeepEquals, map[string]bindmount{
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
c.Check(api.CalledWith("container.state", "Complete"), NotNil)
- for _, v := range api.Content {
- if v["collection"] != nil {
- c.Check(v["ensure_unique_name"], Equals, true)
- collection := v["collection"].(arvadosclient.Dict)
- if strings.Index(collection["name"].(string), "output") == 0 {
- manifest := collection["manifest_text"].(string)
-
- c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+ output_count := uint(0)
+ for _, v := range s.runner.ContainerArvClient.(*ArvTestClient).Content {
+ if v["collection"] == nil {
+ continue
+ }
+ collection := v["collection"].(arvadosclient.Dict)
+ if collection["name"].(string) != "output for zzzzz-dz642-202301130848001" {
+ continue
+ }
+ c.Check(v["ensure_unique_name"], Equals, true)
+ c.Check(collection["manifest_text"].(string), Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:bar 36:18:sub1file2
./foo/baz 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 9:18:sub2file2
./foo/sub1 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 0:9:file1_in_subdir1.txt 9:18:file2_in_subdir1.txt
./foo/sub1/subdir2 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 0:9:file1_in_subdir2.txt 9:18:file2_in_subdir2.txt
`)
- }
- }
+ output_count++
}
+ c.Check(output_count, Not(Equals), uint(0))
}
func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {},
- "state": "Locked"
+ "state": "Locked",
+ "uuid": "zzzzz-dz642-202301130848002"
}`
extraMounts := []string{
"b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
}
- s.fullRunHelper(c, helperRecord, extraMounts, 0, func() {
+ s.fullRunHelper(c, helperRecord, extraMounts, func() int {
fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- for _, v := range s.api.Content {
- if v["collection"] != nil {
- collection := v["collection"].(arvadosclient.Dict)
- if strings.Index(collection["name"].(string), "output") == 0 {
- manifest := collection["manifest_text"].(string)
-
- c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+ output_count := uint(0)
+ for _, v := range s.runner.ContainerArvClient.(*ArvTestClient).Content {
+ if v["collection"] == nil {
+ continue
+ }
+ collection := v["collection"].(arvadosclient.Dict)
+ if collection["name"].(string) != "output for zzzzz-dz642-202301130848002" {
+ continue
+ }
+ c.Check(collection["manifest_text"].(string), Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 10:17:bar
`)
- }
- }
+ output_count++
}
+ c.Check(output_count, Not(Equals), uint(0))
}
func (s *TestSuite) TestOutputError(c *C) {
"runtime_constraints": {},
"state": "Locked"
}`
- s.fullRunHelper(c, helperRecord, nil, 0, func() {
+ s.fullRunHelper(c, helperRecord, nil, func() int {
os.Symlink("/etc/hosts", s.runner.HostOutputDir+"/baz")
+ return 0
})
c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
"b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
}
- api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func() {
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, func() int {
fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
+ return 0
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"state": "Locked"
}`
- api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func() {
+ api, _, _ := s.fullRunHelper(c, helperRecord, nil, func() int {
fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
+ return 0
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"priority": 1,
"runtime_constraints": {},
"state": "Locked"
-}`, nil, 1, func() {
+}`, nil, func() int {
fmt.Fprintln(s.executor.created.Stdout, "hello")
fmt.Fprintln(s.executor.created.Stderr, "oops")
+ return 1
})
final := api.CalledWith("container.state", "Complete")
"priority": 1,
"runtime_constraints": {},
"state": "Locked"
-}`, nil, 0, func() {})
+}`, nil, func() int { return 0 })
c.Check(s.api.CalledWith("container.state", nextState), NotNil)
c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
if s.runner.brokenNodeHook != "" {
"priority": 1,
"runtime_constraints": {},
"state": "Locked"
-}`, nil, 0, func() {})
+}`, nil, func() int { return 0 })
c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
}
"state": "Locked"
}`
- s.fullRunHelper(c, helperRecord, nil, 0, func() {
+ s.fullRunHelper(c, helperRecord, nil, func() int {
content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf")
c.Check(err, IsNil)
c.Check(string(content), Equals, "mypassword")
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
}`
s.SetUpTest(c)
- s.fullRunHelper(c, helperRecord, nil, 0, func() {
+ s.fullRunHelper(c, helperRecord, nil, func() int {
content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf")
c.Check(err, IsNil)
c.Check(string(content), Equals, "mypassword")
+ return 0
})
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
}`
s.SetUpTest(c)
- _, _, realtemp := s.fullRunHelper(c, helperRecord, nil, 0, func() {
+ _, _, realtemp := s.fullRunHelper(c, helperRecord, nil, func() int {
// secret.conf should be provisioned as a separate
// bind mount, i.e., it should not appear in the
// (fake) fuse filesystem as viewed from the host.
}
err = ioutil.WriteFile(s.runner.HostOutputDir+"/.arvados#collection", []byte(`{"manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"}`), 0700)
c.Check(err, IsNil)
+ return 0
})
content, err := ioutil.ReadFile(realtemp + "/text1/mountdata.text")
c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"), NotNil)
}
+func (s *TestSuite) TestCalculateCost(c *C) {
+ defer func(s string) { lockdir = s }(lockdir)
+ lockdir = c.MkDir()
+ now := time.Now()
+ cr := s.runner
+ cr.costStartTime = now.Add(-time.Hour)
+ var logbuf bytes.Buffer
+ cr.CrunchLog.Immediate = log.New(&logbuf, "", 0)
+
+ // if there's no InstanceType env var, cost is calculated as 0
+ os.Unsetenv("InstanceType")
+ cost := cr.calculateCost(now)
+ c.Check(cost, Equals, 0.0)
+
+ // With the InstanceType env var set but loadPrices() not yet run (or
+ // no price data found), cost is calculated from the InstanceType env
+ // var.
+ os.Setenv("InstanceType", `{"Price":1.2}`)
+ defer os.Unsetenv("InstanceType")
+ cost = cr.calculateCost(now)
+ c.Check(cost, Equals, 1.2)
+
+ // first update tells us the spot price was $1/h until 30
+ // minutes ago when it increased to $2/h
+ j, err := json.Marshal([]cloud.InstancePrice{
+ {StartTime: now.Add(-4 * time.Hour), Price: 1.0},
+ {StartTime: now.Add(-time.Hour / 2), Price: 2.0},
+ })
+ c.Assert(err, IsNil)
+ os.WriteFile(lockdir+"/"+pricesfile, j, 0777)
+ cr.loadPrices()
+ cost = cr.calculateCost(now)
+ c.Check(cost, Equals, 1.5)
+
+ // next update (via --list + SIGUSR2) tells us the spot price
+ // increased to $3/h 15 minutes ago
+ j, err = json.Marshal([]cloud.InstancePrice{
+ {StartTime: now.Add(-time.Hour / 3), Price: 2.0}, // dup of -time.Hour/2 price
+ {StartTime: now.Add(-time.Hour / 4), Price: 3.0},
+ })
+ c.Assert(err, IsNil)
+ os.WriteFile(lockdir+"/"+pricesfile, j, 0777)
+ cr.loadPrices()
+ cost = cr.calculateCost(now)
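+ // 30 min at $1/h + 15 min at $2/h + 15 min at $3/h = $1.75.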
+ c.Check(cost, Equals, 1.0/2+2.0/4+3.0/4)
+
+ cost = cr.calculateCost(now.Add(-time.Hour / 2))
+ c.Check(cost, Equals, 0.5)
+
+ c.Logf("%s", logbuf.String())
+ c.Check(logbuf.String(), Matches, `(?ms).*Instance price changed to 1\.00 at 20.* changed to 2\.00 .* changed to 3\.00 .*`)
+ c.Check(logbuf.String(), Not(Matches), `(?ms).*changed to 2\.00 .* changed to 2\.00 .*`)
+}
+
+func (s *TestSuite) TestSIGUSR2CostUpdate(c *C) {
+ pid := os.Getpid()
+ now := time.Now()
+ pricesJSON, err := json.Marshal([]cloud.InstancePrice{
+ {StartTime: now.Add(-4 * time.Hour), Price: 2.4},
+ {StartTime: now.Add(-2 * time.Hour), Price: 2.6},
+ })
+ c.Assert(err, IsNil)
+
+ os.Setenv("InstanceType", `{"Price":2.2}`)
+ defer os.Unsetenv("InstanceType")
+ defer func(s string) { lockdir = s }(lockdir)
+ lockdir = c.MkDir()
+
+ // We can't use s.api.CalledWith because timing differences will yield
+ // different cost values across runs. getCostUpdate iterates over API
+ // calls until it finds one that sets the cost, then writes that value
+ // to the next index of costUpdates.
+ deadline := now.Add(time.Second)
+ costUpdates := make([]float64, 2)
+ costIndex := 0
+ apiIndex := 0
+ getCostUpdate := func() {
+ for ; time.Now().Before(deadline); time.Sleep(time.Second / 10) {
+ for apiIndex < len(s.api.Content) {
+ update := s.api.Content[apiIndex]
+ apiIndex++
+ var ok bool
+ var cost float64
+ if update, ok = update["container"].(arvadosclient.Dict); !ok {
+ continue
+ }
+ if cost, ok = update["cost"].(float64); !ok {
+ continue
+ }
+ c.Logf("API call #%d updates cost to %v", apiIndex-1, cost)
+ costUpdates[costIndex] = cost
+ costIndex++
+ return
+ }
+ }
+ }
+
+ s.fullRunHelper(c, `{
+ "command": ["true"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {},
+ "state": "Locked",
+ "uuid": "zzzzz-dz642-20230320101530a"
+ }`, nil, func() int {
+ s.runner.costStartTime = now.Add(-3 * time.Hour)
+ err := syscall.Kill(pid, syscall.SIGUSR2)
+ c.Check(err, IsNil, Commentf("error sending first SIGUSR2 to runner"))
+ getCostUpdate()
+
+ err = os.WriteFile(path.Join(lockdir, pricesfile), pricesJSON, 0o700)
+ c.Check(err, IsNil, Commentf("error writing JSON prices file"))
+ err = syscall.Kill(pid, syscall.SIGUSR2)
+ c.Check(err, IsNil, Commentf("error sending second SIGUSR2 to runner"))
+ getCostUpdate()
+
+ return 0
+ })
+ // Comparing with format strings makes it easy to ignore minor variations
+ // in cost across runs while keeping diagnostics pretty.
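+ // Expected: 3h at the $2.20 fallback price = 6.600, then 1h at $2.40
+ // plus 2h at $2.60 = 7.600.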
+ c.Check(fmt.Sprintf("%.3f", costUpdates[0]), Equals, "6.600")
+ c.Check(fmt.Sprintf("%.3f", costUpdates[1]), Equals, "7.600")
+}
+
type FakeProcess struct {
cmdLine []string
}