X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b3e8a483835334becbef9cba2bebbcf08df47c15..3826a6339ba1c901c054053920ed20547b3ba54d:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index a9f1c25d37..653e0b4949 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -152,16 +152,18 @@ func (runner *ContainerRunner) setupSignals() { go func(sig chan os.Signal) { for s := range sig { - runner.CrunchLog.Printf("caught signal: %v", s) - runner.stop() + runner.stop(s) } }(runner.SigChan) } // stop the underlying Docker container. -func (runner *ContainerRunner) stop() { +func (runner *ContainerRunner) stop(sig os.Signal) { runner.cStateLock.Lock() defer runner.cStateLock.Unlock() + if sig != nil { + runner.CrunchLog.Printf("caught signal: %v", sig) + } if runner.ContainerID == "" { return } @@ -173,12 +175,6 @@ func (runner *ContainerRunner) stop() { } } -func (runner *ContainerRunner) stopSignals() { - if runner.SigChan != nil { - signal.Stop(runner.SigChan) - } -} - var errorBlacklist = []string{ "(?ms).*[Cc]annot connect to the Docker daemon.*", "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*", @@ -891,7 +887,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) { _, err := io.Copy(response.Conn, stdinRdr) if err != nil { runner.CrunchLog.Print("While writing stdin collection to docker container %q", err) - runner.stop() + runner.stop(nil) } stdinRdr.Close() response.CloseWrite() @@ -901,7 +897,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) { _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson)) if err != nil { runner.CrunchLog.Print("While writing stdin json to docker container %q", err) - runner.stop() + runner.stop(nil) } response.CloseWrite() }() @@ -952,6 +948,7 @@ func (runner *ContainerRunner) CreateContainer() error { runner.ContainerConfig.Volumes = runner.Volumes + maxRAM := int64(runner.Container.RuntimeConstraints.RAM) runner.HostConfig = dockercontainer.HostConfig{ Binds: runner.Binds, LogConfig: dockercontainer.LogConfig{ @@ -959,6 +956,10 @@ func (runner *ContainerRunner) CreateContainer() error { }, Resources: dockercontainer.Resources{ CgroupParent: runner.setCgroupParent, + NanoCPUs: int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000, + Memory: maxRAM, // RAM + MemorySwap: maxRAM, // RAM+swap + KernelMemory: maxRAM, // kernel portion }, } @@ -1041,7 +1042,7 @@ func (runner *ContainerRunner) WaitFinish() error { case <-arvMountExit: runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.") - runner.stop() + runner.stop(nil) // arvMountExit will always be ready now that // it's closed, but that doesn't interest us. arvMountExit = nil @@ -1443,20 +1444,26 @@ func (runner *ContainerRunner) CleanupDirs() { // CommitLogs posts the collection containing the final container logs. func (runner *ContainerRunner) CommitLogs() error { - runner.CrunchLog.Print(runner.finalState) + func() { + // Hold cStateLock to prevent races on CrunchLog (e.g., stop()). + runner.cStateLock.Lock() + defer runner.cStateLock.Unlock() - if runner.arvMountLog != nil { - runner.arvMountLog.Close() - } - runner.CrunchLog.Close() + runner.CrunchLog.Print(runner.finalState) - // Closing CrunchLog above allows them to be committed to Keep at this - // point, but re-open crunch log with ArvClient in case there are any - // other further errors (such as failing to write the log to Keep!) - // while shutting down - runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient, - UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil}) - runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0) + if runner.arvMountLog != nil { + runner.arvMountLog.Close() + } + runner.CrunchLog.Close() + + // Closing CrunchLog above allows them to be committed to Keep at this + // point, but re-open crunch log with ArvClient in case there are any + // other further errors (such as failing to write the log to Keep!) + // while shutting down + runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient, + UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil}) + runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0) + }() if runner.LogsPDH != nil { // If we have already assigned something to LogsPDH, @@ -1565,7 +1572,6 @@ func (runner *ContainerRunner) Run() (err error) { runner.finalState = "Queued" defer func() { - runner.stopSignals() runner.CleanupDirs() runner.CrunchLog.Printf("crunch-run finished")