X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b47e664b17efbedd41b868d05164272d5549ffc1..10382e4e34d7ce9dd572fed50f7747124b2d857a:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index a7d8a58b2e..9c52f18bd0 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -1,14 +1,16 @@ package main import ( + "encoding/json" "errors" "flag" + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/manifest" "github.com/curoverse/dockerclient" "io" - "ioutil" + "io/ioutil" "log" "os" "os/exec" @@ -99,7 +101,8 @@ type ContainerRunner struct { ArvMount *exec.Cmd ArvMountPoint string HostOutputDir string - OutputPDH string + Binds []string + OutputPDH *string CancelLock sync.Mutex Cancelled bool SigChan chan os.Signal @@ -177,8 +180,12 @@ func (runner *ContainerRunner) LoadImage() (err error) { return nil } -func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig) (err error) { - runner.ArvMountPoint = ioutil.TempDir("", "keep") +func (runner *ContainerRunner) SetupMounts() (err error) { + runner.ArvMountPoint, err = ioutil.TempDir("", "keep") + if err != nil { + return fmt.Errorf("While creating keep mount temp dir: %v", err) + } + pdhOnly := true tmpcount := 0 arvMountCmd := []string{"--foreground"} @@ -195,14 +202,14 @@ func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig) return fmt.Errorf("Writing to existing collections currently not permitted.") } pdhOnly = false - src = fmt.Sprintf("%s/by_id/%s", arvMountPoint, mnt.UUID) + src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID) } else if mnt.PortableDataHash != "" { if mnt.Writable { return fmt.Errorf("Can never write to a collection specified by portable data hash") } - src = fmt.Sprintf("%s/by_id/%s", arvMountPoint, mnt.PortableDataHash) + src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash) } else { - src = fmt.Sprintf("%s/tmp%i", arvMountPoint, tmpcount) + src = fmt.Sprintf("%s/tmp%i", runner.ArvMountPoint, tmpcount) arvMountCmd = append(arvMountCmd, "--mount-tmp") arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%i", tmpcount)) tmpcount += 1 @@ -211,17 +218,21 @@ func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig) if bind == runner.ContainerRecord.OutputPath { runner.HostOutputDir = src } - hostConfig.Binds = append(hostConfig.Binds, fmt.Sprintf("%s:%s", src, bind)) + runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind)) } else { - hostConfig.Binds = append(hostConfig.Binds, fmt.Sprintf("%s:%s:ro", src, bind)) + runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind)) } - collectionPaths = append(collections, src) + collectionPaths = append(collectionPaths, src) } else if mnt.Kind == "tmp" { if bind == runner.ContainerRecord.OutputPath { - runner.HostOutputDir = ioutil.TempDir("", "") - hostConfig.Binds = append(hostConfig.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind)) + runner.HostOutputDir, err = ioutil.TempDir("", "") + if err != nil { + return fmt.Errorf("While creating mount temp dir: %v", err) + } + + runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind)) } else { - hostConfig.Binds = append(hostConfig.Binds, bind) + runner.Binds = append(runner.Binds, bind) } } else { return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind) @@ -237,16 +248,16 @@ func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig) } else { arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id") } - arvMountCmd = append(arvMountCmd, arvMountPoint) + arvMountCmd = append(arvMountCmd, runner.ArvMountPoint) - runner.ArvMount = exec.Command("arv-mount", arvMountCmd) + runner.ArvMount = exec.Command("arv-mount", arvMountCmd...) err = runner.ArvMount.Start() if err != nil { runner.ArvMount = nil return fmt.Errorf("While trying to start arv-mount: %v", err) } - for p := range collectionPaths { + for _, p := range collectionPaths { _, err = os.Stat(p) if err != nil { return fmt.Errorf("While checking that input files exist: %v", err) @@ -274,16 +285,12 @@ func (runner *ContainerRunner) StartContainer() (err error) { for k, v := range runner.ContainerRecord.Environment { runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v) } + runner.ContainerConfig.NetworkDisabled = true runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil) if err != nil { return fmt.Errorf("While creating container: %v", err) } - hostConfig := &dockerclient.HostConfig{} - - err = runner.SetupMounts(hostConfig) - if err != nil { - return fmt.Errorf("While setting up mounts: %v", err) - } + hostConfig := &dockerclient.HostConfig{Binds: runner.Binds} runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID) err = runner.Docker.StartContainer(runner.ContainerID, hostConfig) @@ -341,7 +348,7 @@ func (runner *ContainerRunner) WaitFinish() error { } // HandleOutput sets the output and unmounts the FUSE mount. -func (runner *ContainerRunner) HandleOutput() error { +func (runner *ContainerRunner) CaptureOutput() error { if runner.ArvMount != nil { defer func() { umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint) @@ -353,19 +360,23 @@ func (runner *ContainerRunner) HandleOutput() error { return nil } - _, err = os.Stat(os.HostOutputPath) + if runner.HostOutputDir == "" { + return nil + } + + _, err := os.Stat(runner.HostOutputDir) if err != nil { return fmt.Errorf("While checking host output path: %v", err) } var manifestText string - collectionMetafile = fmt.Sprintf("%s/.arvados#collection", os.HostOutputPath) + collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir) _, err = os.Stat(collectionMetafile) if err != nil { // Regular directory - cw := CollectionWriter{runner.Kc} - manifestText, err = cw.WriteTree(os.HostOutputPath, runner.CrunchLog) + cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}} + manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger) if err != nil { return fmt.Errorf("While uploading output files: %v", err) } @@ -386,12 +397,17 @@ func (runner *ContainerRunner) HandleOutput() error { } var response CollectionRecord - err = runner.ArvClient.Create("collections", &response) + err = runner.ArvClient.Create("collections", + arvadosclient.Dict{ + "collection": arvadosclient.Dict{ + "manifest_text": manifestText}}, + &response) if err != nil { return fmt.Errorf("While creating output collection: %v", err) } - runner.OutputPDH = response.PortableDataHash + runner.OutputPDH = new(string) + *runner.OutputPDH = response.PortableDataHash return nil } @@ -413,25 +429,27 @@ func (runner *ContainerRunner) CommitLogs() error { return fmt.Errorf("While creating log manifest: %v", err) } - response := make(map[string]string) + var response CollectionRecord err = runner.ArvClient.Create("collections", - arvadosclient.Dict{"name": "logs for " + runner.ContainerRecord.UUID, - "manifest_text": mt}, - response) + arvadosclient.Dict{ + "collection": arvadosclient.Dict{ + "name": "logs for " + runner.ContainerRecord.UUID, + "manifest_text": mt}}, + &response) if err != nil { return fmt.Errorf("While creating log collection: %v", err) } runner.LogsPDH = new(string) - *runner.LogsPDH = response["portable_data_hash"] + *runner.LogsPDH = response.PortableDataHash return nil } // UpdateContainerRecordRunning updates the container state to "Running" func (runner *ContainerRunner) UpdateContainerRecordRunning() error { - update := arvadosclient.Dict{"state": "Running"} - return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil) + return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, + arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil) } // UpdateContainerRecordComplete updates the container record state on API @@ -444,11 +462,13 @@ func (runner *ContainerRunner) UpdateContainerRecordComplete() error { if runner.ExitCode != nil { update["exit_code"] = *runner.ExitCode } + if runner.OutputPDH != nil { + update["output"] = runner.OutputPDH + } update["state"] = runner.finalState - update["output"] = runner.OutputPDH - return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil) + return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil) } // NewArvLogWriter creates an ArvLogWriter @@ -457,8 +477,8 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser { } // Run the full container lifecycle. -func (runner *ContainerRunner) Run(containerUUID string) (err error) { - runner.CrunchLog.Printf("Executing container '%s'", containerUUID) +func (runner *ContainerRunner) Run() (err error) { + runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID) var runerr, waiterr error @@ -473,19 +493,19 @@ func (runner *ContainerRunner) Run(containerUUID string) (err error) { runner.finalState = "Complete" } - // (6) handle output - outputerr := runner.HandleOutput() + // (7) capture output + outputerr := runner.CaptureOutput() if outputerr != nil { - runner.CrunchLog.Print(outputrr) + runner.CrunchLog.Print(outputerr) } - // (7) write logs + // (8) write logs logerr := runner.CommitLogs() if logerr != nil { runner.CrunchLog.Print(logerr) } - // (8) update container record with results + // (9) update container record with results updateerr := runner.UpdateContainerRecordComplete() if updateerr != nil { runner.CrunchLog.Print(updateerr) @@ -506,24 +526,30 @@ func (runner *ContainerRunner) Run(containerUUID string) (err error) { } }() - err = runner.ArvClient.Get("containers", containerUUID, nil, &runner.ContainerRecord) + err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord) if err != nil { return fmt.Errorf("While getting container record: %v", err) } - // (0) setup signal handling + // (1) setup signal handling err = runner.SetupSignals() if err != nil { return fmt.Errorf("While setting up signal handling: %v", err) } - // (1) check for and/or load image + // (2) check for and/or load image err = runner.LoadImage() if err != nil { return fmt.Errorf("While loading container image: %v", err) } - // (2) start container + // (3) set up FUSE mount and binds + err = runner.SetupMounts() + if err != nil { + return fmt.Errorf("While setting up mounts: %v", err) + } + + // (3) create and start container err = runner.StartContainer() if err != nil { if err == ErrCancelled { @@ -532,19 +558,19 @@ func (runner *ContainerRunner) Run(containerUUID string) (err error) { return } - // (3) update container record state + // (4) update container record state err = runner.UpdateContainerRecordRunning() if err != nil { runner.CrunchLog.Print(err) } - // (4) attach container logs + // (5) attach container logs runerr = runner.AttachLogs() if runerr != nil { runner.CrunchLog.Print(runerr) } - // (5) wait for container to finish + // (6) wait for container to finish waiterr = runner.WaitFinish() return @@ -553,11 +579,13 @@ func (runner *ContainerRunner) Run(containerUUID string) (err error) { // NewContainerRunner creates a new container runner. func NewContainerRunner(api IArvadosClient, kc IKeepClient, - docker ThinDockerClient) *ContainerRunner { + docker ThinDockerClient, + containerUUID string) *ContainerRunner { cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker} cr.NewLogWriter = cr.NewArvLogWriter - cr.LogCollection = &CollectionWriter{kc, nil} + cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}} + cr.ContainerRecord.UUID = containerUUID cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run")) return cr } @@ -584,9 +612,9 @@ func main() { log.Fatal(err) } - cr := NewContainerRunner(api, kc, docker) + cr := NewContainerRunner(api, kc, docker, flag.Arg(0)) - err = cr.Run(flag.Arg(0)) + err = cr.Run() if err != nil { log.Fatal(err) }