X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bc9845761c44beecdd046620694f6d88af0e32fd..10382e4e34d7ce9dd572fed50f7747124b2d857a:/services/crunch-run/crunchrun.go?ds=sidebyside diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 640ac88ca9..9c52f18bd0 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -1,15 +1,19 @@ package main import ( + "encoding/json" "errors" "flag" + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/manifest" "github.com/curoverse/dockerclient" "io" + "io/ioutil" "log" "os" + "os/exec" "os/signal" "strings" "sync" @@ -33,11 +37,18 @@ type IKeepClient interface { } // Mount describes the mount points to create inside the container. -type Mount struct{} +type Mount struct { + Kind string `json:"kind"` + Writable bool `json:"writable"` + PortableDataHash string `json:"portable_data_hash"` + UUID string `json:"uuid"` + DeviceType string `json:"device_type"` +} // Collection record returned by the API server. -type Collection struct { - ManifestText string `json:"manifest_text"` +type CollectionRecord struct { + ManifestText string `json:"manifest_text"` + PortableDataHash string `json:"portable_data_hash"` } // ContainerRecord is the container record returned by the API server. @@ -52,6 +63,7 @@ type ContainerRecord struct { Priority int `json:"priority"` RuntimeConstraints map[string]interface{} `json:"runtime_constraints"` State string `json:"state"` + Output string `json:"output"` } // NewLogWriter is a factory function to create a new log writer. @@ -86,6 +98,11 @@ type ContainerRunner struct { Stderr *ThrottledLogger LogCollection *CollectionWriter LogsPDH *string + ArvMount *exec.Cmd + ArvMountPoint string + HostOutputDir string + Binds []string + OutputPDH *string CancelLock sync.Mutex Cancelled bool SigChan chan os.Signal @@ -123,17 +140,17 @@ func (runner *ContainerRunner) LoadImage() (err error) { runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage) - var collection Collection + var collection CollectionRecord err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection) if err != nil { - return err + return fmt.Errorf("While getting container image collection: %v", err) } manifest := manifest.Manifest{Text: collection.ManifestText} var img, imageID string for ms := range manifest.StreamIter() { img = ms.FileStreamSegments[0].Name if !strings.HasSuffix(img, ".tar") { - return errors.New("First file in the collection does not end in .tar") + return fmt.Errorf("First file in the container image collection does not end in .tar") } imageID = img[:len(img)-4] } @@ -147,12 +164,12 @@ func (runner *ContainerRunner) LoadImage() (err error) { var readCloser io.ReadCloser readCloser, err = runner.Kc.ManifestFileReader(manifest, img) if err != nil { - return err + return fmt.Errorf("While creating ManifestFileReader for container image: %v", err) } err = runner.Docker.LoadImage(readCloser) if err != nil { - return err + return fmt.Errorf("While loading container image into Docker: %v", err) } } else { runner.CrunchLog.Print("Docker image is available") @@ -163,6 +180,93 @@ func (runner *ContainerRunner) LoadImage() (err error) { return nil } +func (runner *ContainerRunner) SetupMounts() (err error) { + runner.ArvMountPoint, err = ioutil.TempDir("", "keep") + if err != nil { + return fmt.Errorf("While creating keep mount temp dir: %v", err) + } + + pdhOnly := true + tmpcount := 0 + arvMountCmd := []string{"--foreground"} + collectionPaths := []string{} + + for bind, mnt := range runner.ContainerRecord.Mounts { + if mnt.Kind == "collection" { + var src string + if mnt.UUID != "" && mnt.PortableDataHash != "" { + return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount") + } + if mnt.UUID != "" { + if mnt.Writable { + return fmt.Errorf("Writing to existing collections currently not permitted.") + } + pdhOnly = false + src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID) + } else if mnt.PortableDataHash != "" { + if mnt.Writable { + return fmt.Errorf("Can never write to a collection specified by portable data hash") + } + src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash) + } else { + src = fmt.Sprintf("%s/tmp%i", runner.ArvMountPoint, tmpcount) + arvMountCmd = append(arvMountCmd, "--mount-tmp") + arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%i", tmpcount)) + tmpcount += 1 + } + if mnt.Writable { + if bind == runner.ContainerRecord.OutputPath { + runner.HostOutputDir = src + } + runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind)) + } else { + runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind)) + } + collectionPaths = append(collectionPaths, src) + } else if mnt.Kind == "tmp" { + if bind == runner.ContainerRecord.OutputPath { + runner.HostOutputDir, err = ioutil.TempDir("", "") + if err != nil { + return fmt.Errorf("While creating mount temp dir: %v", err) + } + + runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind)) + } else { + runner.Binds = append(runner.Binds, bind) + } + } else { + return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind) + } + } + + if runner.HostOutputDir == "" { + return fmt.Errorf("Output path does not correspond to a writable mount point") + } + + if pdhOnly { + arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id") + } else { + arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id") + } + arvMountCmd = append(arvMountCmd, runner.ArvMountPoint) + + runner.ArvMount = exec.Command("arv-mount", arvMountCmd...) + err = runner.ArvMount.Start() + if err != nil { + runner.ArvMount = nil + return fmt.Errorf("While trying to start arv-mount: %v", err) + } + + for _, p := range collectionPaths { + _, err = os.Stat(p) + if err != nil { + return fmt.Errorf("While checking that input files exist: %v", err) + } + } + + return nil +} + // StartContainer creates the container and runs it. func (runner *ContainerRunner) StartContainer() (err error) { runner.CrunchLog.Print("Creating Docker container") @@ -181,16 +285,17 @@ func (runner *ContainerRunner) StartContainer() (err error) { for k, v := range runner.ContainerRecord.Environment { runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v) } + runner.ContainerConfig.NetworkDisabled = true runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil) if err != nil { - return + return fmt.Errorf("While creating container: %v", err) } - hostConfig := &dockerclient.HostConfig{} + hostConfig := &dockerclient.HostConfig{Binds: runner.Binds} runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID) err = runner.Docker.StartContainer(runner.ContainerID, hostConfig) if err != nil { - return + return fmt.Errorf("While starting container: %v", err) } return nil @@ -205,11 +310,11 @@ func (runner *ContainerRunner) AttachLogs() (err error) { var stderrReader, stdoutReader io.Reader stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true}) if err != nil { - return + return fmt.Errorf("While getting container standard error: %v", err) } stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true}) if err != nil { - return + return fmt.Errorf("While getting container standard output: %v", err) } runner.loggingDone = make(chan bool) @@ -228,7 +333,7 @@ func (runner *ContainerRunner) WaitFinish() error { result := runner.Docker.Wait(runner.ContainerID) wr := <-result if wr.Error != nil { - return wr.Error + return fmt.Errorf("While waiting for container to finish: %v", wr.Error) } runner.ExitCode = &wr.ExitCode @@ -242,6 +347,71 @@ func (runner *ContainerRunner) WaitFinish() error { return nil } +// HandleOutput sets the output and unmounts the FUSE mount. +func (runner *ContainerRunner) CaptureOutput() error { + if runner.ArvMount != nil { + defer func() { + umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint) + umount.Run() + }() + } + + if runner.finalState != "Complete" { + return nil + } + + if runner.HostOutputDir == "" { + return nil + } + + _, err := os.Stat(runner.HostOutputDir) + if err != nil { + return fmt.Errorf("While checking host output path: %v", err) + } + + var manifestText string + + collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir) + _, err = os.Stat(collectionMetafile) + if err != nil { + // Regular directory + cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}} + manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger) + if err != nil { + return fmt.Errorf("While uploading output files: %v", err) + } + } else { + // FUSE mount directory + file, openerr := os.Open(collectionMetafile) + if openerr != nil { + return fmt.Errorf("While opening FUSE metafile: %v", err) + } + defer file.Close() + + rec := CollectionRecord{} + err = json.NewDecoder(file).Decode(&rec) + if err != nil { + return fmt.Errorf("While reading FUSE metafile: %v", err) + } + manifestText = rec.ManifestText + } + + var response CollectionRecord + err = runner.ArvClient.Create("collections", + arvadosclient.Dict{ + "collection": arvadosclient.Dict{ + "manifest_text": manifestText}}, + &response) + if err != nil { + return fmt.Errorf("While creating output collection: %v", err) + } + + runner.OutputPDH = new(string) + *runner.OutputPDH = response.PortableDataHash + + return nil +} + // CommitLogs posts the collection containing the final container logs. func (runner *ContainerRunner) CommitLogs() error { runner.CrunchLog.Print(runner.finalState) @@ -256,28 +426,30 @@ func (runner *ContainerRunner) CommitLogs() error { mt, err := runner.LogCollection.ManifestText() if err != nil { - return err + return fmt.Errorf("While creating log manifest: %v", err) } - response := make(map[string]string) + var response CollectionRecord err = runner.ArvClient.Create("collections", - arvadosclient.Dict{"name": "logs for " + runner.ContainerRecord.UUID, - "manifest_text": mt}, - response) + arvadosclient.Dict{ + "collection": arvadosclient.Dict{ + "name": "logs for " + runner.ContainerRecord.UUID, + "manifest_text": mt}}, + &response) if err != nil { - return err + return fmt.Errorf("While creating log collection: %v", err) } runner.LogsPDH = new(string) - *runner.LogsPDH = response["portable_data_hash"] + *runner.LogsPDH = response.PortableDataHash return nil } // UpdateContainerRecordRunning updates the container state to "Running" func (runner *ContainerRunner) UpdateContainerRecordRunning() error { - update := arvadosclient.Dict{"state": "Running"} - return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil) + return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, + arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil) } // UpdateContainerRecordComplete updates the container record state on API @@ -290,10 +462,13 @@ func (runner *ContainerRunner) UpdateContainerRecordComplete() error { if runner.ExitCode != nil { update["exit_code"] = *runner.ExitCode } + if runner.OutputPDH != nil { + update["output"] = runner.OutputPDH + } update["state"] = runner.finalState - return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil) + return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil) } // NewArvLogWriter creates an ArvLogWriter @@ -302,8 +477,8 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser { } // Run the full container lifecycle. -func (runner *ContainerRunner) Run(containerUUID string) (err error) { - runner.CrunchLog.Printf("Executing container '%s'", containerUUID) +func (runner *ContainerRunner) Run() (err error) { + runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID) var runerr, waiterr error @@ -318,13 +493,19 @@ func (runner *ContainerRunner) Run(containerUUID string) (err error) { runner.finalState = "Complete" } - // (6) write logs + // (7) capture output + outputerr := runner.CaptureOutput() + if outputerr != nil { + runner.CrunchLog.Print(outputerr) + } + + // (8) write logs logerr := runner.CommitLogs() if logerr != nil { runner.CrunchLog.Print(logerr) } - // (7) update container record with results + // (9) update container record with results updateerr := runner.UpdateContainerRecordComplete() if updateerr != nil { runner.CrunchLog.Print(updateerr) @@ -345,24 +526,30 @@ func (runner *ContainerRunner) Run(containerUUID string) (err error) { } }() - err = runner.ArvClient.Get("containers", containerUUID, nil, &runner.ContainerRecord) + err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord) if err != nil { - return + return fmt.Errorf("While getting container record: %v", err) } - // (0) setup signal handling + // (1) setup signal handling err = runner.SetupSignals() if err != nil { - return + return fmt.Errorf("While setting up signal handling: %v", err) } - // (1) check for and/or load image + // (2) check for and/or load image err = runner.LoadImage() if err != nil { - return + return fmt.Errorf("While loading container image: %v", err) + } + + // (3) set up FUSE mount and binds + err = runner.SetupMounts() + if err != nil { + return fmt.Errorf("While setting up mounts: %v", err) } - // (2) start container + // (3) create and start container err = runner.StartContainer() if err != nil { if err == ErrCancelled { @@ -371,19 +558,19 @@ func (runner *ContainerRunner) Run(containerUUID string) (err error) { return } - // (3) update container record state + // (4) update container record state err = runner.UpdateContainerRecordRunning() if err != nil { runner.CrunchLog.Print(err) } - // (4) attach container logs + // (5) attach container logs runerr = runner.AttachLogs() if runerr != nil { runner.CrunchLog.Print(runerr) } - // (5) wait for container to finish + // (6) wait for container to finish waiterr = runner.WaitFinish() return @@ -392,11 +579,13 @@ func (runner *ContainerRunner) Run(containerUUID string) (err error) { // NewContainerRunner creates a new container runner. func NewContainerRunner(api IArvadosClient, kc IKeepClient, - docker ThinDockerClient) *ContainerRunner { + docker ThinDockerClient, + containerUUID string) *ContainerRunner { cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker} cr.NewLogWriter = cr.NewArvLogWriter - cr.LogCollection = &CollectionWriter{kc, nil} + cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}} + cr.ContainerRecord.UUID = containerUUID cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run")) return cr } @@ -423,9 +612,9 @@ func main() { log.Fatal(err) } - cr := NewContainerRunner(api, kc, docker) + cr := NewContainerRunner(api, kc, docker, flag.Arg(0)) - err = cr.Run(flag.Arg(0)) + err = cr.Run() if err != nil { log.Fatal(err) }