Merge branch '10538-trash-delete' closes #10538
[arvados.git] / services / crunch-run / crunchrun.go
index 060a4625cf810f89c3f100433ce88aa5f45ba2cc..971cb3a27a246c9fbe1a325496a6da63e6e69ac4 100644 (file)
@@ -29,8 +29,9 @@ import (
 type IArvadosClient interface {
        Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
        Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
-       Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
-       Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error)
+       Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
+       Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
+       // Discovery returns the value stored under key in the discovery
+       // document (crunch-run reads "defaultTrashLifetime" this way).
+       // The dynamic type of the returned value depends on the document;
+       // callers must check it before use.
+       Discovery(key string) (interface{}, error)
 }
 
 // ErrCancelled is the error returned when the container is cancelled.
@@ -93,12 +94,25 @@ type ContainerRunner struct {
        SigChan        chan os.Signal
        ArvMountExit   chan error
        finalState     string
+       trashLifetime  time.Duration
 
        statLogger   io.WriteCloser
        statReporter *crunchstat.Reporter
        statInterval time.Duration
        cgroupRoot   string
-       cgroupParent string
+       // What we expect the container's cgroup parent to be.
+       expectCgroupParent string
+       // What we tell docker to use as the container's cgroup
+       // parent. Note: Ideally we would use the same field for both
+       // expectCgroupParent and setCgroupParent, and just make it
+       // default to "docker". However, when using docker < 1.10 with
+       // systemd, specifying a non-empty cgroup parent (even the
+       // default value "docker") hits a docker bug
+       // (https://github.com/docker/docker/issues/17126). Using two
+       // separate fields makes it possible to use the "expect cgroup
+       // parent to be X" feature even on sites where the "specify
+       // cgroup parent" feature breaks.
+       setCgroupParent string
 }
 
 // SetupSignals sets up signal handling to gracefully terminate the underlying
@@ -236,8 +250,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        pdhOnly := true
        tmpcount := 0
        arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
+
+       if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+               arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
+       }
+
        collectionPaths := []string{}
        runner.Binds = nil
+       needCertMount := true
 
        for bind, mnt := range runner.Container.Mounts {
                if bind == "stdout" {
@@ -255,6 +275,9 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
                        }
                }
+               if bind == "/etc/arvados/ca-certificates.crt" {
+                       needCertMount = false
+               }
 
                switch {
                case mnt.Kind == "collection":
@@ -336,6 +359,16 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                return fmt.Errorf("Output path does not correspond to a writable mount point")
        }
 
+       if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
+               for _, certfile := range arvadosclient.CertFiles {
+                       _, err := os.Stat(certfile)
+                       if err == nil {
+                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile))
+                               break
+                       }
+               }
+       }
+
        if pdhOnly {
                arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
        } else {
@@ -417,7 +450,7 @@ func (runner *ContainerRunner) StartCrunchstat() {
        runner.statReporter = &crunchstat.Reporter{
                CID:          runner.ContainerID,
                Logger:       log.New(runner.statLogger, "", 0),
-               CgroupParent: runner.cgroupParent,
+               CgroupParent: runner.expectCgroupParent,
                CgroupRoot:   runner.cgroupRoot,
                PollPeriod:   runner.statInterval,
        }
@@ -504,8 +537,13 @@ func (runner *ContainerRunner) CreateContainer() error {
                return fmt.Errorf("While creating container: %v", err)
        }
 
-       runner.HostConfig = dockerclient.HostConfig{Binds: runner.Binds,
-               LogConfig: dockerclient.LogConfig{Type: "none"}}
+       runner.HostConfig = dockerclient.HostConfig{
+               Binds:        runner.Binds,
+               CgroupParent: runner.setCgroupParent,
+               LogConfig: dockerclient.LogConfig{
+                       Type: "none",
+               },
+       }
 
        return runner.AttachStreams()
 }
@@ -544,6 +582,21 @@ func (runner *ContainerRunner) CaptureOutput() error {
                return nil
        }
 
+       if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+               // Output may have been set directly by the container, so
+               // refresh the container record to check.
+               err := runner.ArvClient.Get("containers", runner.Container.UUID,
+                       nil, &runner.Container)
+               if err != nil {
+                       return err
+               }
+               if runner.Container.Output != "" {
+                       // Container output is already set.
+                       runner.OutputPDH = &runner.Container.Output
+                       return nil
+               }
+       }
+
        if runner.HostOutputDir == "" {
                return nil
        }
@@ -584,18 +637,25 @@ func (runner *ContainerRunner) CaptureOutput() error {
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
+                               "trash_at":      time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+                               "name":          "output for " + runner.Container.UUID,
                                "manifest_text": manifestText}},
                &response)
        if err != nil {
                return fmt.Errorf("While creating output collection: %v", err)
        }
-
-       runner.OutputPDH = new(string)
-       *runner.OutputPDH = response.PortableDataHash
-
+       runner.OutputPDH = &response.PortableDataHash
        return nil
 }
 
+func (runner *ContainerRunner) loadDiscoveryVars() {
+       tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
+       if err != nil {
+               log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err)
+       }
+       runner.trashLifetime = time.Duration(tl.(float64)) * time.Second
+}
+
 func (runner *ContainerRunner) CleanupDirs() {
        if runner.ArvMount != nil {
                umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
@@ -648,15 +708,14 @@ func (runner *ContainerRunner) CommitLogs() error {
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
+                               "trash_at":      time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
                                "name":          "logs for " + runner.Container.UUID,
                                "manifest_text": mt}},
                &response)
        if err != nil {
                return fmt.Errorf("While creating log collection: %v", err)
        }
-
        runner.LogsPDH = &response.PortableDataHash
-
        return nil
 }
 
@@ -692,10 +751,10 @@ func (runner *ContainerRunner) ContainerToken() (string, error) {
 func (runner *ContainerRunner) UpdateContainerFinal() error {
        update := arvadosclient.Dict{}
        update["state"] = runner.finalState
+       if runner.LogsPDH != nil {
+               update["log"] = *runner.LogsPDH
+       }
        if runner.finalState == "Complete" {
-               if runner.LogsPDH != nil {
-                       update["log"] = *runner.LogsPDH
-               }
                if runner.ExitCode != nil {
                        update["exit_code"] = *runner.ExitCode
                }
@@ -755,6 +814,7 @@ func (runner *ContainerRunner) Run() (err error) {
                checkErr(err)
 
                if runner.finalState == "Queued" {
+                       runner.CrunchLog.Close()
                        runner.UpdateContainerFinal()
                        return
                }
@@ -787,6 +847,7 @@ func (runner *ContainerRunner) Run() (err error) {
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
+               runner.finalState = "Cancelled"
                err = fmt.Errorf("While loading container image: %v", err)
                return
        }
@@ -794,6 +855,7 @@ func (runner *ContainerRunner) Run() (err error) {
        // set up FUSE mount and binds
        err = runner.SetupMounts()
        if err != nil {
+               runner.finalState = "Cancelled"
                err = fmt.Errorf("While setting up mounts: %v", err)
                return
        }
@@ -841,17 +903,24 @@ func NewContainerRunner(api IArvadosClient,
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+       cr.loadDiscoveryVars()
        return cr
 }
 
 func main() {
        statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
        cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
-       cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup")
+       cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
+       cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+       caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
        flag.Parse()
 
        containerId := flag.Arg(0)
 
+       if *caCertsPath != "" {
+               arvadosclient.CertFiles = []string{*caCertsPath}
+       }
+
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Fatalf("%s: %v", containerId, err)
@@ -859,7 +928,7 @@ func main() {
        api.Retries = 8
 
        var kc *keepclient.KeepClient
-       kc, err = keepclient.MakeKeepClient(&api)
+       kc, err = keepclient.MakeKeepClient(api)
        if err != nil {
                log.Fatalf("%s: %v", containerId, err)
        }
@@ -874,7 +943,12 @@ func main() {
        cr := NewContainerRunner(api, kc, docker, containerId)
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
-       cr.cgroupParent = *cgroupParent
+       cr.expectCgroupParent = *cgroupParent
+       if *cgroupParentSubsystem != "" {
+               p := findCgroup(*cgroupParentSubsystem)
+               cr.setCgroupParent = p
+               cr.expectCgroupParent = p
+       }
 
        err = cr.Run()
        if err != nil {