X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/98db65de63c9e2acfeae6636ccc619171635bda0..0a27815bdf3f1d1bc1eb3771bcee9294b6f4136f:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index c125b27a5f..8c989d533b 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -95,7 +95,7 @@ type PsProcess interface { // ContainerRunner is the main stateful struct used for a single execution of a // container. type ContainerRunner struct { - Docker ThinDockerClient + ContainerExecRunner ThinContainerExecRunner // Dispatcher client is initialized with the Dispatcher token. // This is a privileged token used to manage container status @@ -119,35 +119,33 @@ type ContainerRunner struct { ContainerArvClient IArvadosClient ContainerKeepClient IKeepClient - Container arvados.Container - ContainerConfig dockercontainer.Config - HostConfig dockercontainer.HostConfig - token string - ContainerID string - ExitCode *int - NewLogWriter NewLogWriter - loggingDone chan bool - CrunchLog *ThrottledLogger - Stdout io.WriteCloser - Stderr io.WriteCloser - logUUID string - logMtx sync.Mutex - LogCollection arvados.CollectionFileSystem - LogsPDH *string - RunArvMount RunArvMount - MkTempDir MkTempDir - ArvMount *exec.Cmd - ArvMountPoint string - HostOutputDir string - Binds []string - Volumes map[string]struct{} - OutputPDH *string - SigChan chan os.Signal - ArvMountExit chan error - SecretMounts map[string]arvados.Mount - MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) - finalState string - parentTemp string + Container arvados.Container + token string + ContainerID string + ExitCode *int + NewLogWriter NewLogWriter + loggingDone chan bool + CrunchLog *ThrottledLogger + Stdout io.WriteCloser + Stderr io.WriteCloser + logUUID string + logMtx sync.Mutex + LogCollection arvados.CollectionFileSystem + LogsPDH *string + RunArvMount RunArvMount + MkTempDir MkTempDir + ArvMount *exec.Cmd + ArvMountPoint string + HostOutputDir string + Binds []string + Volumes map[string]struct{} + OutputPDH *string + SigChan chan os.Signal + ArvMountExit chan error + SecretMounts map[string]arvados.Mount + MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) + finalState string + parentTemp string statLogger io.WriteCloser statReporter *crunchstat.Reporter @@ -178,6 +176,8 @@ type ContainerRunner struct { arvMountLog *ThrottledLogger containerWatchdogInterval time.Duration + + gateway Gateway } // setupSignals sets up signal handling to gracefully terminate the underlying @@ -207,7 +207,7 @@ func (runner *ContainerRunner) stop(sig os.Signal) { } runner.cCancelled = true runner.CrunchLog.Printf("removing container") - err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true}) + err := runner.ContainerExecRunner.ContainerRemove(context.TODO(), runner.ContainerID, ContainerRemoveOptions{Force: true}) if err != nil { runner.CrunchLog.Printf("error removing container: %s", err) } @@ -281,7 +281,7 @@ func (runner *ContainerRunner) LoadImage() (err error) { runner.CrunchLog.Printf("Using Docker image id '%s'", imageID) - _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID) + _, _, err = runner.ContainerExecRunner.ImageInspectWithRaw(context.TODO(), imageID) if err != nil { runner.CrunchLog.Print("Loading Docker image from keep") @@ -291,7 +291,7 @@ func (runner *ContainerRunner) LoadImage() (err error) { return fmt.Errorf("While creating ManifestFileReader for container image: %v", err) } - response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true) + response, err := runner.ContainerExecRunner.ImageLoad(context.TODO(), readCloser, true) if err != nil { return fmt.Errorf("While loading container image into Docker: %v", err) } @@ -306,7 +306,7 @@ func (runner *ContainerRunner) LoadImage() (err error) { runner.CrunchLog.Print("Docker image is available") } - runner.ContainerConfig.Image = imageID + runner.ContainerExecRunner.SetImage(imageID) runner.ContainerKeepClient.ClearBlockCache() @@ -455,11 +455,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } for bind := range runner.SecretMounts { if _, ok := runner.Container.Mounts[bind]; ok { - return fmt.Errorf("Secret mount %q conflicts with regular mount", bind) + return fmt.Errorf("secret mount %q conflicts with regular mount", bind) } if runner.SecretMounts[bind].Kind != "json" && runner.SecretMounts[bind].Kind != "text" { - return fmt.Errorf("Secret mount %q type is %q but only 'json' and 'text' are permitted.", + return fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted", bind, runner.SecretMounts[bind].Kind) } binds = append(binds, bind) @@ -474,7 +474,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if bind == "stdout" || bind == "stderr" { // Is it a "file" mount kind? if mnt.Kind != "file" { - return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind) + return fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind) } // Does path start with OutputPath? @@ -490,7 +490,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if bind == "stdin" { // Is it a "collection" mount kind? if mnt.Kind != "collection" && mnt.Kind != "json" { - return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind) + return fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind) } } @@ -500,7 +500,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" { if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" { - return fmt.Errorf("Only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind) + return fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind) } } @@ -508,17 +508,17 @@ func (runner *ContainerRunner) SetupMounts() (err error) { case mnt.Kind == "collection" && bind != "stdin": var src string if mnt.UUID != "" && mnt.PortableDataHash != "" { - return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount") + return fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount") } if mnt.UUID != "" { if mnt.Writable { - return fmt.Errorf("Writing to existing collections currently not permitted.") + return fmt.Errorf("writing to existing collections currently not permitted") } pdhOnly = false src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID) } else if mnt.PortableDataHash != "" { if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") { - return fmt.Errorf("Can never write to a collection specified by portable data hash") + return fmt.Errorf("can never write to a collection specified by portable data hash") } idx := strings.Index(mnt.PortableDataHash, "/") if idx > 0 { @@ -539,7 +539,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount) arvMountCmd = append(arvMountCmd, "--mount-tmp") arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount)) - tmpcount += 1 + tmpcount++ } if mnt.Writable { if bind == runner.Container.OutputPath { @@ -559,15 +559,15 @@ func (runner *ContainerRunner) SetupMounts() (err error) { var tmpdir string tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp") if err != nil { - return fmt.Errorf("While creating mount temp dir: %v", err) + return fmt.Errorf("while creating mount temp dir: %v", err) } st, staterr := os.Stat(tmpdir) if staterr != nil { - return fmt.Errorf("While Stat on temp dir: %v", staterr) + return fmt.Errorf("while Stat on temp dir: %v", staterr) } err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777) if staterr != nil { - return fmt.Errorf("While Chmod temp dir: %v", err) + return fmt.Errorf("while Chmod temp dir: %v", err) } runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind)) if bind == runner.Container.OutputPath { @@ -618,10 +618,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } if runner.HostOutputDir == "" { - return fmt.Errorf("Output path does not correspond to a writable mount point") + return fmt.Errorf("output path does not correspond to a writable mount point") } - if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI { + if needCertMount && runner.Container.RuntimeConstraints.API { for _, certfile := range arvadosclient.CertFiles { _, err := os.Stat(certfile) if err == nil { @@ -640,20 +640,20 @@ func (runner *ContainerRunner) SetupMounts() (err error) { runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token) if err != nil { - return fmt.Errorf("While trying to start arv-mount: %v", err) + return fmt.Errorf("while trying to start arv-mount: %v", err) } for _, p := range collectionPaths { _, err = os.Stat(p) if err != nil { - return fmt.Errorf("While checking that input files exist: %v", err) + return fmt.Errorf("while checking that input files exist: %v", err) } } for _, cp := range copyFiles { st, err := os.Stat(cp.src) if err != nil { - return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err) + return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err) } if st.IsDir() { err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error { @@ -674,7 +674,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777) } else { - return fmt.Errorf("Source %q is not a regular file or directory", cp.src) + return fmt.Errorf("source %q is not a regular file or directory", cp.src) } }) } else if st.Mode().IsRegular() { @@ -684,7 +684,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } } if err != nil { - return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err) + return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err) } } @@ -944,15 +944,15 @@ func (runner *ContainerRunner) AttachStreams() (err error) { // If stdin mount is provided, attach it to the docker container var stdinRdr arvados.File - var stdinJson []byte + var stdinJSON []byte if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok { if stdinMnt.Kind == "collection" { var stdinColl arvados.Collection - collId := stdinMnt.UUID - if collId == "" { - collId = stdinMnt.PortableDataHash + collID := stdinMnt.UUID + if collID == "" { + collID = stdinMnt.PortableDataHash } - err = runner.ContainerArvClient.Get("collections", collId, nil, &stdinColl) + err = runner.ContainerArvClient.Get("collections", collID, nil, &stdinColl) if err != nil { return fmt.Errorf("While getting stdin collection: %v", err) } @@ -966,16 +966,16 @@ func (runner *ContainerRunner) AttachStreams() (err error) { return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err) } } else if stdinMnt.Kind == "json" { - stdinJson, err = json.Marshal(stdinMnt.Content) + stdinJSON, err = json.Marshal(stdinMnt.Content) if err != nil { return fmt.Errorf("While encoding stdin json data: %v", err) } } } - stdinUsed := stdinRdr != nil || len(stdinJson) != 0 - response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID, - dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true}) + stdinUsed := stdinRdr != nil || len(stdinJSON) != 0 + response, err := runner.ContainerExecRunner.ContainerAttach(context.TODO(), runner.ContainerID, + ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true}) if err != nil { return fmt.Errorf("While attaching container stdout/stderr streams: %v", err) } @@ -1016,9 +1016,9 @@ func (runner *ContainerRunner) AttachStreams() (err error) { stdinRdr.Close() response.CloseWrite() }() - } else if len(stdinJson) != 0 { + } else if len(stdinJSON) != 0 { go func() { - _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson)) + _, err := io.Copy(response.Conn, bytes.NewReader(stdinJSON)) if err != nil { runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err) runner.stop(nil) @@ -1061,16 +1061,19 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { func (runner *ContainerRunner) CreateContainer() error { runner.CrunchLog.Print("Creating Docker container") - runner.ContainerConfig.Cmd = runner.Container.Command + containerConfig, err := runner.ContainerExecRunner.GetContainerConfig() + hostConfig, err := runner.ContainerExecRunner.GetHostConfig() + + containerConfig.Cmd = runner.Container.Command if runner.Container.Cwd != "." { - runner.ContainerConfig.WorkingDir = runner.Container.Cwd + containerConfig.WorkingDir = runner.Container.Cwd } for k, v := range runner.Container.Environment { - runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v) + containerConfig.Env = append(containerConfig.Env, k+"="+v) } - runner.ContainerConfig.Volumes = runner.Volumes + containerConfig.Volumes = runner.Volumes maxRAM := int64(runner.Container.RuntimeConstraints.RAM) minDockerRAM := int64(16) @@ -1078,12 +1081,12 @@ func (runner *ContainerRunner) CreateContainer() error { // Docker daemon won't let you set a limit less than ~10 MiB maxRAM = minDockerRAM * 1024 * 1024 } - runner.HostConfig = dockercontainer.HostConfig{ + hostConfig = HostConfig{ Binds: runner.Binds, - LogConfig: dockercontainer.LogConfig{ + LogConfig: LogConfig{ Type: "none", }, - Resources: dockercontainer.Resources{ + Resources: Resources{ CgroupParent: runner.setCgroupParent, NanoCPUs: int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000, Memory: maxRAM, // RAM @@ -1091,34 +1094,34 @@ func (runner *ContainerRunner) CreateContainer() error { KernelMemory: maxRAM, // kernel portion }, } - - if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI { + runner.ContainerExecRunner.SetHostConfig(hostConfig) + if runner.Container.RuntimeConstraints.API { tok, err := runner.ContainerToken() if err != nil { return err } - runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, + containerConfig.Env = append(containerConfig.Env, "ARVADOS_API_TOKEN="+tok, "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"), "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"), ) - runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode) + runner.ContainerExecRunner.SetNetworkMode(NetworkMode(runner.networkMode)) } else { if runner.enableNetwork == "always" { - runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode) + runner.ContainerExecRunner.SetNetworkMode(NetworkMode(runner.networkMode)) } else { - runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none") + runner.ContainerExecRunner.SetNetworkMode("none") } } _, stdinUsed := runner.Container.Mounts["stdin"] - runner.ContainerConfig.OpenStdin = stdinUsed - runner.ContainerConfig.StdinOnce = stdinUsed - runner.ContainerConfig.AttachStdin = stdinUsed - runner.ContainerConfig.AttachStdout = true - runner.ContainerConfig.AttachStderr = true + containerConfig.OpenStdin = stdinUsed + containerConfig.StdinOnce = stdinUsed + containerConfig.AttachStdin = stdinUsed + containerConfig.AttachStdout = true + containerConfig.AttachStderr = true - createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID) + createdBody, err := runner.ContainerExecRunner.ContainerCreate(context.TODO(), containerConfig, hostConfig, nil, runner.Container.UUID) if err != nil { return fmt.Errorf("While creating container: %v", err) } @@ -1136,8 +1139,8 @@ func (runner *ContainerRunner) StartContainer() error { if runner.cCancelled { return ErrCancelled } - err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID, - dockertypes.ContainerStartOptions{}) + err := runner.ContainerExecRunner.ContainerStart(context.TODO(), runner.ContainerID, + ContainerStartOptions{}) if err != nil { var advice string if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil { @@ -1154,7 +1157,7 @@ func (runner *ContainerRunner) WaitFinish() error { var runTimeExceeded <-chan time.Time runner.CrunchLog.Print("Waiting for container to finish") - waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning) + waitOk, waitErr := runner.ContainerExecRunner.ContainerWait(context.TODO(), runner.ContainerID, WaitConditionNotRunning) arvMountExit := runner.ArvMountExit if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 { runTimeExceeded = time.After(time.Duration(timeout) * time.Second) @@ -1168,7 +1171,7 @@ func (runner *ContainerRunner) WaitFinish() error { } for range time.NewTicker(runner.containerWatchdogInterval).C { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval)) - ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID) + ctr, err := runner.ContainerExecRunner.ContainerInspect(ctx, runner.ContainerID) cancel() runner.cStateLock.Lock() done := runner.cRemoved || runner.ExitCode != nil @@ -1269,7 +1272,7 @@ func (runner *ContainerRunner) updateLogs() { // CaptureOutput saves data from the container's output directory if // needed, and updates the container output accordingly. func (runner *ContainerRunner) CaptureOutput() error { - if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI { + if runner.Container.RuntimeConstraints.API { // Output may have been set directly by the container, so // refresh the container record to check. err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID, @@ -1431,15 +1434,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C // Already finalized. return } - mt, err := runner.LogCollection.MarshalManifest(".") - if err != nil { - err = fmt.Errorf("error creating log manifest: %v", err) - return - } updates := arvadosclient.Dict{ - "name": "logs for " + runner.Container.UUID, - "manifest_text": mt, + "name": "logs for " + runner.Container.UUID, + } + mt, err1 := runner.LogCollection.MarshalManifest(".") + if err1 == nil { + // Only send updated manifest text if there was no + // error. + updates["manifest_text"] = mt } + + // Even if flushing the manifest had an error, we still want + // to update the log record, if possible, to push the trash_at + // and delete_at times into the future. Details on bug + // #17293. if final { updates["is_trashed"] = true } else { @@ -1448,16 +1456,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C updates["delete_at"] = exp } reqBody := arvadosclient.Dict{"collection": updates} + var err2 error if runner.logUUID == "" { reqBody["ensure_unique_name"] = true - err = runner.DispatcherArvClient.Create("collections", reqBody, &response) + err2 = runner.DispatcherArvClient.Create("collections", reqBody, &response) } else { - err = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response) + err2 = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response) } - if err != nil { - return + if err2 == nil { + runner.logUUID = response.UUID + } + + if err1 != nil || err2 != nil { + err = fmt.Errorf("error recording logs: %q, %q", err1, err2) } - runner.logUUID = response.UUID return } @@ -1469,7 +1481,7 @@ func (runner *ContainerRunner) UpdateContainerRunning() error { return ErrCancelled } return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, - arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil) + arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil) } // ContainerToken returns the api_token the container (and any @@ -1716,14 +1728,14 @@ func (runner *ContainerRunner) fetchContainerRecord() error { func NewContainerRunner(dispatcherClient *arvados.Client, dispatcherArvClient IArvadosClient, dispatcherKeepClient IKeepClient, - docker ThinDockerClient, + containerRunner ThinContainerExecRunner, containerUUID string) (*ContainerRunner, error) { cr := &ContainerRunner{ dispatcherClient: dispatcherClient, DispatcherArvClient: dispatcherArvClient, DispatcherKeepClient: dispatcherKeepClient, - Docker: docker, + ContainerExecRunner: containerRunner, } cr.NewLogWriter = cr.NewArvLogWriter cr.RunArvMount = cr.ArvMountCmd @@ -1783,7 +1795,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s `) memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container") flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.") - + containerRunner := flags.String("container-runner", "docker", + `Specify the container runner. available options: docker, singularity. + `) ignoreDetachFlag := false if len(args) > 0 && args[0] == "-no-detach" { // This process was invoked by a parent process, which @@ -1814,18 +1828,18 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } } - containerId := flags.Arg(0) + containerID := flags.Arg(0) switch { case *detach && !ignoreDetachFlag: - return Detach(containerId, prog, args, os.Stdout, os.Stderr) + return Detach(containerID, prog, args, os.Stdout, os.Stderr) case *kill >= 0: - return KillProcess(containerId, syscall.Signal(*kill), os.Stdout, os.Stderr) + return KillProcess(containerID, syscall.Signal(*kill), os.Stdout, os.Stderr) case *list: return ListProcesses(os.Stdout, os.Stderr) } - if containerId == "" { + if containerID == "" { log.Printf("usage: %s [options] UUID", prog) return 1 } @@ -1839,38 +1853,73 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s api, err := arvadosclient.MakeArvadosClient() if err != nil { - log.Printf("%s: %v", containerId, err) + log.Printf("%s: %v", containerID, err) return 1 } api.Retries = 8 kc, kcerr := keepclient.MakeKeepClient(api) if kcerr != nil { - log.Printf("%s: %v", containerId, kcerr) + log.Printf("%s: %v", containerID, kcerr) return 1 } kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} kc.Retries = 4 - // API version 1.21 corresponds to Docker 1.9, which is currently the - // minimum version we want to support. - docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) + var cr *ContainerRunner + if *containerRunner == "docker" { + // API version 1.21 corresponds to Docker 1.9, which is currently the + // minimum version we want to support. + docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) - cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId) - if err != nil { - log.Print(err) - return 1 + cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, adapter(docker), containerID) + if err != nil { + log.Print(err) + return 1 + } + if dockererr != nil { + cr.CrunchLog.Printf("%s: %v", containerID, dockererr) + cr.checkBrokenNode(dockererr) + cr.CrunchLog.Close() + return 1 + } + } else { + // Singularity + + singularity, singularityerr := NewSingularityClient() + + cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, singularity, containerID) + if err != nil { + log.Print(err) + return 1 + } + + if singularityerr != nil { + cr.CrunchLog.Printf("%s: %v", containerID, singularityerr) + //cr.checkBrokenNode(singularityrerr) // + cr.CrunchLog.Close() + return 1 + } } - if dockererr != nil { - cr.CrunchLog.Printf("%s: %v", containerId, dockererr) - cr.checkBrokenNode(dockererr) - cr.CrunchLog.Close() - return 1 + cr.gateway = Gateway{ + Address: os.Getenv("GatewayAddress"), + AuthSecret: os.Getenv("GatewayAuthSecret"), + ContainerUUID: containerID, + DockerContainerID: &cr.ContainerID, + Log: cr.CrunchLog, + } + os.Unsetenv("GatewayAuthSecret") + if cr.gateway.Address != "" { + err = cr.gateway.Start() + if err != nil { + log.Printf("error starting gateway server: %s", err) + return 1 + } } - parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".") + parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".") if tmperr != nil { - log.Printf("%s: %v", containerId, tmperr) + log.Printf("%s: %v", containerID, tmperr) return 1 } @@ -1904,7 +1953,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } if runerr != nil { - log.Printf("%s: %v", containerId, runerr) + log.Printf("%s: %v", containerID, runerr) return 1 } return 0