13111: Merge branch 'master' into 13111-webdav-projects
[arvados.git] / services / crunch-run / crunchrun.go
index 0582e5418fd4776e11a97224d42c58750b90d8da..53815cbe1c8222d4e6c9614ce889d649224af7e1 100644 (file)
@@ -103,16 +103,18 @@ type ContainerRunner struct {
        LogsPDH       *string
        RunArvMount
        MkTempDir
-       ArvMount       *exec.Cmd
-       ArvMountPoint  string
-       HostOutputDir  string
-       CleanupTempDir []string
-       Binds          []string
-       Volumes        map[string]struct{}
-       OutputPDH      *string
-       SigChan        chan os.Signal
-       ArvMountExit   chan error
-       finalState     string
+       ArvMount      *exec.Cmd
+       ArvMountPoint string
+       HostOutputDir string
+       Binds         []string
+       Volumes       map[string]struct{}
+       OutputPDH     *string
+       SigChan       chan os.Signal
+       ArvMountExit  chan error
+       SecretMounts  map[string]arvados.Mount
+       MkArvClient   func(token string) (IArvadosClient, error)
+       finalState    string
+       parentTemp    string
 
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
@@ -152,16 +154,18 @@ func (runner *ContainerRunner) setupSignals() {
 
        go func(sig chan os.Signal) {
                for s := range sig {
-                       runner.CrunchLog.Printf("caught signal: %v", s)
-                       runner.stop()
+                       runner.stop(s)
                }
        }(runner.SigChan)
 }
 
 // stop the underlying Docker container.
-func (runner *ContainerRunner) stop() {
+func (runner *ContainerRunner) stop(sig os.Signal) {
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
+       if sig != nil {
+               runner.CrunchLog.Printf("caught signal: %v", sig)
+       }
        if runner.ContainerID == "" {
                return
        }
@@ -173,12 +177,6 @@ func (runner *ContainerRunner) stop() {
        }
 }
 
-func (runner *ContainerRunner) stopSignals() {
-       if runner.SigChan != nil {
-               signal.Stop(runner.SigChan)
-       }
-}
-
 var errorBlacklist = []string{
        "(?ms).*[Cc]annot connect to the Docker daemon.*",
        "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
@@ -327,7 +325,7 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
 
 // SetupArvMountPoint makes sure runner.ArvMountPoint is populated.
 //
 // When ArvMountPoint is still the empty string, a directory is requested
 // from runner.MkTempDir (ioutil.TempDir unless the field was overridden)
 // using the supplied name prefix, and the resulting path is stored in
 // runner.ArvMountPoint. Any error from MkTempDir is handed back through
 // the named result err. When ArvMountPoint is already set, nothing is
 // created and err keeps its zero value (nil).
 //
 // NOTE(review): this span is a diff hunk. The "-" line is the previous
 // call, which passed an empty parent directory; the "+" line is its
 // replacement, which passes runner.parentTemp as the parent instead.
 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
        // Only allocate a mount point once; an existing value is left untouched.
        if runner.ArvMountPoint == "" {
-               runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
+               runner.ArvMountPoint, err = runner.MkTempDir(runner.parentTemp, prefix)
        }
        // Naked return: yields err as set by MkTempDir above, or nil.
        return
 }
@@ -338,7 +336,7 @@ func copyfile(src string, dst string) (err error) {
                return
        }
 
-       os.MkdirAll(path.Dir(dst), 0770)
+       os.MkdirAll(path.Dir(dst), 0777)
 
        dstfile, err := os.Create(dst)
        if err != nil {
@@ -400,10 +398,24 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        for bind := range runner.Container.Mounts {
                binds = append(binds, bind)
        }
+       for bind := range runner.SecretMounts {
+               if _, ok := runner.Container.Mounts[bind]; ok {
+                       return fmt.Errorf("Secret mount %q conflicts with regular mount", bind)
+               }
+               if runner.SecretMounts[bind].Kind != "json" &&
+                       runner.SecretMounts[bind].Kind != "text" {
+                       return fmt.Errorf("Secret mount %q type is %q but only 'json' and 'text' are permitted.",
+                               bind, runner.SecretMounts[bind].Kind)
+               }
+               binds = append(binds, bind)
+       }
        sort.Strings(binds)
 
        for _, bind := range binds {
-               mnt := runner.Container.Mounts[bind]
+               mnt, ok := runner.Container.Mounts[bind]
+               if !ok {
+                       mnt = runner.SecretMounts[bind]
+               }
                if bind == "stdout" || bind == "stderr" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
@@ -432,8 +444,8 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
 
                if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
-                       if mnt.Kind != "collection" {
-                               return fmt.Errorf("Only mount points of kind 'collection' are supported underneath the output_path: %v", bind)
+                       if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" {
+                               return fmt.Errorf("Only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
                        }
                }
 
@@ -490,7 +502,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
                case mnt.Kind == "tmp":
                        var tmpdir string
-                       tmpdir, err = runner.MkTempDir("", "")
+                       tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
                        if err != nil {
                                return fmt.Errorf("While creating mount temp dir: %v", err)
                        }
@@ -502,40 +514,46 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if staterr != nil {
                                return fmt.Errorf("While Chmod temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
                        runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
                        if bind == runner.Container.OutputPath {
                                runner.HostOutputDir = tmpdir
                        }
 
-               case mnt.Kind == "json":
-                       jsondata, err := json.Marshal(mnt.Content)
-                       if err != nil {
-                               return fmt.Errorf("encoding json data: %v", err)
+               case mnt.Kind == "json" || mnt.Kind == "text":
+                       var filedata []byte
+                       if mnt.Kind == "json" {
+                               filedata, err = json.Marshal(mnt.Content)
+                               if err != nil {
+                                       return fmt.Errorf("encoding json data: %v", err)
+                               }
+                       } else {
+                               text, ok := mnt.Content.(string)
+                               if !ok {
+                                       return fmt.Errorf("content for mount %q must be a string", bind)
+                               }
+                               filedata = []byte(text)
                        }
-                       // Create a tempdir with a single file
-                       // (instead of just a tempfile): this way we
-                       // can ensure the file is world-readable
-                       // inside the container, without having to
-                       // make it world-readable on the docker host.
-                       tmpdir, err := runner.MkTempDir("", "")
+
+                       tmpdir, err := runner.MkTempDir(runner.parentTemp, mnt.Kind)
                        if err != nil {
                                return fmt.Errorf("creating temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
-                       tmpfn := filepath.Join(tmpdir, "mountdata.json")
-                       err = ioutil.WriteFile(tmpfn, jsondata, 0644)
+                       tmpfn := filepath.Join(tmpdir, "mountdata."+mnt.Kind)
+                       err = ioutil.WriteFile(tmpfn, filedata, 0444)
                        if err != nil {
                                return fmt.Errorf("writing temp file: %v", err)
                        }
-                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
+                       if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
+                               copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
+                       } else {
+                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
+                       }
 
                case mnt.Kind == "git_tree":
-                       tmpdir, err := runner.MkTempDir("", "")
+                       tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
                        if err != nil {
                                return fmt.Errorf("creating temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
                        err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token)
                        if err != nil {
                                return err
@@ -578,25 +596,37 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        }
 
        for _, cp := range copyFiles {
-               dir, err := os.Stat(cp.src)
+               st, err := os.Stat(cp.src)
                if err != nil {
                        return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
                }
-               if dir.IsDir() {
+               if st.IsDir() {
                        err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
                                if walkerr != nil {
                                        return walkerr
                                }
+                               target := path.Join(cp.bind, walkpath[len(cp.src):])
                                if walkinfo.Mode().IsRegular() {
-                                       return copyfile(walkpath, path.Join(cp.bind, walkpath[len(cp.src):]))
+                                       copyerr := copyfile(walkpath, target)
+                                       if copyerr != nil {
+                                               return copyerr
+                                       }
+                                       return os.Chmod(target, walkinfo.Mode()|0777)
                                } else if walkinfo.Mode().IsDir() {
-                                       return os.MkdirAll(path.Join(cp.bind, walkpath[len(cp.src):]), 0770)
+                                       mkerr := os.MkdirAll(target, 0777)
+                                       if mkerr != nil {
+                                               return mkerr
+                                       }
+                                       return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777)
                                } else {
                                        return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
                                }
                        })
-               } else {
+               } else if st.Mode().IsRegular() {
                        err = copyfile(cp.src, cp.bind)
+                       if err == nil {
+                               err = os.Chmod(cp.bind, st.Mode()|0777)
+                       }
                }
                if err != nil {
                        return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
@@ -882,7 +912,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                        _, err := io.Copy(response.Conn, stdinRdr)
                        if err != nil {
                                runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
-                               runner.stop()
+                               runner.stop(nil)
                        }
                        stdinRdr.Close()
                        response.CloseWrite()
@@ -892,7 +922,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                        _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
                        if err != nil {
                                runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
-                               runner.stop()
+                               runner.stop(nil)
                        }
                        response.CloseWrite()
                }()
@@ -943,6 +973,7 @@ func (runner *ContainerRunner) CreateContainer() error {
 
        runner.ContainerConfig.Volumes = runner.Volumes
 
+       maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
        runner.HostConfig = dockercontainer.HostConfig{
                Binds: runner.Binds,
                LogConfig: dockercontainer.LogConfig{
@@ -950,6 +981,10 @@ func (runner *ContainerRunner) CreateContainer() error {
                },
                Resources: dockercontainer.Resources{
                        CgroupParent: runner.setCgroupParent,
+                       NanoCPUs:     int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000,
+                       Memory:       maxRAM, // RAM
+                       MemorySwap:   maxRAM, // RAM+swap
+                       KernelMemory: maxRAM, // kernel portion
                },
        }
 
@@ -1032,7 +1067,7 @@ func (runner *ContainerRunner) WaitFinish() error {
 
                case <-arvMountExit:
                        runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
-                       runner.stop()
+                       runner.stop(nil)
                        // arvMountExit will always be ready now that
                        // it's closed, but that doesn't interest us.
                        arvMountExit = nil
@@ -1249,6 +1284,16 @@ func (runner *ContainerRunner) CaptureOutput() error {
        }
        sort.Strings(binds)
 
+       // Delete secret mounts so they don't get saved to the output collection.
+       for bind := range runner.SecretMounts {
+               if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
+                       err = os.Remove(runner.HostOutputDir + bind[len(runner.Container.OutputPath):])
+                       if err != nil {
+                               return fmt.Errorf("Unable to remove secret mount: %v", err)
+                       }
+               }
+       }
+
        var manifestText string
 
        collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
@@ -1427,29 +1472,33 @@ func (runner *ContainerRunner) CleanupDirs() {
                }
        }
 
-       for _, tmpdir := range runner.CleanupTempDir {
-               if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
-                       runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
-               }
+       if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
+               runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", runner.parentTemp, rmerr)
        }
 }
 
 // CommitLogs posts the collection containing the final container logs.
 func (runner *ContainerRunner) CommitLogs() error {
-       runner.CrunchLog.Print(runner.finalState)
+       func() {
+               // Hold cStateLock to prevent races on CrunchLog (e.g., stop()).
+               runner.cStateLock.Lock()
+               defer runner.cStateLock.Unlock()
 
-       if runner.arvMountLog != nil {
-               runner.arvMountLog.Close()
-       }
-       runner.CrunchLog.Close()
+               runner.CrunchLog.Print(runner.finalState)
 
-       // Closing CrunchLog above allows them to be committed to Keep at this
-       // point, but re-open crunch log with ArvClient in case there are any
-       // other further errors (such as failing to write the log to Keep!)
-       // while shutting down
-       runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
-               UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
-       runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+               if runner.arvMountLog != nil {
+                       runner.arvMountLog.Close()
+               }
+               runner.CrunchLog.Close()
+
+               // Closing CrunchLog above allows them to be committed to Keep at this
+               // point, but re-open crunch log with ArvClient in case there are any
+               // other further errors (such as failing to write the log to Keep!)
+               // while shutting down
+               runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+                       UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
+               runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+       }()
 
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
@@ -1558,7 +1607,6 @@ func (runner *ContainerRunner) Run() (err error) {
        runner.finalState = "Queued"
 
        defer func() {
-               runner.stopSignals()
                runner.CleanupDirs()
 
                runner.CrunchLog.Printf("crunch-run finished")
@@ -1689,6 +1737,31 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
        if err != nil {
                return fmt.Errorf("error decoding container record: %v", err)
        }
+
+       var sm struct {
+               SecretMounts map[string]arvados.Mount `json:"secret_mounts"`
+       }
+
+       containerToken, err := runner.ContainerToken()
+       if err != nil {
+               return fmt.Errorf("error getting container token: %v", err)
+       }
+
+       containerClient, err := runner.MkArvClient(containerToken)
+       if err != nil {
+               return fmt.Errorf("error creating container API client: %v", err)
+       }
+
+       err = containerClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
+       if err != nil {
+               if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {
+                       return fmt.Errorf("error fetching secret_mounts: %v", err)
+               }
+               // ok && apierr.HttpStatusCode == 404, which means
+               // secret_mounts isn't supported by this API server.
+       }
+       runner.SecretMounts = sm.SecretMounts
+
        return nil
 }
 
@@ -1702,6 +1775,14 @@ func NewContainerRunner(api IArvadosClient,
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
        cr.MkTempDir = ioutil.TempDir
+       cr.MkArvClient = func(token string) (IArvadosClient, error) {
+               cl, err := arvadosclient.MakeArvadosClient()
+               if err != nil {
+                       return nil, err
+               }
+               cl.ApiToken = token
+               return cl, nil
+       }
        cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
@@ -1769,6 +1850,12 @@ func main() {
                os.Exit(1)
        }
 
+       parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
+       if tmperr != nil {
+               log.Fatalf("%s: %v", containerId, tmperr)
+       }
+
+       cr.parentTemp = parentTemp
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent