X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/77c8223f5ddd64cff2b08d0857749644c474946f..0a27815bdf3f1d1bc1eb3771bcee9294b6f4136f:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index f6094e0e92..8c989d533b 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -95,7 +95,7 @@ type PsProcess interface { // ContainerRunner is the main stateful struct used for a single execution of a // container. type ContainerRunner struct { - Docker ThinDockerClient + ContainerExecRunner ThinContainerExecRunner // Dispatcher client is initialized with the Dispatcher token. // This is a privileged token used to manage container status @@ -119,35 +119,33 @@ type ContainerRunner struct { ContainerArvClient IArvadosClient ContainerKeepClient IKeepClient - Container arvados.Container - ContainerConfig dockercontainer.Config - HostConfig dockercontainer.HostConfig - token string - ContainerID string - ExitCode *int - NewLogWriter NewLogWriter - loggingDone chan bool - CrunchLog *ThrottledLogger - Stdout io.WriteCloser - Stderr io.WriteCloser - logUUID string - logMtx sync.Mutex - LogCollection arvados.CollectionFileSystem - LogsPDH *string - RunArvMount RunArvMount - MkTempDir MkTempDir - ArvMount *exec.Cmd - ArvMountPoint string - HostOutputDir string - Binds []string - Volumes map[string]struct{} - OutputPDH *string - SigChan chan os.Signal - ArvMountExit chan error - SecretMounts map[string]arvados.Mount - MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) - finalState string - parentTemp string + Container arvados.Container + token string + ContainerID string + ExitCode *int + NewLogWriter NewLogWriter + loggingDone chan bool + CrunchLog *ThrottledLogger + Stdout io.WriteCloser + Stderr io.WriteCloser + logUUID string + logMtx sync.Mutex + LogCollection arvados.CollectionFileSystem + LogsPDH *string + RunArvMount RunArvMount + MkTempDir MkTempDir + ArvMount *exec.Cmd + ArvMountPoint string + HostOutputDir string + Binds []string + Volumes map[string]struct{} + OutputPDH *string + SigChan chan os.Signal + ArvMountExit chan error + SecretMounts map[string]arvados.Mount + MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) + finalState string + parentTemp string statLogger io.WriteCloser statReporter *crunchstat.Reporter @@ -209,7 +207,7 @@ func (runner *ContainerRunner) stop(sig os.Signal) { } runner.cCancelled = true runner.CrunchLog.Printf("removing container") - err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true}) + err := runner.ContainerExecRunner.ContainerRemove(context.TODO(), runner.ContainerID, ContainerRemoveOptions{Force: true}) if err != nil { runner.CrunchLog.Printf("error removing container: %s", err) } @@ -283,7 +281,7 @@ func (runner *ContainerRunner) LoadImage() (err error) { runner.CrunchLog.Printf("Using Docker image id '%s'", imageID) - _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID) + _, _, err = runner.ContainerExecRunner.ImageInspectWithRaw(context.TODO(), imageID) if err != nil { runner.CrunchLog.Print("Loading Docker image from keep") @@ -293,7 +291,7 @@ func (runner *ContainerRunner) LoadImage() (err error) { return fmt.Errorf("While creating ManifestFileReader for container image: %v", err) } - response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true) + response, err := runner.ContainerExecRunner.ImageLoad(context.TODO(), readCloser, true) if err != nil { return fmt.Errorf("While loading container image into Docker: %v", err) } @@ -308,7 +306,7 @@ func (runner *ContainerRunner) LoadImage() (err error) { runner.CrunchLog.Print("Docker image is available") } - runner.ContainerConfig.Image = imageID + runner.ContainerExecRunner.SetImage(imageID) runner.ContainerKeepClient.ClearBlockCache() @@ -976,8 +974,8 @@ func (runner *ContainerRunner) AttachStreams() (err error) { } stdinUsed := stdinRdr != nil || len(stdinJSON) != 0 - response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID, - dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true}) + response, err := runner.ContainerExecRunner.ContainerAttach(context.TODO(), runner.ContainerID, + ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true}) if err != nil { return fmt.Errorf("While attaching container stdout/stderr streams: %v", err) } @@ -1063,16 +1061,19 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { func (runner *ContainerRunner) CreateContainer() error { runner.CrunchLog.Print("Creating Docker container") - runner.ContainerConfig.Cmd = runner.Container.Command + containerConfig, err := runner.ContainerExecRunner.GetContainerConfig() + hostConfig, err := runner.ContainerExecRunner.GetHostConfig() + + containerConfig.Cmd = runner.Container.Command if runner.Container.Cwd != "." { - runner.ContainerConfig.WorkingDir = runner.Container.Cwd + containerConfig.WorkingDir = runner.Container.Cwd } for k, v := range runner.Container.Environment { - runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v) + containerConfig.Env = append(containerConfig.Env, k+"="+v) } - runner.ContainerConfig.Volumes = runner.Volumes + containerConfig.Volumes = runner.Volumes maxRAM := int64(runner.Container.RuntimeConstraints.RAM) minDockerRAM := int64(16) @@ -1080,12 +1081,12 @@ func (runner *ContainerRunner) CreateContainer() error { // Docker daemon won't let you set a limit less than ~10 MiB maxRAM = minDockerRAM * 1024 * 1024 } - runner.HostConfig = dockercontainer.HostConfig{ + hostConfig = HostConfig{ Binds: runner.Binds, - LogConfig: dockercontainer.LogConfig{ + LogConfig: LogConfig{ Type: "none", }, - Resources: dockercontainer.Resources{ + Resources: Resources{ CgroupParent: runner.setCgroupParent, NanoCPUs: int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000, Memory: maxRAM, // RAM @@ -1093,34 +1094,34 @@ func (runner *ContainerRunner) CreateContainer() error { KernelMemory: maxRAM, // kernel portion }, } - + runner.ContainerExecRunner.SetHostConfig(hostConfig) if runner.Container.RuntimeConstraints.API { tok, err := runner.ContainerToken() if err != nil { return err } - runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, + containerConfig.Env = append(containerConfig.Env, "ARVADOS_API_TOKEN="+tok, "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"), "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"), ) - runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode) + runner.ContainerExecRunner.SetNetworkMode(NetworkMode(runner.networkMode)) } else { if runner.enableNetwork == "always" { - runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode) + runner.ContainerExecRunner.SetNetworkMode(NetworkMode(runner.networkMode)) } else { - runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none") + runner.ContainerExecRunner.SetNetworkMode("none") } } _, stdinUsed := runner.Container.Mounts["stdin"] - runner.ContainerConfig.OpenStdin = stdinUsed - runner.ContainerConfig.StdinOnce = stdinUsed - runner.ContainerConfig.AttachStdin = stdinUsed - runner.ContainerConfig.AttachStdout = true - runner.ContainerConfig.AttachStderr = true + containerConfig.OpenStdin = stdinUsed + containerConfig.StdinOnce = stdinUsed + containerConfig.AttachStdin = stdinUsed + containerConfig.AttachStdout = true + containerConfig.AttachStderr = true - createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID) + createdBody, err := runner.ContainerExecRunner.ContainerCreate(context.TODO(), containerConfig, hostConfig, nil, runner.Container.UUID) if err != nil { return fmt.Errorf("While creating container: %v", err) } @@ -1138,8 +1139,8 @@ func (runner *ContainerRunner) StartContainer() error { if runner.cCancelled { return ErrCancelled } - err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID, - dockertypes.ContainerStartOptions{}) + err := runner.ContainerExecRunner.ContainerStart(context.TODO(), runner.ContainerID, + ContainerStartOptions{}) if err != nil { var advice string if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil { @@ -1156,7 +1157,7 @@ func (runner *ContainerRunner) WaitFinish() error { var runTimeExceeded <-chan time.Time runner.CrunchLog.Print("Waiting for container to finish") - waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning) + waitOk, waitErr := runner.ContainerExecRunner.ContainerWait(context.TODO(), runner.ContainerID, WaitConditionNotRunning) arvMountExit := runner.ArvMountExit if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 { runTimeExceeded = time.After(time.Duration(timeout) * time.Second) @@ -1170,7 +1171,7 @@ func (runner *ContainerRunner) WaitFinish() error { } for range time.NewTicker(runner.containerWatchdogInterval).C { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval)) - ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID) + ctr, err := runner.ContainerExecRunner.ContainerInspect(ctx, runner.ContainerID) cancel() runner.cStateLock.Lock() done := runner.cRemoved || runner.ExitCode != nil @@ -1433,15 +1434,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C // Already finalized. return } - mt, err := runner.LogCollection.MarshalManifest(".") - if err != nil { - err = fmt.Errorf("error creating log manifest: %v", err) - return - } updates := arvadosclient.Dict{ - "name": "logs for " + runner.Container.UUID, - "manifest_text": mt, + "name": "logs for " + runner.Container.UUID, } + mt, err1 := runner.LogCollection.MarshalManifest(".") + if err1 == nil { + // Only send updated manifest text if there was no + // error. + updates["manifest_text"] = mt + } + + // Even if flushing the manifest had an error, we still want + // to update the log record, if possible, to push the trash_at + // and delete_at times into the future. Details on bug + // #17293. if final { updates["is_trashed"] = true } else { @@ -1450,16 +1456,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C updates["delete_at"] = exp } reqBody := arvadosclient.Dict{"collection": updates} + var err2 error if runner.logUUID == "" { reqBody["ensure_unique_name"] = true - err = runner.DispatcherArvClient.Create("collections", reqBody, &response) + err2 = runner.DispatcherArvClient.Create("collections", reqBody, &response) } else { - err = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response) + err2 = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response) } - if err != nil { - return + if err2 == nil { + runner.logUUID = response.UUID + } + + if err1 != nil || err2 != nil { + err = fmt.Errorf("error recording logs: %q, %q", err1, err2) } - runner.logUUID = response.UUID return } @@ -1718,14 +1728,14 @@ func (runner *ContainerRunner) fetchContainerRecord() error { func NewContainerRunner(dispatcherClient *arvados.Client, dispatcherArvClient IArvadosClient, dispatcherKeepClient IKeepClient, - docker ThinDockerClient, + containerRunner ThinContainerExecRunner, containerUUID string) (*ContainerRunner, error) { cr := &ContainerRunner{ dispatcherClient: dispatcherClient, DispatcherArvClient: dispatcherArvClient, DispatcherKeepClient: dispatcherKeepClient, - Docker: docker, + ContainerExecRunner: containerRunner, } cr.NewLogWriter = cr.NewArvLogWriter cr.RunArvMount = cr.ArvMountCmd @@ -1785,7 +1795,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s `) memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container") flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.") - + containerRunner := flags.String("container-runner", "docker", + `Specify the container runner. available options: docker, singularity. + `) ignoreDetachFlag := false if len(args) > 0 && args[0] == "-no-detach" { // This process was invoked by a parent process, which @@ -1854,22 +1866,41 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} kc.Retries = 4 - // API version 1.21 corresponds to Docker 1.9, which is currently the - // minimum version we want to support. - docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) + var cr *ContainerRunner + if *containerRunner == "docker" { + // API version 1.21 corresponds to Docker 1.9, which is currently the + // minimum version we want to support. + docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) - cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID) - if err != nil { - log.Print(err) - return 1 - } - if dockererr != nil { - cr.CrunchLog.Printf("%s: %v", containerID, dockererr) - cr.checkBrokenNode(dockererr) - cr.CrunchLog.Close() - return 1 - } + cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, adapter(docker), containerID) + if err != nil { + log.Print(err) + return 1 + } + if dockererr != nil { + cr.CrunchLog.Printf("%s: %v", containerID, dockererr) + cr.checkBrokenNode(dockererr) + cr.CrunchLog.Close() + return 1 + } + } else { + // Singularity + singularity, singularityerr := NewSingularityClient() + + cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, singularity, containerID) + if err != nil { + log.Print(err) + return 1 + } + + if singularityerr != nil { + cr.CrunchLog.Printf("%s: %v", containerID, singularityerr) + //cr.checkBrokenNode(singularityrerr) // + cr.CrunchLog.Close() + return 1 + } + } cr.gateway = Gateway{ Address: os.Getenv("GatewayAddress"), AuthSecret: os.Getenv("GatewayAuthSecret"), @@ -1878,10 +1909,12 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s Log: cr.CrunchLog, } os.Unsetenv("GatewayAuthSecret") - err = cr.gateway.Start() - if err != nil { - log.Printf("error starting gateway server: %s", err) - return 1 + if cr.gateway.Address != "" { + err = cr.gateway.Start() + if err != nil { + log.Printf("error starting gateway server: %s", err) + return 1 + } } parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".")