Merge branch 'master' into 10231-keep-cache-runtime-constraints
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 7da1beb20a4d5e4986eec2f8643d1ae99edeea2f..8e5cdb1f3b20ef1fbd9ef6114689148d12f0e781 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -5,6 +5,7 @@ import (
        "errors"
        "flag"
        "fmt"
+       "git.curoverse.com/arvados.git/lib/crunchstat"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -17,6 +18,7 @@ import (
        "os/exec"
        "os/signal"
        "path"
+       "path/filepath"
        "strings"
        "sync"
        "syscall"
@@ -27,8 +29,10 @@ import (
 type IArvadosClient interface {
        Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
        Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
-       Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
-       Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error)
+       Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
+       Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
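+       // Discovery returns the value of the given key from the API server's discovery document.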
+       Discovery(key string) (interface{}, error)
 }
 
 // ErrCancelled is the error returned when the container is cancelled.
@@ -91,6 +94,26 @@ type ContainerRunner struct {
        SigChan        chan os.Signal
        ArvMountExit   chan error
        finalState     string
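+       // trashLifetime is the cluster's defaultTrashLifetime, used as the expires_at offset for output and log collections.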
+       trashLifetime  time.Duration
+
+       statLogger   io.WriteCloser
+       statReporter *crunchstat.Reporter
+       statInterval time.Duration
+       cgroupRoot   string
+       // What we expect the container's cgroup parent to be.
+       expectCgroupParent string
+       // What we tell docker to use as the container's cgroup
+       // parent. Note: Ideally we would use the same field for both
+       // expectCgroupParent and setCgroupParent, and just make it
+       // default to "docker". However, when using docker < 1.10 with
+       // systemd, specifying a non-empty cgroup parent (even the
+       // default value "docker") hits a docker bug
+       // (https://github.com/docker/docker/issues/17126). Using two
+       // separate fields makes it possible to use the "expect cgroup
+       // parent to be X" feature even on sites where the "specify
+       // cgroup parent" feature breaks.
+       setCgroupParent string
 }
 
 // SetupSignals sets up signal handling to gracefully terminate the underlying
@@ -102,7 +124,7 @@ func (runner *ContainerRunner) SetupSignals() {
        signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
        go func(sig <-chan os.Signal) {
-               for _ = range sig {
+               for range sig {
                        if !runner.Cancelled {
                                runner.CancelLock.Lock()
                                runner.Cancelled = true
@@ -248,7 +270,8 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                }
 
-               if mnt.Kind == "collection" {
+               switch {
+               case mnt.Kind == "collection":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
                                return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
@@ -279,25 +302,47 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
                        }
                        collectionPaths = append(collectionPaths, src)
-               } else if mnt.Kind == "tmp" {
-                       if bind == runner.Container.OutputPath {
-                               runner.HostOutputDir, err = runner.MkTempDir("", "")
-                               if err != nil {
-                                       return fmt.Errorf("While creating mount temp dir: %v", err)
-                               }
-                               st, staterr := os.Stat(runner.HostOutputDir)
-                               if staterr != nil {
-                                       return fmt.Errorf("While Stat on temp dir: %v", staterr)
-                               }
-                               err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
-                               if staterr != nil {
-                                       return fmt.Errorf("While Chmod temp dir: %v", err)
-                               }
-                               runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
-                       } else {
-                               runner.Binds = append(runner.Binds, bind)
+
+               case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
+                       runner.HostOutputDir, err = runner.MkTempDir("", "")
+                       if err != nil {
+                               return fmt.Errorf("While creating mount temp dir: %v", err)
+                       }
+                       st, staterr := os.Stat(runner.HostOutputDir)
+                       if staterr != nil {
+                               return fmt.Errorf("While Stat on temp dir: %v", staterr)
+                       }
+                       err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
+                       if err != nil {
+                               return fmt.Errorf("While Chmod temp dir: %v", err)
+                       }
+                       runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
+                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
+
+               case mnt.Kind == "tmp":
+                       runner.Binds = append(runner.Binds, bind)
+
+               case mnt.Kind == "json":
+                       jsondata, err := json.Marshal(mnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("encoding json data: %v", err)
+                       }
+                       // Create a tempdir with a single file
+                       // (instead of just a tempfile): this way we
+                       // can ensure the file is world-readable
+                       // inside the container, without having to
+                       // make it world-readable on the docker host.
+                       tmpdir, err := runner.MkTempDir("", "")
+                       if err != nil {
+                               return fmt.Errorf("creating temp dir: %v", err)
+                       }
+                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
+                       tmpfn := filepath.Join(tmpdir, "mountdata.json")
+                       err = ioutil.WriteFile(tmpfn, jsondata, 0644)
+                       if err != nil {
+                               return fmt.Errorf("writing temp file: %v", err)
                        }
+                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
                }
        }
 
@@ -312,6 +357,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        }
        arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
 
+       if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
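+               // Pass the container's KeepCacheRAM runtime constraint along to arv-mount as its --file-cache size.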
+               arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
+       }
+
        token, err := runner.ContainerToken()
        if err != nil {
                return fmt.Errorf("could not get container token: %s", err)
@@ -366,6 +415,15 @@ func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
                                runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
                        }
 
+                       if runner.statReporter != nil {
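+                               // Stop periodic resource usage reporting and close the crunchstat log stream.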
+                               runner.statReporter.Stop()
+                               closeerr = runner.statLogger.Close()
+                               if closeerr != nil {
+                                       runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
+                               }
+                       }
+
                        runner.loggingDone <- true
                        close(runner.loggingDone)
                        return
@@ -373,6 +430,19 @@ func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
        }
 }
 
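+// StartCrunchstat starts a crunchstat.Reporter that periodically polls the container's cgroup and writes resource usage statistics to the "crunchstat" log stream.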
+func (runner *ContainerRunner) StartCrunchstat() {
+       runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
+       runner.statReporter = &crunchstat.Reporter{
+               CID:          runner.ContainerID,
+               Logger:       log.New(runner.statLogger, "", 0),
+               CgroupParent: runner.expectCgroupParent,
+               CgroupRoot:   runner.cgroupRoot,
+               PollPeriod:   runner.statInterval,
+       }
+       runner.statReporter.Start()
+}
+
 // AttachLogs connects the docker container stdout and stderr logs to the
 // Arvados logger which logs to Keep and the API server logs table.
 func (runner *ContainerRunner) AttachStreams() (err error) {
@@ -453,8 +522,13 @@ func (runner *ContainerRunner) CreateContainer() error {
                return fmt.Errorf("While creating container: %v", err)
        }
 
-       runner.HostConfig = dockerclient.HostConfig{Binds: runner.Binds,
-               LogConfig: dockerclient.LogConfig{Type: "none"}}
+       runner.HostConfig = dockerclient.HostConfig{
+               Binds:        runner.Binds,
+               CgroupParent: runner.setCgroupParent,
+               LogConfig: dockerclient.LogConfig{
+                       Type: "none",
+               },
+       }
 
        return runner.AttachStreams()
 }
@@ -493,6 +567,21 @@ func (runner *ContainerRunner) CaptureOutput() error {
                return nil
        }
 
+       if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+               // Output may have been set directly by the container, so
+               // refresh the container record to check.
+               err := runner.ArvClient.Get("containers", runner.Container.UUID,
+                       nil, &runner.Container)
+               if err != nil {
+                       return err
+               }
+               if runner.Container.Output != "" {
+                       // Container output is already set.
+                       runner.OutputPDH = &runner.Container.Output
+                       return nil
+               }
+       }
+
        if runner.HostOutputDir == "" {
                return nil
        }
@@ -533,18 +622,27 @@ func (runner *ContainerRunner) CaptureOutput() error {
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
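+                       // expires_at (now + defaultTrashLifetime) keeps the output collection from persisting indefinitely by default.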
                        "collection": arvadosclient.Dict{
+                               "expires_at":    time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+                               "name":          "output for " + runner.Container.UUID,
                                "manifest_text": manifestText}},
                &response)
        if err != nil {
                return fmt.Errorf("While creating output collection: %v", err)
        }
-
-       runner.OutputPDH = new(string)
-       *runner.OutputPDH = response.PortableDataHash
-
+       runner.OutputPDH = &response.PortableDataHash
        return nil
 }
 
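+// loadDiscoveryVars reads cluster-wide settings needed by the runner from the API server's discovery document (currently just defaultTrashLifetime).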
+func (runner *ContainerRunner) loadDiscoveryVars() {
+       tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
+       if err != nil {
+               log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err)
+       }
+       runner.trashLifetime = time.Duration(tl.(float64)) * time.Second
+}
+
 func (runner *ContainerRunner) CleanupDirs() {
        if runner.ArvMount != nil {
                umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
@@ -597,15 +693,14 @@ func (runner *ContainerRunner) CommitLogs() error {
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
+                               "expires_at":    time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
                                "name":          "logs for " + runner.Container.UUID,
                                "manifest_text": mt}},
                &response)
        if err != nil {
                return fmt.Errorf("While creating log collection: %v", err)
        }
-
        runner.LogsPDH = &response.PortableDataHash
-
        return nil
 }
 
@@ -752,6 +847,8 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
+       runner.StartCrunchstat()
+
        if runner.IsCancelled() {
                return
        }
@@ -788,10 +885,15 @@ func NewContainerRunner(api IArvadosClient,
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+       cr.loadDiscoveryVars()
        return cr
 }
 
 func main() {
+       statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
+       cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
+       cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
+       cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        flag.Parse()
 
        containerId := flag.Arg(0)
@@ -803,7 +905,7 @@ func main() {
        api.Retries = 8
 
        var kc *keepclient.KeepClient
-       kc, err = keepclient.MakeKeepClient(&api)
+       kc, err = keepclient.MakeKeepClient(api)
        if err != nil {
                log.Fatalf("%s: %v", containerId, err)
        }
@@ -816,6 +918,15 @@
        }
 
        cr := NewContainerRunner(api, kc, docker, containerId)
+       cr.statInterval = *statInterval
+       cr.cgroupRoot = *cgroupRoot
+       cr.expectCgroupParent = *cgroupParent
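+       // With -cgroup-parent-subsystem, use the current process's cgroup for that subsystem as the container's cgroup parent, for both docker and crunchstat.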
+       if *cgroupParentSubsystem != "" {
+               p := findCgroup(*cgroupParentSubsystem)
+               cr.setCgroupParent = p
+               cr.expectCgroupParent = p
+       }
 
        err = cr.Run()
        if err != nil {