X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/837c509a55f56fa653454e7b99e293f8f87ebef2..a54e88868ac259443e2cd8d5f6fddb4b8154acb9:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 8d292581e2..e0d707a5a5 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -5,12 +5,6 @@ import ( "errors" "flag" "fmt" - "git.curoverse.com/arvados.git/lib/crunchstat" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "git.curoverse.com/arvados.git/sdk/go/manifest" - "github.com/curoverse/dockerclient" "io" "io/ioutil" "log" @@ -24,6 +18,13 @@ import ( "sync" "syscall" "time" + + "git.curoverse.com/arvados.git/lib/crunchstat" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.curoverse.com/arvados.git/sdk/go/manifest" + "github.com/curoverse/dockerclient" ) // IArvadosClient is the minimal Arvados API methods used by crunch-run. @@ -32,6 +33,7 @@ type IArvadosClient interface { Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error + CallRaw(method string, resourceType string, uuid string, action string, parameters arvadosclient.Dict) (reader io.ReadCloser, err error) Discovery(key string) (interface{}, error) } @@ -90,12 +92,9 @@ type ContainerRunner struct { CleanupTempDir []string Binds []string OutputPDH *string - CancelLock sync.Mutex - Cancelled bool SigChan chan os.Signal ArvMountExit chan error finalState string - trashLifetime time.Duration statLogger io.WriteCloser statReporter *crunchstat.Reporter @@ -114,6 +113,10 @@ type ContainerRunner struct { // parent to be X" feature even on sites where the "specify // cgroup parent" feature breaks. setCgroupParent string + + cStateLock sync.Mutex + cStarted bool // StartContainer() succeeded + cCancelled bool // StopContainer() invoked } // SetupSignals sets up signal handling to gracefully terminate the underlying @@ -124,20 +127,29 @@ func (runner *ContainerRunner) SetupSignals() { signal.Notify(runner.SigChan, syscall.SIGINT) signal.Notify(runner.SigChan, syscall.SIGQUIT) - go func(sig <-chan os.Signal) { - for range sig { - if !runner.Cancelled { - runner.CancelLock.Lock() - runner.Cancelled = true - if runner.ContainerID != "" { - runner.Docker.StopContainer(runner.ContainerID, 10) - } - runner.CancelLock.Unlock() - } - } + go func(sig chan os.Signal) { + <-sig + runner.stop() + signal.Stop(sig) }(runner.SigChan) } +// stop the underlying Docker container. +func (runner *ContainerRunner) stop() { + runner.cStateLock.Lock() + defer runner.cStateLock.Unlock() + if runner.cCancelled { + return + } + runner.cCancelled = true + if runner.cStarted { + err := runner.Docker.StopContainer(runner.ContainerID, 10) + if err != nil { + log.Printf("StopContainer failed: %s", err) + } + } +} + // LoadImage determines the docker image id from the container record and // checks if it is available in the local Docker image store. If not, it loads // the image from Keep. @@ -317,7 +329,21 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if mnt.Writable { return fmt.Errorf("Can never write to a collection specified by portable data hash") } + idx := strings.Index(mnt.PortableDataHash, "/") + if idx > 0 { + mnt.Path = path.Clean(mnt.PortableDataHash[idx:]) + mnt.PortableDataHash = mnt.PortableDataHash[0:idx] + runner.Container.Mounts[bind] = mnt + } src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash) + if mnt.Path != "" && mnt.Path != "." { + if strings.HasPrefix(mnt.Path, "./") { + mnt.Path = mnt.Path[2:] + } else if strings.HasPrefix(mnt.Path, "/") { + mnt.Path = mnt.Path[1:] + } + src += "/" + mnt.Path + } } else { src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount) arvMountCmd = append(arvMountCmd, "--mount-tmp") @@ -481,6 +507,105 @@ func (runner *ContainerRunner) StartCrunchstat() { runner.statReporter.Start() } +type infoCommand struct { + label string + cmd []string +} + +// Gather node information and store it on the log for debugging +// purposes. +func (runner *ContainerRunner) LogNodeInfo() (err error) { + w := runner.NewLogWriter("node-info") + logger := log.New(w, "node-info", 0) + + commands := []infoCommand{ + infoCommand{ + label: "Host Information", + cmd: []string{"uname", "-a"}, + }, + infoCommand{ + label: "CPU Information", + cmd: []string{"cat", "/proc/cpuinfo"}, + }, + infoCommand{ + label: "Memory Information", + cmd: []string{"cat", "/proc/meminfo"}, + }, + infoCommand{ + label: "Disk Space", + cmd: []string{"df", "-m", "/"}, + }, + infoCommand{ + label: "Disk Space", + cmd: []string{"df", "-m", os.TempDir()}, + }, + infoCommand{ + label: "Disk INodes", + cmd: []string{"df", "-i", "/"}, + }, + infoCommand{ + label: "Disk INodes", + cmd: []string{"df", "-i", os.TempDir()}, + }, + } + + // Run commands with informational output to be logged. + var out []byte + for _, command := range commands { + out, err = exec.Command(command.cmd[0], command.cmd[1:]...).CombinedOutput() + if err != nil { + return fmt.Errorf("While running command %q: %v", + command.cmd, err) + } + logger.Println(command.label) + for _, line := range strings.Split(string(out), "\n") { + logger.Println(" ", line) + } + } + + err = w.Close() + if err != nil { + return fmt.Errorf("While closing node-info logs: %v", err) + } + return nil +} + +// Get and save the raw JSON container record from the API server +func (runner *ContainerRunner) LogContainerRecord() (err error) { + w := &ArvLogWriter{ + runner.ArvClient, + runner.Container.UUID, + "container", + runner.LogCollection.Open("container.json"), + } + // Get Container record JSON from the API Server + reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil) + if err != nil { + return fmt.Errorf("While retrieving container record from the API server: %v", err) + } + // Read the API server response as []byte + json_bytes, err := ioutil.ReadAll(reader) + if err != nil { + return fmt.Errorf("While reading container record API server response: %v", err) + } + // Decode the JSON []byte + var cr map[string]interface{} + if err = json.Unmarshal(json_bytes, &cr); err != nil { + return fmt.Errorf("While decoding the container record JSON response: %v", err) + } + // Re-encode it using indentation to improve readability + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + if err = enc.Encode(cr); err != nil { + return fmt.Errorf("While logging the JSON container record: %v", err) + } + err = w.Close() + if err != nil { + return fmt.Errorf("While closing container.json log: %v", err) + } + return nil +} + // AttachLogs connects the docker container stdout and stderr logs to the // Arvados logger which logs to Keep and the API server logs table. func (runner *ContainerRunner) AttachStreams() (err error) { @@ -575,10 +700,16 @@ func (runner *ContainerRunner) CreateContainer() error { // StartContainer starts the docker container created by CreateContainer. func (runner *ContainerRunner) StartContainer() error { runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID) + runner.cStateLock.Lock() + defer runner.cStateLock.Unlock() + if runner.cCancelled { + return ErrCancelled + } err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig) if err != nil { return fmt.Errorf("could not start container: %v", err) } + runner.cStarted = true return nil } @@ -587,12 +718,22 @@ func (runner *ContainerRunner) StartContainer() error { func (runner *ContainerRunner) WaitFinish() error { runner.CrunchLog.Print("Waiting for container to finish") - result := runner.Docker.Wait(runner.ContainerID) - wr := <-result - if wr.Error != nil { - return fmt.Errorf("While waiting for container to finish: %v", wr.Error) + waitDocker := runner.Docker.Wait(runner.ContainerID) + waitMount := runner.ArvMountExit + for waitDocker != nil { + select { + case err := <-waitMount: + runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err) + waitMount = nil + runner.stop() + case wr := <-waitDocker: + if wr.Error != nil { + return fmt.Errorf("While waiting for container to finish: %v", wr.Error) + } + runner.ExitCode = &wr.ExitCode + waitDocker = nil + } } - runner.ExitCode = &wr.ExitCode // wait for stdout/stderr to complete <-runner.loggingDone @@ -636,7 +777,7 @@ func (runner *ContainerRunner) CaptureOutput() error { _, err = os.Stat(collectionMetafile) if err != nil { // Regular directory - cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}} + cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}} manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger) if err != nil { return fmt.Errorf("While uploading output files: %v", err) @@ -674,20 +815,10 @@ func (runner *ContainerRunner) CaptureOutput() error { continue } - if strings.HasPrefix(bindSuffix, "/") == false { - bindSuffix = "/" + bindSuffix - } - if mnt.ExcludeFromOutput == true { continue } - idx := strings.Index(mnt.PortableDataHash, "/") - if idx > 0 { - mnt.Path = mnt.PortableDataHash[idx:] - mnt.PortableDataHash = mnt.PortableDataHash[0:idx] - } - // append to manifest_text m, err := runner.getCollectionManifestForPath(mnt, bindSuffix) if err != nil { @@ -699,10 +830,13 @@ func (runner *ContainerRunner) CaptureOutput() error { // Save output var response arvados.Collection + manifest := manifest.Manifest{Text: manifestText} + manifestText = manifest.Extract(".", ".").Text err = runner.ArvClient.Create("collections", arvadosclient.Dict{ + "ensure_unique_name": true, "collection": arvadosclient.Dict{ - "trash_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339), + "is_trashed": true, "name": "output for " + runner.Container.UUID, "manifest_text": manifestText}}, &response) @@ -746,72 +880,12 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b return "", nil } - manifest := manifest.Manifest{Text: collection.ManifestText} - manifestText := manifest.NormalizedManifestForPath(mnt.Path) - - if manifestText == "" { - // It could be denormalized manifest - mntPath := strings.Trim(mnt.Path, "/") - manifestText = strings.Replace(collection.ManifestText, "./", "."+bindSuffix+"/", -1) - manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1) - wanted := "" - for _, token := range strings.Split(manifestText, " ") { - if strings.Index(token, ":") == -1 { - wanted += " " + token - } else if strings.Index(token, ":"+mntPath) >= 0 { - wanted += " " + token + "\n" - break - } - } - return wanted, nil - } - - if mnt.Path == "" || mnt.Path == "/" { - // no path specified; return the entire manifest text after making adjustments - manifestText = strings.Replace(manifestText, "./", "."+bindSuffix+"/", -1) - manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1) - } else { - // either a single stream or file from a stream is being sought - bindIdx := strings.LastIndex(bindSuffix, "/") - var bindSubdir, bindFileName string - if bindIdx >= 0 { - bindSubdir = "." + bindSuffix[0:bindIdx] - bindFileName = bindSuffix[bindIdx+1:] - } - mntPath := mnt.Path - if strings.HasSuffix(mntPath, "/") { - mntPath = mntPath[0 : len(mntPath)-1] - } - pathIdx := strings.LastIndex(mntPath, "/") - var pathSubdir, pathFileName string - if pathIdx >= 0 { - pathSubdir = "." + mntPath[0:pathIdx] - pathFileName = mntPath[pathIdx+1:] - } - - if strings.Index(manifestText, "."+mntPath+" ") != -1 { - // path refers to this complete stream - manifestText = strings.Replace(manifestText, "."+mntPath, "."+bindSuffix, -1) - } else { - // look for a matching file in this stream - manifestText = strings.Replace(manifestText, ":"+pathFileName, ":"+bindFileName, -1) - manifestText = strings.Replace(manifestText, pathSubdir, bindSubdir, -1) - } + mft := manifest.Manifest{Text: collection.ManifestText} + extracted := mft.Extract(mnt.Path, bindSuffix) + if extracted.Err != nil { + return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error()) } - - if manifestText == "" { - runner.CrunchLog.Printf("No manifest segment found for bind '%v' with path '%v'", bindSuffix, mnt.Path) - } - - return manifestText, nil -} - -func (runner *ContainerRunner) loadDiscoveryVars() { - tl, err := runner.ArvClient.Discovery("defaultTrashLifetime") - if err != nil { - log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err) - } - runner.trashLifetime = time.Duration(tl.(float64)) * time.Second + return extracted.Text, nil } func (runner *ContainerRunner) CleanupDirs() { @@ -865,8 +939,9 @@ func (runner *ContainerRunner) CommitLogs() error { var response arvados.Collection err = runner.ArvClient.Create("collections", arvadosclient.Dict{ + "ensure_unique_name": true, "collection": arvadosclient.Dict{ - "trash_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339), + "is_trashed": true, "name": "logs for " + runner.Container.UUID, "manifest_text": mt}}, &response) @@ -879,9 +954,9 @@ func (runner *ContainerRunner) CommitLogs() error { // UpdateContainerRunning updates the container state to "Running" func (runner *ContainerRunner) UpdateContainerRunning() error { - runner.CancelLock.Lock() - defer runner.CancelLock.Unlock() - if runner.Cancelled { + runner.cStateLock.Lock() + defer runner.cStateLock.Unlock() + if runner.cCancelled { return ErrCancelled } return runner.ArvClient.Update("containers", runner.Container.UUID, @@ -925,9 +1000,9 @@ func (runner *ContainerRunner) UpdateContainerFinal() error { // IsCancelled returns the value of Cancelled, with goroutine safety. func (runner *ContainerRunner) IsCancelled() bool { - runner.CancelLock.Lock() - defer runner.CancelLock.Unlock() - return runner.Cancelled + runner.cStateLock.Lock() + defer runner.cStateLock.Unlock() + return runner.cCancelled } // NewArvLogWriter creates an ArvLogWriter @@ -1023,6 +1098,17 @@ func (runner *ContainerRunner) Run() (err error) { return } + // Gather and record node information + err = runner.LogNodeInfo() + if err != nil { + return + } + // Save container.json record on log collection + err = runner.LogContainerRecord() + if err != nil { + return + } + runner.StartCrunchstat() if runner.IsCancelled() { @@ -1057,11 +1143,10 @@ func NewContainerRunner(api IArvadosClient, cr.NewLogWriter = cr.NewArvLogWriter cr.RunArvMount = cr.ArvMountCmd cr.MkTempDir = ioutil.TempDir - cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}} + cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}} cr.Container.UUID = containerUUID cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run")) cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) - cr.loadDiscoveryVars() return cr }