10484: Track non-s3 errors by Go type.
[arvados.git] / services / crunch-run / crunchrun.go
index 40e9fc11eac528965f978f4b0472249759b42cbe..ade40c6b03a4d4a98812172aab31da5173453c4e 100644 (file)
@@ -18,6 +18,7 @@ import (
        "os/exec"
        "os/signal"
        "path"
+       "path/filepath"
        "strings"
        "sync"
        "syscall"
@@ -28,8 +29,9 @@ import (
 type IArvadosClient interface {
        Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
        Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
-       Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
-       Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error)
+       Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
+       Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
+       Discovery(key string) (interface{}, error)
 }
 
 // ErrCancelled is the error returned when the container is cancelled.
@@ -92,13 +94,25 @@ type ContainerRunner struct {
        SigChan        chan os.Signal
        ArvMountExit   chan error
        finalState     string
+       trashLifetime  time.Duration
 
-       statLogger         io.WriteCloser
-       statReporter       *crunchstat.Reporter
-       statInterval       time.Duration
-       cgroupRoot         string
+       statLogger   io.WriteCloser
+       statReporter *crunchstat.Reporter
+       statInterval time.Duration
+       cgroupRoot   string
+       // What we expect the container's cgroup parent to be.
        expectCgroupParent string
-       setCgroupParent    string
+       // What we tell docker to use as the container's cgroup
+       // parent. Note: Ideally we would use the same field for both
+       // expectCgroupParent and setCgroupParent, and just make it
+       // default to "docker". However, when using docker < 1.10 with
+       // systemd, specifying a non-empty cgroup parent (even the
+       // default value "docker") hits a docker bug
+       // (https://github.com/docker/docker/issues/17126). Using two
+       // separate fields makes it possible to use the "expect cgroup
+       // parent to be X" feature even on sites where the "specify
+       // cgroup parent" feature breaks.
+       setCgroupParent string
 }
 
 // SetupSignals sets up signal handling to gracefully terminate the underlying
@@ -236,6 +250,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        pdhOnly := true
        tmpcount := 0
        arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
+
+       if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+               arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
+       }
+
        collectionPaths := []string{}
        runner.Binds = nil
 
@@ -256,7 +275,8 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                }
 
-               if mnt.Kind == "collection" {
+               switch {
+               case mnt.Kind == "collection":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
                                return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
@@ -287,25 +307,47 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
                        }
                        collectionPaths = append(collectionPaths, src)
-               } else if mnt.Kind == "tmp" {
-                       if bind == runner.Container.OutputPath {
-                               runner.HostOutputDir, err = runner.MkTempDir("", "")
-                               if err != nil {
-                                       return fmt.Errorf("While creating mount temp dir: %v", err)
-                               }
-                               st, staterr := os.Stat(runner.HostOutputDir)
-                               if staterr != nil {
-                                       return fmt.Errorf("While Stat on temp dir: %v", staterr)
-                               }
-                               err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
-                               if staterr != nil {
-                                       return fmt.Errorf("While Chmod temp dir: %v", err)
-                               }
-                               runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
-                       } else {
-                               runner.Binds = append(runner.Binds, bind)
+
+               case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
+                       runner.HostOutputDir, err = runner.MkTempDir("", "")
+                       if err != nil {
+                               return fmt.Errorf("While creating mount temp dir: %v", err)
+                       }
+                       st, staterr := os.Stat(runner.HostOutputDir)
+                       if staterr != nil {
+                               return fmt.Errorf("While Stat on temp dir: %v", staterr)
+                       }
+                       err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
+                       if staterr != nil {
+                               return fmt.Errorf("While Chmod temp dir: %v", err)
+                       }
+                       runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
+                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
+
+               case mnt.Kind == "tmp":
+                       runner.Binds = append(runner.Binds, bind)
+
+               case mnt.Kind == "json":
+                       jsondata, err := json.Marshal(mnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("encoding json data: %v", err)
+                       }
+                       // Create a tempdir with a single file
+                       // (instead of just a tempfile): this way we
+                       // can ensure the file is world-readable
+                       // inside the container, without having to
+                       // make it world-readable on the docker host.
+                       tmpdir, err := runner.MkTempDir("", "")
+                       if err != nil {
+                               return fmt.Errorf("creating temp dir: %v", err)
+                       }
+                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
+                       tmpfn := filepath.Join(tmpdir, "mountdata.json")
+                       err = ioutil.WriteFile(tmpfn, jsondata, 0644)
+                       if err != nil {
+                               return fmt.Errorf("writing temp file: %v", err)
                        }
+                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
                }
        }
 
@@ -526,6 +568,21 @@ func (runner *ContainerRunner) CaptureOutput() error {
                return nil
        }
 
+       if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+               // Output may have been set directly by the container, so
+               // refresh the container record to check.
+               err := runner.ArvClient.Get("containers", runner.Container.UUID,
+                       nil, &runner.Container)
+               if err != nil {
+                       return err
+               }
+               if runner.Container.Output != "" {
+                       // Container output is already set.
+                       runner.OutputPDH = &runner.Container.Output
+                       return nil
+               }
+       }
+
        if runner.HostOutputDir == "" {
                return nil
        }
@@ -566,18 +623,25 @@ func (runner *ContainerRunner) CaptureOutput() error {
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
+                               "expires_at":    time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+                               "name":          "output for " + runner.Container.UUID,
                                "manifest_text": manifestText}},
                &response)
        if err != nil {
                return fmt.Errorf("While creating output collection: %v", err)
        }
-
-       runner.OutputPDH = new(string)
-       *runner.OutputPDH = response.PortableDataHash
-
+       runner.OutputPDH = &response.PortableDataHash
        return nil
 }
 
+func (runner *ContainerRunner) loadDiscoveryVars() {
+       tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
+       if err != nil {
+               log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err)
+       }
+       runner.trashLifetime = time.Duration(tl.(float64)) * time.Second
+}
+
 func (runner *ContainerRunner) CleanupDirs() {
        if runner.ArvMount != nil {
                umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
@@ -630,15 +694,14 @@ func (runner *ContainerRunner) CommitLogs() error {
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
+                               "expires_at":    time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
                                "name":          "logs for " + runner.Container.UUID,
                                "manifest_text": mt}},
                &response)
        if err != nil {
                return fmt.Errorf("While creating log collection: %v", err)
        }
-
        runner.LogsPDH = &response.PortableDataHash
-
        return nil
 }
 
@@ -823,6 +886,7 @@ func NewContainerRunner(api IArvadosClient,
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+       cr.loadDiscoveryVars()
        return cr
 }
 
@@ -842,7 +906,7 @@ func main() {
        api.Retries = 8
 
        var kc *keepclient.KeepClient
-       kc, err = keepclient.MakeKeepClient(&api)
+       kc, err = keepclient.MakeKeepClient(api)
        if err != nil {
                log.Fatalf("%s: %v", containerId, err)
        }