From: Tom Clegg
Date: Fri, 10 Jun 2016 03:04:50 +0000 (-0400)
Subject: 9374: Consolidate various Container structs as arvados.Container.
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/30cb134722a592044a443a2eeb268e3cc432a122?ds=inline

9374: Consolidate various Container structs as arvados.Container.
---

diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
new file mode 100644
index 0000000000..ac129526fd
--- /dev/null
+++ b/sdk/go/arvados/container.go
@@ -0,0 +1,54 @@
+package arvados
+
+// Container is an arvados#container resource.
+type Container struct {
+	UUID               string             `json:"uuid"`
+	Command            []string           `json:"command"`
+	ContainerImage     string             `json:"container_image"`
+	Cwd                string             `json:"cwd"`
+	Environment        map[string]string  `json:"environment"`
+	LockedByUUID       string             `json:"locked_by_uuid"`
+	Mounts             map[string]Mount   `json:"mounts"`
+	Output             string             `json:"output"`
+	OutputPath         string             `json:"output_path"`
+	Priority           int                `json:"priority"`
+	RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
+	State              ContainerState     `json:"state"`
+}
+
+// Mount is special behavior to attach to a filesystem path or device.
+type Mount struct {
+	Kind             string `json:"kind"`
+	Writable         bool   `json:"writable"`
+	PortableDataHash string `json:"portable_data_hash"`
+	UUID             string `json:"uuid"`
+	DeviceType       string `json:"device_type"`
+	Path             string `json:"path"`
+}
+
+// RuntimeConstraints specify a container's compute resources (RAM,
+// CPU) and network connectivity.
+type RuntimeConstraints struct {
+	API   *bool
+	RAM   int `json:"ram"`
+	VCPUs int `json:"vcpus"`
+}
+
+// ContainerList is an arvados#containerList resource.
+type ContainerList struct {
+	Items          []Container `json:"items"`
+	ItemsAvailable int         `json:"items_available"`
+	Offset         int         `json:"offset"`
+	Limit          int         `json:"limit"`
+}
+
+// ContainerState is a string corresponding to a valid Container state.
+type ContainerState string
+
+const (
+	ContainerStateQueued    = ContainerState("Queued")
+	ContainerStateLocked    = ContainerState("Locked")
+	ContainerStateRunning   = ContainerState("Running")
+	ContainerStateComplete  = ContainerState("Complete")
+	ContainerStateCancelled = ContainerState("Cancelled")
+)
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 54d596fee8..4b66c23b7c 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -4,6 +4,7 @@
 package dispatch
 
 import (
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"log"
 	"os"
@@ -13,13 +14,12 @@ import (
 	"time"
 )
 
-// Constants for container states
 const (
-	Queued    = "Queued"
-	Locked    = "Locked"
-	Running   = "Running"
-	Complete  = "Complete"
-	Cancelled = "Cancelled"
+	Queued    = arvados.ContainerStateQueued
+	Locked    = arvados.ContainerStateLocked
+	Running   = arvados.ContainerStateRunning
+	Complete  = arvados.ContainerStateComplete
+	Cancelled = arvados.ContainerStateCancelled
 )
 
 type apiClientAuthorization struct {
@@ -31,21 +31,6 @@ type apiClientAuthorizationList struct {
 	Items []apiClientAuthorization `json:"items"`
 }
 
-// Represents an Arvados container record
-type Container struct {
-	UUID               string           `json:"uuid"`
-	State              string           `json:"state"`
-	Priority           int              `json:"priority"`
-	RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
-	LockedByUUID       string           `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
-	Items          []Container `json:"items"`
-	ItemsAvailable int         `json:"items_available"`
-}
-
 // Dispatcher holds the state of the dispatcher
 type Dispatcher struct {
 	// The Arvados client
@@ -63,7 +48,7 @@ type Dispatcher struct {
 	// handled by this dispatcher and the goroutine should terminate. The
 	// goroutine is responsible for draining the 'status' channel, failure
 	// to do so may deadlock the dispatcher.
-	RunContainer func(*Dispatcher, Container, chan Container)
+	RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
 
 	// Amount of time to wait between polling for updates.
 	PollInterval time.Duration
@@ -72,22 +57,22 @@ type Dispatcher struct {
 	DoneProcessing chan struct{}
 
 	mineMutex  sync.Mutex
-	mineMap    map[string]chan Container
+	mineMap    map[string]chan arvados.Container
 	Auth       apiClientAuthorization
-	containers chan Container
+	containers chan arvados.Container
 }
 
 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
 // for which this process is actively starting/monitoring. Returns channel to
 // be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
 	dispatcher.mineMutex.Lock()
 	defer dispatcher.mineMutex.Unlock()
 	if ch, ok := dispatcher.mineMap[uuid]; ok {
 		return ch
 	}
 
-	ch := make(chan Container)
+	ch := make(chan arvados.Container)
 	dispatcher.mineMap[uuid] = ch
 	return ch
 }
@@ -102,10 +87,10 @@ func (dispatcher *Dispatcher) notMine(uuid string) {
 	}
 }
 
-// checkMine returns true/false if there is a channel for updates associated
+// checkMine returns true if there is a channel for updates associated
 // with container c. If update is true, also send the container record on
 // the channel.
-func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
+func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
 	dispatcher.mineMutex.Lock()
 	defer dispatcher.mineMutex.Unlock()
 	ch, ok := dispatcher.mineMap[c.UUID]
@@ -119,7 +104,7 @@ func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
 }
 
 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
-	var containers ContainerList
+	var containers arvados.ContainerList
 	err := dispatcher.Arv.List("containers", params, &containers)
 	if err != nil {
 		log.Printf("Error getting list of containers: %q", err)
@@ -175,7 +160,7 @@ func (dispatcher *Dispatcher) pollContainers() {
 	}
 }
 
-func (dispatcher *Dispatcher) handleUpdate(container Container) {
+func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
 	if container.State == Queued && dispatcher.checkMine(container, false) {
 		// If we previously started the job, something failed, and it
 		// was re-queued, this dispatcher might still be monitoring it.
@@ -216,7 +201,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
 }
 
 // UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
 	err := dispatcher.Arv.Update("containers", uuid,
 		arvadosclient.Dict{
 			"container": arvadosclient.Dict{"state": newState}},
@@ -237,8 +222,8 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) {
 		return
 	}
 
-	dispatcher.mineMap = make(map[string]chan Container)
-	dispatcher.containers = make(chan Container)
+	dispatcher.mineMap = make(map[string]chan arvados.Container)
+	dispatcher.containers = make(chan arvados.Container)
 
 	// Graceful shutdown on signal
 	sigChan := make(chan os.Signal)
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 73a3895336..936a9088ed 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -4,6 +4,7 @@ package main
 
 import (
 	"flag"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"log"
@@ -76,7 +77,7 @@ func doMain() error {
 	return nil
 }
 
-func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
+func startFunc(container arvados.Container, cmd *exec.Cmd) error {
 	return cmd.Start()
 }
 
@@ -91,8 +92,8 @@ var startCmd = startFunc
 // If the container is in any other state, or is not Complete/Cancelled after
 // crunch-run terminates, mark the container as Cancelled.
 func run(dispatcher *dispatch.Dispatcher,
-	container dispatch.Container,
-	status chan dispatch.Container) {
+	container arvados.Container,
+	status chan arvados.Container) {
 
 	uuid := container.UUID
 
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index d4a2708a5d..9628bf2f0a 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
@@ -64,16 +65,16 @@ func (s *TestSuite) TestIntegration(c *C) {
 	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
 		Arv:          arv,
-		PollInterval: time.Duration(1) * time.Second,
+		PollInterval: time.Second,
 		RunContainer: func(dispatcher *dispatch.Dispatcher,
-			container dispatch.Container,
-			status chan dispatch.Container) {
+			container arvados.Container,
+			status chan arvados.Container) {
 			run(dispatcher, container, status)
 			doneProcessing <- struct{}{}
 		},
 		DoneProcessing: doneProcessing}
 
-	startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+	startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
 		dispatcher.UpdateState(container.UUID, "Running")
 		dispatcher.UpdateState(container.UUID, "Complete")
 		return cmd.Start()
@@ -89,16 +90,16 @@ func (s *TestSuite) TestIntegration(c *C) {
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
 	}
-	var containers dispatch.ContainerList
+	var containers arvados.ContainerList
 	err = arv.List("containers", params, &containers)
 	c.Check(err, IsNil)
 	c.Assert(len(containers.Items), Equals, 0)
 
 	// Previously "Queued" container should now be in "Complete" state
-	var container dispatch.Container
+	var container arvados.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
-	c.Check(container.State, Equals, "Complete")
+	c.Check(string(container.State), Equals, "Complete")
 }
 
 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
@@ -168,14 +169,14 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 		Arv:          arv,
 		PollInterval: time.Duration(1) * time.Second,
 		RunContainer: func(dispatcher *dispatch.Dispatcher,
-			container dispatch.Container,
-			status chan dispatch.Container) {
+			container arvados.Container,
+			status chan arvados.Container) {
 			run(dispatcher, container, status)
 			doneProcessing <- struct{}{}
 		},
 		DoneProcessing: doneProcessing}
 
-	startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+	startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
 		dispatcher.UpdateState(container.UUID, "Running")
 		dispatcher.UpdateState(container.UUID, "Complete")
 		return cmd.Start()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index f718fbcdce..4bfff6a5f0 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -5,6 +5,7 @@ package main
 import (
 	"flag"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"io/ioutil"
@@ -69,17 +70,17 @@ func doMain() error {
 }
 
 // sbatchCmd
-func sbatchFunc(container dispatch.Container) *exec.Cmd {
-	memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
+func sbatchFunc(container arvados.Container) *exec.Cmd {
+	memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
 	return exec.Command("sbatch", "--share", "--parsable",
 		fmt.Sprintf("--job-name=%s", container.UUID),
 		fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
-		fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+		fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs),
 		fmt.Sprintf("--priority=%d", container.Priority))
 }
 
 // scancelCmd
-func scancelFunc(container dispatch.Container) *exec.Cmd {
+func scancelFunc(container arvados.Container) *exec.Cmd {
 	return exec.Command("scancel", "--name="+container.UUID)
 }
 
@@ -89,7 +90,7 @@ var scancelCmd = scancelFunc
 
 // Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher,
-	container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
+	container arvados.Container, crunchRunCommand string) (jobid string, submitErr error) {
 	submitErr = nil
 
 	defer func() {
@@ -181,7 +182,7 @@ func submit(dispatcher *dispatch.Dispatcher,
 //
 // If the container is marked as Running, check if it is in the slurm queue.
 // If not, mark it as Cancelled.
-func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) {
+func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
 	submitted := false
 	for !*monitorDone {
 		if squeueUpdater.CheckSqueue(container.UUID) {
@@ -207,13 +208,13 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.C
 			// release it back to the Queue, if it is Running then
 			// clean up the record.
 
-			var con dispatch.Container
+			var con arvados.Container
 			err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
 			if err != nil {
 				log.Printf("Error getting final container state: %v", err)
 			}
 
-			var st string
+			var st arvados.ContainerState
 			switch con.State {
 			case dispatch.Locked:
 				st = dispatch.Queued
@@ -236,8 +237,8 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.C
 // Monitor status updates. If the priority changes to zero, cancel the
 // container using scancel.
 func run(dispatcher *dispatch.Dispatcher,
-	container dispatch.Container,
-	status chan dispatch.Container) {
+	container arvados.Container,
+	status chan arvados.Container) {
 
 	log.Printf("Monitoring container %v started", container.UUID)
 	defer log.Printf("Monitoring container %v finished", container.UUID)
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index cddbe8c706..b72ad9fa9d 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -3,6 +3,7 @@ package main
 import (
 	"bytes"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
@@ -59,29 +60,29 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 func (s *TestSuite) TestIntegrationNormal(c *C) {
 	container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
 		[]string(nil),
-		func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			time.Sleep(3 * time.Second)
 			dispatcher.UpdateState(container.UUID, dispatch.Complete)
 		})
-	c.Check(container.State, Equals, "Complete")
+	c.Check(container.State, Equals, arvados.ContainerStateComplete)
 }
 
 func (s *TestSuite) TestIntegrationCancel(c *C) {
 
 	// Override sbatchCmd
 	var scancelCmdLine []string
-	defer func(orig func(dispatch.Container) *exec.Cmd) {
+	defer func(orig func(arvados.Container) *exec.Cmd) {
 		scancelCmd = orig
 	}(scancelCmd)
-	scancelCmd = func(container dispatch.Container) *exec.Cmd {
+	scancelCmd = func(container arvados.Container) *exec.Cmd {
 		scancelCmdLine = scancelFunc(container).Args
 		return exec.Command("echo")
 	}
 
 	container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
 		[]string(nil),
-		func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			time.Sleep(1 * time.Second)
 			dispatcher.Arv.Update("containers", container.UUID,
@@ -89,7 +90,7 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
 				"container": arvadosclient.Dict{"priority": 0}},
 				nil)
 		})
-	c.Check(container.State, Equals, "Cancelled")
+	c.Check(container.State, Equals, arvados.ContainerStateCancelled)
 	c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
 }
 
@@ -99,18 +100,18 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
 		fmt.Sprintf("--mem-per-cpu=%d", 2862),
 		fmt.Sprintf("--cpus-per-task=%d", 4),
 		fmt.Sprintf("--priority=%d", 1)},
-		func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			time.Sleep(3 * time.Second)
 			dispatcher.UpdateState(container.UUID, dispatch.Complete)
 		})
-	c.Check(container.State, Equals, "Cancelled")
+	c.Check(container.State, Equals, arvados.ContainerStateCancelled)
 }
 
 func (s *TestSuite) integrationTest(c *C,
 	newSqueueCmd func() *exec.Cmd,
 	sbatchCmdComps []string,
-	runContainer func(*dispatch.Dispatcher, dispatch.Container)) dispatch.Container {
+	runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
 	arvadostest.ResetEnv()
 
 	arv, err := arvadosclient.MakeArvadosClient()
@@ -119,10 +120,10 @@ func (s *TestSuite) integrationTest(c *C,
 	var sbatchCmdLine []string
 
 	// Override sbatchCmd
-	defer func(orig func(dispatch.Container) *exec.Cmd) {
+	defer func(orig func(arvados.Container) *exec.Cmd) {
 		sbatchCmd = orig
 	}(sbatchCmd)
-	sbatchCmd = func(container dispatch.Container) *exec.Cmd {
+	sbatchCmd = func(container arvados.Container) *exec.Cmd {
 		sbatchCmdLine = sbatchFunc(container).Args
 		return exec.Command("sh")
 	}
@@ -137,7 +138,7 @@ func (s *TestSuite) integrationTest(c *C,
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
 	}
-	var containers dispatch.ContainerList
+	var containers arvados.ContainerList
 	err = arv.List("containers", params, &containers)
 	c.Check(err, IsNil)
 	c.Check(len(containers.Items), Equals, 1)
@@ -150,8 +151,8 @@ func (s *TestSuite) integrationTest(c *C,
 		Arv:          arv,
 		PollInterval: time.Duration(1) * time.Second,
 		RunContainer: func(dispatcher *dispatch.Dispatcher,
-			container dispatch.Container,
-			status chan dispatch.Container) {
+			container arvados.Container,
+			status chan arvados.Container) {
 			go runContainer(dispatcher, container)
 			run(dispatcher, container, status)
 			doneProcessing <- struct{}{}
@@ -173,7 +174,7 @@ func (s *TestSuite) integrationTest(c *C,
 	c.Check(len(containers.Items), Equals, 0)
 
 	// Previously "Queued" container should now be in "Complete" state
-	var container dispatch.Container
+	var container arvados.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
 	return container
@@ -212,8 +213,8 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 		Arv:          arv,
 		PollInterval: time.Duration(1) * time.Second,
 		RunContainer: func(dispatcher *dispatch.Dispatcher,
-			container dispatch.Container,
-			status chan dispatch.Container) {
+			container arvados.Container,
+			status chan arvados.Container) {
 			go func() {
 				time.Sleep(1 * time.Second)
 				dispatcher.UpdateState(container.UUID, dispatch.Running)
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index f55834566d..758f41eb3e 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"flag"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
@@ -39,41 +40,12 @@ type IKeepClient interface {
 	ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
 }
 
-// Mount describes the mount points to create inside the container.
-type Mount struct {
-	Kind             string `json:"kind"`
-	Writable         bool   `json:"writable"`
-	PortableDataHash string `json:"portable_data_hash"`
-	UUID             string `json:"uuid"`
-	DeviceType       string `json:"device_type"`
-	Path             string `json:"path"`
-}
-
 // Collection record returned by the API server.
 type CollectionRecord struct {
 	ManifestText     string `json:"manifest_text"`
 	PortableDataHash string `json:"portable_data_hash"`
 }
 
-type RuntimeConstraints struct {
-	API *bool
-}
-
-// ContainerRecord is the container record returned by the API server.
-type ContainerRecord struct {
-	UUID               string             `json:"uuid"`
-	Command            []string           `json:"command"`
-	ContainerImage     string             `json:"container_image"`
-	Cwd                string             `json:"cwd"`
-	Environment        map[string]string  `json:"environment"`
-	Mounts             map[string]Mount   `json:"mounts"`
-	OutputPath         string             `json:"output_path"`
-	Priority           int                `json:"priority"`
-	RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
-	State              string             `json:"state"`
-	Output             string             `json:"output"`
-}
-
 // APIClientAuthorization is an arvados#api_client_authorization resource.
 type APIClientAuthorization struct {
 	UUID     string `json:"uuid"`
@@ -105,7 +77,7 @@ type ContainerRunner struct {
 	Docker    ThinDockerClient
 	ArvClient IArvadosClient
 	Kc        IKeepClient
-	ContainerRecord
+	arvados.Container
 	dockerclient.ContainerConfig
 	dockerclient.HostConfig
 	token string
@@ -160,10 +132,10 @@ func (runner *ContainerRunner) SetupSignals() {
 // the image from Keep.
 func (runner *ContainerRunner) LoadImage() (err error) {
 
-	runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
+	runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
 
 	var collection CollectionRecord
-	err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
+	err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
 	if err != nil {
 		return fmt.Errorf("While getting container image collection: %v", err)
 	}
@@ -271,7 +243,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 	collectionPaths := []string{}
 	runner.Binds = nil
 
-	for bind, mnt := range runner.ContainerRecord.Mounts {
+	for bind, mnt := range runner.Container.Mounts {
 		if bind == "stdout" {
 			// Is it a "file" mount kind?
 			if mnt.Kind != "file" {
@@ -279,7 +251,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 			}
 
 			// Does path start with OutputPath?
-			prefix := runner.ContainerRecord.OutputPath
+			prefix := runner.Container.OutputPath
 			if !strings.HasSuffix(prefix, "/") {
 				prefix += "/"
 			}
@@ -311,7 +283,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 				tmpcount += 1
 			}
 			if mnt.Writable {
-				if bind == runner.ContainerRecord.OutputPath {
+				if bind == runner.Container.OutputPath {
 					runner.HostOutputDir = src
 				}
 				runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
@@ -320,7 +292,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 			}
 			collectionPaths = append(collectionPaths, src)
 		} else if mnt.Kind == "tmp" {
-			if bind == runner.ContainerRecord.OutputPath {
+			if bind == runner.Container.OutputPath {
 				runner.HostOutputDir, err = runner.MkTempDir("", "")
 				if err != nil {
 					return fmt.Errorf("While creating mount temp dir: %v", err)
@@ -428,8 +400,8 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 
 	runner.loggingDone = make(chan bool)
 
-	if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok {
-		stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):]
+	if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
+		stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
 		index := strings.LastIndex(stdoutPath, "/")
 		if index > 0 {
 			subdirs := stdoutPath[:index]
@@ -464,15 +436,15 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 func (runner *ContainerRunner) CreateContainer() error {
 	runner.CrunchLog.Print("Creating Docker container")
 
-	runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
-	if runner.ContainerRecord.Cwd != "." {
-		runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
+	runner.ContainerConfig.Cmd = runner.Container.Command
+	if runner.Container.Cwd != "." {
+		runner.ContainerConfig.WorkingDir = runner.Container.Cwd
 	}
 
-	for k, v := range runner.ContainerRecord.Environment {
+	for k, v := range runner.Container.Environment {
 		runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
 	}
-	if wantAPI := runner.ContainerRecord.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+	if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
 		tok, err := runner.ContainerToken()
 		if err != nil {
 			return err
@@ -615,7 +587,7 @@ func (runner *ContainerRunner) CommitLogs() error {
 	// point, but re-open crunch log with ArvClient in case there are any
 	// other further (such as failing to write the log to Keep!) while
 	// shutting down
-	runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
+	runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
 		"crunch-run", nil})
 
 	if runner.LogsPDH != nil {
@@ -636,7 +608,7 @@ func (runner *ContainerRunner) CommitLogs() error {
 	err = runner.ArvClient.Create("collections",
 		arvadosclient.Dict{
 			"collection": arvadosclient.Dict{
-				"name":          "logs for " + runner.ContainerRecord.UUID,
+				"name":          "logs for " + runner.Container.UUID,
 				"manifest_text": mt}},
 		&response)
 	if err != nil {
@@ -648,14 +620,14 @@ func (runner *ContainerRunner) CommitLogs() error {
 	return nil
 }
 
-// UpdateContainerRecordRunning updates the container state to "Running"
-func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
+// UpdateContainerRunning updates the container state to "Running"
+func (runner *ContainerRunner) UpdateContainerRunning() error {
 	runner.CancelLock.Lock()
 	defer runner.CancelLock.Unlock()
 	if runner.Cancelled {
 		return ErrCancelled
 	}
-	return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
+	return runner.ArvClient.Update("containers", runner.Container.UUID,
 		arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
 }
 
@@ -667,7 +639,7 @@ func (runner *ContainerRunner) ContainerToken() (string, error) {
 	}
 
 	var auth APIClientAuthorization
-	err := runner.ArvClient.Call("GET", "containers", runner.ContainerRecord.UUID, "auth", nil, &auth)
+	err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
 	if err != nil {
 		return "", err
 	}
@@ -675,9 +647,9 @@ func (runner *ContainerRunner) ContainerToken() (string, error) {
 	return runner.token, nil
 }
 
-// UpdateContainerRecordComplete updates the container record state on API
+// UpdateContainerComplete updates the container record state on API
 // server to "Complete" or "Cancelled"
-func (runner *ContainerRunner) UpdateContainerRecordFinal() error {
+func (runner *ContainerRunner) UpdateContainerFinal() error {
 	update := arvadosclient.Dict{}
 	update["state"] = runner.finalState
 	if runner.finalState == "Complete" {
@@ -691,7 +663,7 @@ func (runner *ContainerRunner) UpdateContainerRecordFinal() error {
 			update["output"] = *runner.OutputPDH
 		}
 	}
-	return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
+	return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
 }
 
 // IsCancelled returns the value of Cancelled, with goroutine safety.
@@ -703,12 +675,12 @@ func (runner *ContainerRunner) IsCancelled() bool {
 
 // NewArvLogWriter creates an ArvLogWriter
 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
-	return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
+	return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
 }
 
 // Run the full container lifecycle.
 func (runner *ContainerRunner) Run() (err error) {
-	runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
+	runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
 
 	hostname, hosterr := os.Hostname()
 	if hosterr != nil {
@@ -743,7 +715,7 @@ func (runner *ContainerRunner) Run() (err error) {
 		checkErr(err)
 
 		if runner.finalState == "Queued" {
-			runner.UpdateContainerRecordFinal()
+			runner.UpdateContainerFinal()
 			return
 		}
 
@@ -755,7 +727,7 @@ func (runner *ContainerRunner) Run() (err error) {
 
 		checkErr(runner.CaptureOutput())
 		checkErr(runner.CommitLogs())
-		checkErr(runner.UpdateContainerRecordFinal())
+		checkErr(runner.UpdateContainerFinal())
 
 		// The real log is already closed, but then we opened
 		// a new one in case we needed to log anything while
@@ -763,7 +735,7 @@ func (runner *ContainerRunner) Run() (err error) {
 		runner.CrunchLog.Close()
 	}()
 
-	err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
+	err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
 	if err != nil {
 		err = fmt.Errorf("While getting container record: %v", err)
 		return
@@ -795,7 +767,7 @@ func (runner *ContainerRunner) Run() (err error) {
 		return
 	}
 
-	err = runner.UpdateContainerRecordRunning()
+	err = runner.UpdateContainerRunning()
 	if err != nil {
 		return
 	}
@@ -824,7 +796,7 @@ func NewContainerRunner(api IArvadosClient,
 	cr.RunArvMount = cr.ArvMountCmd
 	cr.MkTempDir = ioutil.TempDir
 	cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
-	cr.ContainerRecord.UUID = containerUUID
+	cr.Container.UUID = containerUUID
 	cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
 	cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
 	return cr
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 998c4bc17a..242e207fc9 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
@@ -38,7 +39,7 @@ type ArvTestClient struct {
 	Total   int64
 	Calls   int
 	Content []arvadosclient.Dict
-	ContainerRecord
+	arvados.Container
 	Logs map[string]*bytes.Buffer
 	WasSetRunning bool
 	sync.Mutex
@@ -183,7 +184,7 @@ func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arva
 		}
 	}
 	if resourceType == "containers" {
-		(*output.(*ContainerRecord)) = this.ContainerRecord
+		(*output.(*arvados.Container)) = this.Container
 	}
 	return nil
 }
@@ -206,7 +207,8 @@ func (this *ArvTestClient) Update(resourceType string, uuid string, parameters a
 // "baz") returns parameters with parameters["foo"]["bar"]=="baz". If
 // no call matches, it returns nil.
 func (this *ArvTestClient) CalledWith(jpath, expect string) arvadosclient.Dict {
-	call: for _, content := range this.Content {
+call:
+	for _, content := range this.Content {
 		var v interface{} = content
 		for _, k := range strings.Split(jpath, ".") {
 			if dict, ok := v.(arvadosclient.Dict); !ok {
@@ -255,7 +257,7 @@ func (s *TestSuite) TestLoadImage(c *C) {
 	_, err = cr.Docker.InspectImage(hwImageId)
 	c.Check(err, NotNil)
 
-	cr.ContainerRecord.ContainerImage = hwPDH
+	cr.Container.ContainerImage = hwPDH
 
 	// (1) Test loading image from keep
 	c.Check(kc.Called, Equals, false)
@@ -340,7 +342,7 @@ func (this KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, file
 func (s *TestSuite) TestLoadImageArvError(c *C) {
 	// (1) Arvados error
 	cr := NewContainerRunner(ArvErrorTestClient{}, &KeepTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-	cr.ContainerRecord.ContainerImage = hwPDH
+	cr.Container.ContainerImage = hwPDH
 
 	err := cr.LoadImage()
 	c.Check(err.Error(), Equals, "While getting container image collection: ArvError")
@@ -350,7 +352,7 @@ func (s *TestSuite) TestLoadImageKeepError(c *C) {
 	// (2) Keep error
 	docker := NewTestDockerClient()
 	cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-	cr.ContainerRecord.ContainerImage = hwPDH
+	cr.Container.ContainerImage = hwPDH
 
 	err := cr.LoadImage()
 	c.Check(err.Error(), Equals, "While creating ManifestFileReader for container image: KeepError")
@@ -359,7 +361,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
 	// (3) Collection doesn't contain image
 	cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-	cr.ContainerRecord.ContainerImage = otherPDH
+	cr.Container.ContainerImage = otherPDH
 
 	err := cr.LoadImage()
 	c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar")
@@ -369,7 +371,7 @@ func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
 	// (4) Collection doesn't contain image
 	docker := NewTestDockerClient()
 	cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-	cr.ContainerRecord.ContainerImage = hwPDH
+	cr.Container.ContainerImage = hwPDH
 
 	err := cr.LoadImage()
 	c.Check(err, NotNil)
@@ -418,8 +420,8 @@ func (s *TestSuite) TestRunContainer(c *C) {
 	var logs TestLogs
 	cr.NewLogWriter = logs.NewTestLoggingWriter
 
-	cr.ContainerRecord.ContainerImage = hwPDH
-	cr.ContainerRecord.Command = []string{"./hw"}
+	cr.Container.ContainerImage = hwPDH
+	cr.Container.Command = []string{"./hw"}
 	err := cr.LoadImage()
 	c.Check(err, IsNil)
 
@@ -455,18 +457,18 @@ func (s *TestSuite) TestCommitLogs(c *C) {
 	c.Check(*cr.LogsPDH, Equals, "63da7bdacf08c40f604daad80c261e9a+60")
 }
 
-func (s *TestSuite) TestUpdateContainerRecordRunning(c *C) {
+func (s *TestSuite) TestUpdateContainerRunning(c *C) {
 	api := &ArvTestClient{}
 	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
-	err := cr.UpdateContainerRecordRunning()
+	err := cr.UpdateContainerRunning()
 	c.Check(err, IsNil)
 
 	c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running")
 }
 
-func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
+func (s *TestSuite) TestUpdateContainerComplete(c *C) {
 	api := &ArvTestClient{}
 	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
@@ -478,7 +480,7 @@ func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
 
 	*cr.ExitCode = 42
 	cr.finalState = "Complete"
 
-	err := cr.UpdateContainerRecordFinal()
+	err := cr.UpdateContainerFinal()
 	c.Check(err, IsNil)
 
 	c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], Equals, *cr.LogsPDH)
@@ -486,14 +488,14 @@ func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
 	c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 }
 
-func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
+func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
 	api := &ArvTestClient{}
 	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 	cr.Cancelled = true
 	cr.finalState = "Cancelled"
 
-	err := cr.UpdateContainerRecordFinal()
+	err := cr.UpdateContainerFinal()
 	c.Check(err, IsNil)
 
 	c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], IsNil)
@@ -504,7 +506,7 @@ func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
 // Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
 // dress rehearsal of the Run() function, starting from a JSON container record.
 func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner) {
-	rec := ContainerRecord{}
+	rec := arvados.Container{}
 	err := json.Unmarshal([]byte(record), &rec)
 	c.Check(err, IsNil)
 
@@ -512,7 +514,7 @@ func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvT
 
 	docker.fn = fn
 	docker.RemoveImage(hwImageId, true)
-	api = &ArvTestClient{ContainerRecord: rec}
+	api = &ArvTestClient{Container: rec}
 	cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 	am := &ArvMountCmdLine{}
 	cr.RunArvMount = am.ArvMountTest
@@ -643,7 +645,7 @@ func (s *TestSuite) TestCancel(c *C) {
     "runtime_constraints": {}
 }`
 
-	rec := ContainerRecord{}
+	rec := arvados.Container{}
 	err := json.Unmarshal([]byte(record), &rec)
 	c.Check(err, IsNil)
 
@@ -656,7 +658,7 @@ func (s *TestSuite) TestCancel(c *C) {
 	}
 	docker.RemoveImage(hwImageId, true)
 
-	api := &ArvTestClient{ContainerRecord: rec}
+	api := &ArvTestClient{Container: rec}
 	cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 	am := &ArvMountCmdLine{}
 	cr.RunArvMount = am.ArvMountTest
@@ -735,8 +737,8 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 	}
 
 	{
-		cr.ContainerRecord.Mounts = make(map[string]Mount)
-		cr.ContainerRecord.Mounts["/tmp"] = Mount{Kind: "tmp"}
+		cr.Container.Mounts = make(map[string]arvados.Mount)
+		cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
 		cr.OutputPath = "/tmp"
 
 		err := cr.SetupMounts()
@@ -748,8 +750,8 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 	{
 		i = 0
-		cr.ContainerRecord.Mounts = make(map[string]Mount)
-		cr.ContainerRecord.Mounts["/keeptmp"] = Mount{Kind: "collection", Writable: true}
+		cr.Container.Mounts = make(map[string]arvados.Mount)
+		cr.Container.Mounts["/keeptmp"] = arvados.Mount{Kind: "collection", Writable: true}
 		cr.OutputPath = "/keeptmp"
 
 		os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
@@ -763,9 +765,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 	{
 		i = 0
-		cr.ContainerRecord.Mounts = make(map[string]Mount)
-		cr.ContainerRecord.Mounts["/keepinp"] = Mount{Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}
-		cr.ContainerRecord.Mounts["/keepout"] = Mount{Kind: "collection", Writable: true}
+		cr.Container.Mounts = make(map[string]arvados.Mount)
+		cr.Container.Mounts["/keepinp"] = arvados.Mount{Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}
+		cr.Container.Mounts["/keepout"] = arvados.Mount{Kind: "collection", Writable: true}
 		cr.OutputPath = "/keepout"
 
 		os.MkdirAll("/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
@@ -808,7 +810,7 @@ func (s *TestSuite) TestStdout(c *C) {
 
 // Used by the TestStdoutWithWrongPath*()
 func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
-	rec := ContainerRecord{}
+	rec := arvados.Container{}
 	err = json.Unmarshal([]byte(record), &rec)
 	c.Check(err, IsNil)
 
@@ -816,7 +818,7 @@ func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (ap
 
 	docker.fn = fn
 	docker.RemoveImage(hwImageId, true)
-	api = &ArvTestClient{ContainerRecord: rec}
+	api = &ArvTestClient{Container: rec}
 	cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 	am := &ArvMountCmdLine{}
 	cr.RunArvMount = am.ArvMountTest