X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/85c684122fd678dc24035d58236dd5734aed50e5..1230d8a106c5c62edcbb9fcf6d1b94585e5596b2:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index f73ec73466..88c93e56c6 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -5,12 +5,6 @@ import ( "errors" "flag" "fmt" - "git.curoverse.com/arvados.git/lib/crunchstat" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "git.curoverse.com/arvados.git/sdk/go/manifest" - "github.com/curoverse/dockerclient" "io" "io/ioutil" "log" @@ -24,6 +18,13 @@ import ( "sync" "syscall" "time" + + "git.curoverse.com/arvados.git/lib/crunchstat" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.curoverse.com/arvados.git/sdk/go/manifest" + "github.com/curoverse/dockerclient" ) // IArvadosClient is the minimal Arvados API methods used by crunch-run. @@ -95,8 +96,8 @@ type ContainerRunner struct { SigChan chan os.Signal ArvMountExit chan error finalState string - trashLifetime time.Duration + infoLogger io.WriteCloser statLogger io.WriteCloser statReporter *crunchstat.Reporter statInterval time.Duration @@ -124,20 +125,29 @@ func (runner *ContainerRunner) SetupSignals() { signal.Notify(runner.SigChan, syscall.SIGINT) signal.Notify(runner.SigChan, syscall.SIGQUIT) - go func(sig <-chan os.Signal) { - for range sig { - if !runner.Cancelled { - runner.CancelLock.Lock() - runner.Cancelled = true - if runner.ContainerID != "" { - runner.Docker.StopContainer(runner.ContainerID, 10) - } - runner.CancelLock.Unlock() - } - } + go func(sig chan os.Signal) { + <-sig + runner.stop() + signal.Stop(sig) }(runner.SigChan) } +// stop the underlying Docker container. +func (runner *ContainerRunner) stop() { + runner.CancelLock.Lock() + defer runner.CancelLock.Unlock() + if runner.Cancelled { + return + } + runner.Cancelled = true + if runner.ContainerID != "" { + err := runner.Docker.StopContainer(runner.ContainerID, 10) + if err != nil { + log.Printf("StopContainer failed: %s", err) + } + } +} + // LoadImage determines the docker image id from the container record and // checks if it is available in the local Docker image store. If not, it loads // the image from Keep. @@ -317,7 +327,21 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if mnt.Writable { return fmt.Errorf("Can never write to a collection specified by portable data hash") } + idx := strings.Index(mnt.PortableDataHash, "/") + if idx > 0 { + mnt.Path = path.Clean(mnt.PortableDataHash[idx:]) + mnt.PortableDataHash = mnt.PortableDataHash[0:idx] + runner.Container.Mounts[bind] = mnt + } src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash) + if mnt.Path != "" && mnt.Path != "." { + if strings.HasPrefix(mnt.Path, "./") { + mnt.Path = mnt.Path[2:] + } else if strings.HasPrefix(mnt.Path, "/") { + mnt.Path = mnt.Path[1:] + } + src += "/" + mnt.Path + } } else { src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount) arvMountCmd = append(arvMountCmd, "--mount-tmp") @@ -481,6 +505,51 @@ func (runner *ContainerRunner) StartCrunchstat() { runner.statReporter.Start() } +type infoCommand struct { + label string + command string + args []string +} + +func newInfoCommand(label string, command string) infoCommand { + cmd := strings.Split(command, " ") + return infoCommand{ + label: label, + command: cmd[0], + args: cmd[1:], + } +} + +// Gather node information and store it on the log for debugging +// purposes. +func (runner *ContainerRunner) LogNodeInfo() (err error) { + w := runner.NewLogWriter("node-info") + logger := log.New(w, "node-info", 0) + + commands := []infoCommand{ + newInfoCommand("Host Information", "uname -a"), + newInfoCommand("CPU Information", "cat /proc/cpuinfo"), + newInfoCommand("Memory Information", "cat /proc/meminfo"), + newInfoCommand("Disk Space", "df -m"), + } + + var out []byte + for _, command := range commands { + out, err = exec.Command(command.command, command.args...).Output() + if err != nil { + return fmt.Errorf("While running command '%s': %v", + command.command, err) + } + logger.Printf("%s:\n%s\n", command.label, out) + } + + err = w.Close() + if err != nil { + return fmt.Errorf("While closing node-info logs: %v", err) + } + return nil +} + // AttachLogs connects the docker container stdout and stderr logs to the // Arvados logger which logs to Keep and the API server logs table. func (runner *ContainerRunner) AttachStreams() (err error) { @@ -587,12 +656,22 @@ func (runner *ContainerRunner) StartContainer() error { func (runner *ContainerRunner) WaitFinish() error { runner.CrunchLog.Print("Waiting for container to finish") - result := runner.Docker.Wait(runner.ContainerID) - wr := <-result - if wr.Error != nil { - return fmt.Errorf("While waiting for container to finish: %v", wr.Error) + waitDocker := runner.Docker.Wait(runner.ContainerID) + waitMount := runner.ArvMountExit + for waitDocker != nil { + select { + case err := <-waitMount: + runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err) + waitMount = nil + runner.stop() + case wr := <-waitDocker: + if wr.Error != nil { + return fmt.Errorf("While waiting for container to finish: %v", wr.Error) + } + runner.ExitCode = &wr.ExitCode + waitDocker = nil + } } - runner.ExitCode = &wr.ExitCode // wait for stdout/stderr to complete <-runner.loggingDone @@ -636,7 +715,7 @@ func (runner *ContainerRunner) CaptureOutput() error { _, err = os.Stat(collectionMetafile) if err != nil { // Regular directory - cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}} + cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}} manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger) if err != nil { return fmt.Errorf("While uploading output files: %v", err) @@ -674,20 +753,10 @@ func (runner *ContainerRunner) CaptureOutput() error { continue } - if strings.HasPrefix(bindSuffix, "/") == false { - bindSuffix = "/" + bindSuffix - } - if mnt.ExcludeFromOutput == true { continue } - idx := strings.Index(mnt.PortableDataHash, "/") - if idx > 0 { - mnt.Path = mnt.PortableDataHash[idx:] - mnt.PortableDataHash = mnt.PortableDataHash[0:idx] - } - // append to manifest_text m, err := runner.getCollectionManifestForPath(mnt, bindSuffix) if err != nil { @@ -699,11 +768,13 @@ func (runner *ContainerRunner) CaptureOutput() error { // Save output var response arvados.Collection - manifestText := manifest.Manifest{Text: manifestText}.NormalizedText() + manifest := manifest.Manifest{Text: manifestText} + manifestText = manifest.Extract(".", ".").Text err = runner.ArvClient.Create("collections", arvadosclient.Dict{ + "ensure_unique_name": true, "collection": arvadosclient.Dict{ - "trash_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339), + "is_trashed": true, "name": "output for " + runner.Container.UUID, "manifest_text": manifestText}}, &response) @@ -747,17 +818,12 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b return "", nil } - manifest := manifest.Manifest{Text: collection.ManifestText} - manifestText := manifest.ManifestTextForPath(mnt.Path, bindSuffix) - return manifestText, nil -} - -func (runner *ContainerRunner) loadDiscoveryVars() { - tl, err := runner.ArvClient.Discovery("defaultTrashLifetime") - if err != nil { - log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err) + mft := manifest.Manifest{Text: collection.ManifestText} + extracted := mft.Extract(mnt.Path, bindSuffix) + if extracted.Err != nil { + return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error()) } - runner.trashLifetime = time.Duration(tl.(float64)) * time.Second + return extracted.Text, nil } func (runner *ContainerRunner) CleanupDirs() { @@ -811,8 +877,9 @@ func (runner *ContainerRunner) CommitLogs() error { var response arvados.Collection err = runner.ArvClient.Create("collections", arvadosclient.Dict{ + "ensure_unique_name": true, "collection": arvadosclient.Dict{ - "trash_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339), + "is_trashed": true, "name": "logs for " + runner.Container.UUID, "manifest_text": mt}}, &response) @@ -969,6 +1036,12 @@ func (runner *ContainerRunner) Run() (err error) { return } + // Gather and record node information + err = runner.LogNodeInfo() + if err != nil { + return + } + runner.StartCrunchstat() if runner.IsCancelled() { @@ -1003,11 +1076,10 @@ func NewContainerRunner(api IArvadosClient, cr.NewLogWriter = cr.NewArvLogWriter cr.RunArvMount = cr.ArvMountCmd cr.MkTempDir = ioutil.TempDir - cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}} + cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}} cr.Container.UUID = containerUUID cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run")) cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) - cr.loadDiscoveryVars() return cr }