X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1c434a9f9ec70d8f23583bc737a516c3ef0eb91d..950ae9635334cd1ca6a2738b185f6481cc3d771f:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 6d8ec9c84e..b3d745f60e 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -5,12 +5,6 @@ import ( "errors" "flag" "fmt" - "git.curoverse.com/arvados.git/lib/crunchstat" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "git.curoverse.com/arvados.git/sdk/go/manifest" - "github.com/curoverse/dockerclient" "io" "io/ioutil" "log" @@ -24,6 +18,13 @@ import ( "sync" "syscall" "time" + + "git.curoverse.com/arvados.git/lib/crunchstat" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.curoverse.com/arvados.git/sdk/go/manifest" + "github.com/curoverse/dockerclient" ) // IArvadosClient is the minimal Arvados API methods used by crunch-run. @@ -32,6 +33,7 @@ type IArvadosClient interface { Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error + CallRaw(method string, resourceType string, uuid string, action string, parameters arvadosclient.Dict) (reader io.ReadCloser, err error) Discovery(key string) (interface{}, error) } @@ -90,12 +92,9 @@ type ContainerRunner struct { CleanupTempDir []string Binds []string OutputPDH *string - CancelLock sync.Mutex - Cancelled bool SigChan chan os.Signal ArvMountExit chan error finalState string - trashLifetime time.Duration statLogger io.WriteCloser statReporter *crunchstat.Reporter @@ -114,6 +113,13 @@ type ContainerRunner struct { // parent to be X" feature even on sites where the "specify // cgroup parent" feature breaks. setCgroupParent string + + cStateLock sync.Mutex + cStarted bool // StartContainer() succeeded + cCancelled bool // StopContainer() invoked + + enableNetwork string // one of "default" or "always" + networkMode string // passed through to HostConfig.NetworkMode } // SetupSignals sets up signal handling to gracefully terminate the underlying @@ -124,20 +130,29 @@ func (runner *ContainerRunner) SetupSignals() { signal.Notify(runner.SigChan, syscall.SIGINT) signal.Notify(runner.SigChan, syscall.SIGQUIT) - go func(sig <-chan os.Signal) { - for range sig { - if !runner.Cancelled { - runner.CancelLock.Lock() - runner.Cancelled = true - if runner.ContainerID != "" { - runner.Docker.StopContainer(runner.ContainerID, 10) - } - runner.CancelLock.Unlock() - } - } + go func(sig chan os.Signal) { + <-sig + runner.stop() + signal.Stop(sig) }(runner.SigChan) } +// stop the underlying Docker container. +func (runner *ContainerRunner) stop() { + runner.cStateLock.Lock() + defer runner.cStateLock.Unlock() + if runner.cCancelled { + return + } + runner.cCancelled = true + if runner.cStarted { + err := runner.Docker.StopContainer(runner.ContainerID, 10) + if err != nil { + log.Printf("StopContainer failed: %s", err) + } + } +} + // LoadImage determines the docker image id from the container record and // checks if it is available in the local Docker image store. If not, it loads // the image from Keep. @@ -317,7 +332,21 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if mnt.Writable { return fmt.Errorf("Can never write to a collection specified by portable data hash") } + idx := strings.Index(mnt.PortableDataHash, "/") + if idx > 0 { + mnt.Path = path.Clean(mnt.PortableDataHash[idx:]) + mnt.PortableDataHash = mnt.PortableDataHash[0:idx] + runner.Container.Mounts[bind] = mnt + } src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash) + if mnt.Path != "" && mnt.Path != "." { + if strings.HasPrefix(mnt.Path, "./") { + mnt.Path = mnt.Path[2:] + } else if strings.HasPrefix(mnt.Path, "/") { + mnt.Path = mnt.Path[1:] + } + src += "/" + mnt.Path + } } else { src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount) arvMountCmd = append(arvMountCmd, "--mount-tmp") @@ -481,6 +510,98 @@ func (runner *ContainerRunner) StartCrunchstat() { runner.statReporter.Start() } +type infoCommand struct { + label string + cmd []string +} + +// Gather node information and store it on the log for debugging +// purposes. +func (runner *ContainerRunner) LogNodeInfo() (err error) { + w := runner.NewLogWriter("node-info") + logger := log.New(w, "node-info", 0) + + commands := []infoCommand{ + infoCommand{ + label: "Host Information", + cmd: []string{"uname", "-a"}, + }, + infoCommand{ + label: "CPU Information", + cmd: []string{"cat", "/proc/cpuinfo"}, + }, + infoCommand{ + label: "Memory Information", + cmd: []string{"cat", "/proc/meminfo"}, + }, + infoCommand{ + label: "Disk Space", + cmd: []string{"df", "-m", "/", os.TempDir()}, + }, + infoCommand{ + label: "Disk INodes", + cmd: []string{"df", "-i", "/", os.TempDir()}, + }, + } + + // Run commands with informational output to be logged. + var out []byte + for _, command := range commands { + out, err = exec.Command(command.cmd[0], command.cmd[1:]...).CombinedOutput() + if err != nil { + return fmt.Errorf("While running command %q: %v", + command.cmd, err) + } + logger.Println(command.label) + for _, line := range strings.Split(string(out), "\n") { + logger.Println(" ", line) + } + } + + err = w.Close() + if err != nil { + return fmt.Errorf("While closing node-info logs: %v", err) + } + return nil +} + +// Get and save the raw JSON container record from the API server +func (runner *ContainerRunner) LogContainerRecord() (err error) { + w := &ArvLogWriter{ + runner.ArvClient, + runner.Container.UUID, + "container", + runner.LogCollection.Open("container.json"), + } + // Get Container record JSON from the API Server + reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil) + if err != nil { + return fmt.Errorf("While retrieving container record from the API server: %v", err) + } + defer reader.Close() + // Read the API server response as []byte + json_bytes, err := ioutil.ReadAll(reader) + if err != nil { + return fmt.Errorf("While reading container record API server response: %v", err) + } + // Decode the JSON []byte + var cr map[string]interface{} + if err = json.Unmarshal(json_bytes, &cr); err != nil { + return fmt.Errorf("While decoding the container record JSON response: %v", err) + } + // Re-encode it using indentation to improve readability + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + if err = enc.Encode(cr); err != nil { + return fmt.Errorf("While logging the JSON container record: %v", err) + } + err = w.Close() + if err != nil { + return fmt.Errorf("While closing container.json log: %v", err) + } + return nil +} + // AttachLogs connects the docker container stdout and stderr logs to the // Arvados logger which logs to Keep and the API server logs table. func (runner *ContainerRunner) AttachStreams() (err error) { @@ -540,6 +661,15 @@ func (runner *ContainerRunner) CreateContainer() error { for k, v := range runner.Container.Environment { runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v) } + + runner.HostConfig = dockerclient.HostConfig{ + Binds: runner.Binds, + CgroupParent: runner.setCgroupParent, + LogConfig: dockerclient.LogConfig{ + Type: "none", + }, + } + if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI { tok, err := runner.ContainerToken() if err != nil { @@ -550,9 +680,13 @@ func (runner *ContainerRunner) CreateContainer() error { "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"), "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"), ) - runner.ContainerConfig.NetworkDisabled = false + runner.HostConfig.NetworkMode = runner.networkMode } else { - runner.ContainerConfig.NetworkDisabled = true + if runner.enableNetwork == "always" { + runner.HostConfig.NetworkMode = runner.networkMode + } else { + runner.HostConfig.NetworkMode = "none" + } } var err error @@ -561,24 +695,22 @@ func (runner *ContainerRunner) CreateContainer() error { return fmt.Errorf("While creating container: %v", err) } - runner.HostConfig = dockerclient.HostConfig{ - Binds: runner.Binds, - CgroupParent: runner.setCgroupParent, - LogConfig: dockerclient.LogConfig{ - Type: "none", - }, - } - return runner.AttachStreams() } // StartContainer starts the docker container created by CreateContainer. func (runner *ContainerRunner) StartContainer() error { runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID) + runner.cStateLock.Lock() + defer runner.cStateLock.Unlock() + if runner.cCancelled { + return ErrCancelled + } err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig) if err != nil { return fmt.Errorf("could not start container: %v", err) } + runner.cStarted = true return nil } @@ -587,12 +719,22 @@ func (runner *ContainerRunner) StartContainer() error { func (runner *ContainerRunner) WaitFinish() error { runner.CrunchLog.Print("Waiting for container to finish") - result := runner.Docker.Wait(runner.ContainerID) - wr := <-result - if wr.Error != nil { - return fmt.Errorf("While waiting for container to finish: %v", wr.Error) + waitDocker := runner.Docker.Wait(runner.ContainerID) + waitMount := runner.ArvMountExit + for waitDocker != nil { + select { + case err := <-waitMount: + runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err) + waitMount = nil + runner.stop() + case wr := <-waitDocker: + if wr.Error != nil { + return fmt.Errorf("While waiting for container to finish: %v", wr.Error) + } + runner.ExitCode = &wr.ExitCode + waitDocker = nil + } } - runner.ExitCode = &wr.ExitCode // wait for stdout/stderr to complete <-runner.loggingDone @@ -636,7 +778,7 @@ func (runner *ContainerRunner) CaptureOutput() error { _, err = os.Stat(collectionMetafile) if err != nil { // Regular directory - cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}} + cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}} manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger) if err != nil { return fmt.Errorf("While uploading output files: %v", err) @@ -674,20 +816,10 @@ func (runner *ContainerRunner) CaptureOutput() error { continue } - if strings.HasPrefix(bindSuffix, "/") == false { - bindSuffix = "/" + bindSuffix - } - if mnt.ExcludeFromOutput == true { continue } - idx := strings.Index(mnt.PortableDataHash, "/") - if idx > 0 { - mnt.Path = mnt.PortableDataHash[idx:] - mnt.PortableDataHash = mnt.PortableDataHash[0:idx] - } - // append to manifest_text m, err := runner.getCollectionManifestForPath(mnt, bindSuffix) if err != nil { @@ -699,10 +831,13 @@ func (runner *ContainerRunner) CaptureOutput() error { // Save output var response arvados.Collection + manifest := manifest.Manifest{Text: manifestText} + manifestText = manifest.Extract(".", ".").Text err = runner.ArvClient.Create("collections", arvadosclient.Dict{ + "ensure_unique_name": true, "collection": arvadosclient.Dict{ - "trash_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339), + "is_trashed": true, "name": "output for " + runner.Container.UUID, "manifest_text": manifestText}}, &response) @@ -746,78 +881,12 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b return "", nil } - manifest := manifest.Manifest{Text: collection.ManifestText} - manifestText := manifest.NormalizedManifestForPath(mnt.Path) - - if manifestText == "" { - // It could be denormalized manifest - mntPath := strings.Trim(mnt.Path, "/") - manifestText = strings.Replace(collection.ManifestText, "./", "."+bindSuffix+"/", -1) - manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1) - wanted := "" - for _, stream := range strings.Split(manifestText, "\n") { - if strings.Index(stream, mntPath) == -1 { - continue - } - - for _, token := range strings.Split(manifestText, " ") { - if strings.Index(token, ":") == -1 { - wanted += " " + token - } else if strings.Index(token, ":"+mntPath) >= 0 { - wanted += " " + token + "\n" - break - } - } - } - return wanted, nil - } - - if mnt.Path == "" || mnt.Path == "/" { - // no path specified; return the entire manifest text after making adjustments - manifestText = strings.Replace(manifestText, "./", "."+bindSuffix+"/", -1) - manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1) - } else { - // either a single stream or file from a stream is being sought - bindIdx := strings.LastIndex(bindSuffix, "/") - var bindSubdir, bindFileName string - if bindIdx >= 0 { - bindSubdir = "." + bindSuffix[0:bindIdx] - bindFileName = bindSuffix[bindIdx+1:] - } - mntPath := mnt.Path - if strings.HasSuffix(mntPath, "/") { - mntPath = mntPath[0 : len(mntPath)-1] - } - pathIdx := strings.LastIndex(mntPath, "/") - var pathSubdir, pathFileName string - if pathIdx >= 0 { - pathSubdir = "." + mntPath[0:pathIdx] - pathFileName = mntPath[pathIdx+1:] - } - - if strings.Index(manifestText, "."+mntPath+" ") != -1 { - // path refers to this complete stream - manifestText = strings.Replace(manifestText, "."+mntPath, "."+bindSuffix, -1) - } else { - // look for a matching file in this stream - manifestText = strings.Replace(manifestText, ":"+pathFileName, ":"+bindFileName, -1) - manifestText = strings.Replace(manifestText, pathSubdir, bindSubdir, -1) - } + mft := manifest.Manifest{Text: collection.ManifestText} + extracted := mft.Extract(mnt.Path, bindSuffix) + if extracted.Err != nil { + return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error()) } - - if manifestText == "" { - runner.CrunchLog.Printf("No manifest segment found for bind '%v' with path '%v'", bindSuffix, mnt.Path) - } - - return manifestText, nil -} - -func (runner *ContainerRunner) loadDiscoveryVars() { - tl, err := runner.ArvClient.Discovery("defaultTrashLifetime") - if err != nil { - log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err) - } - runner.trashLifetime = time.Duration(tl.(float64)) * time.Second + return extracted.Text, nil } func (runner *ContainerRunner) CleanupDirs() { @@ -871,8 +940,9 @@ func (runner *ContainerRunner) CommitLogs() error { var response arvados.Collection err = runner.ArvClient.Create("collections", arvadosclient.Dict{ + "ensure_unique_name": true, "collection": arvadosclient.Dict{ - "trash_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339), + "is_trashed": true, "name": "logs for " + runner.Container.UUID, "manifest_text": mt}}, &response) @@ -885,9 +955,9 @@ func (runner *ContainerRunner) CommitLogs() error { // UpdateContainerRunning updates the container state to "Running" func (runner *ContainerRunner) UpdateContainerRunning() error { - runner.CancelLock.Lock() - defer runner.CancelLock.Unlock() - if runner.Cancelled { + runner.cStateLock.Lock() + defer runner.cStateLock.Unlock() + if runner.cCancelled { return ErrCancelled } return runner.ArvClient.Update("containers", runner.Container.UUID, @@ -931,9 +1001,9 @@ func (runner *ContainerRunner) UpdateContainerFinal() error { // IsCancelled returns the value of Cancelled, with goroutine safety. func (runner *ContainerRunner) IsCancelled() bool { - runner.CancelLock.Lock() - defer runner.CancelLock.Unlock() - return runner.Cancelled + runner.cStateLock.Lock() + defer runner.cStateLock.Unlock() + return runner.cCancelled } // NewArvLogWriter creates an ArvLogWriter @@ -1029,6 +1099,17 @@ func (runner *ContainerRunner) Run() (err error) { return } + // Gather and record node information + err = runner.LogNodeInfo() + if err != nil { + return + } + // Save container.json record on log collection + err = runner.LogContainerRecord() + if err != nil { + return + } + runner.StartCrunchstat() if runner.IsCancelled() { @@ -1063,11 +1144,10 @@ func NewContainerRunner(api IArvadosClient, cr.NewLogWriter = cr.NewArvLogWriter cr.RunArvMount = cr.ArvMountCmd cr.MkTempDir = ioutil.TempDir - cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}} + cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}} cr.Container.UUID = containerUUID cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run")) cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) - cr.loadDiscoveryVars() return cr } @@ -1077,6 +1157,14 @@ func main() { cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)") cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container") caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates") + enableNetwork := flag.String("container-enable-networking", "default", + `Specify if networking should be enabled for container. One of 'default', 'always': + default: only enable networking if container requests it. + always: containers always have networking enabled + `) + networkMode := flag.String("container-network-mode", "default", + `Set networking mode for container. Corresponds to Docker network mode (--net). + `) flag.Parse() containerId := flag.Arg(0) @@ -1108,6 +1196,8 @@ func main() { cr.statInterval = *statInterval cr.cgroupRoot = *cgroupRoot cr.expectCgroupParent = *cgroupParent + cr.enableNetwork = *enableNetwork + cr.networkMode = *networkMode if *cgroupParentSubsystem != "" { p := findCgroup(*cgroupParentSubsystem) cr.setCgroupParent = p