LogsPDH *string
RunArvMount
MkTempDir
- ArvMount *exec.Cmd
- ArvMountPoint string
- HostOutputDir string
- CleanupTempDir []string
- Binds []string
- Volumes map[string]struct{}
- OutputPDH *string
- SigChan chan os.Signal
- ArvMountExit chan error
- finalState string
+ ArvMount *exec.Cmd
+ ArvMountPoint string
+ HostOutputDir string
+ Binds []string
+ Volumes map[string]struct{}
+ OutputPDH *string
+ SigChan chan os.Signal
+ ArvMountExit chan error
+ finalState string
+ parentTemp string
statLogger io.WriteCloser
statReporter *crunchstat.Reporter
go func(sig chan os.Signal) {
for s := range sig {
- runner.CrunchLog.Printf("caught signal: %v", s)
- runner.stop()
+ runner.stop(s)
}
}(runner.SigChan)
}
// stop the underlying Docker container. If sig is non-nil, it is logged
// as the signal that was caught.
-func (runner *ContainerRunner) stop() {
+func (runner *ContainerRunner) stop(sig os.Signal) {
runner.cStateLock.Lock()
defer runner.cStateLock.Unlock()
+ if sig != nil {
+ runner.CrunchLog.Printf("caught signal: %v", sig)
+ }
if runner.ContainerID == "" {
return
}
}
}
-func (runner *ContainerRunner) stopSignals() {
- if runner.SigChan != nil {
- signal.Stop(runner.SigChan)
- }
-}
-
var errorBlacklist = []string{
"(?ms).*[Cc]annot connect to the Docker daemon.*",
"(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
// SetupArvMountPoint sets runner.ArvMountPoint to the path returned by
// runner.MkTempDir, using prefix as the name prefix, unless ArvMountPoint
// has already been set to a non-empty value. In that case it does nothing
// and returns nil. Any error reported by MkTempDir is returned via the
// named result err.
func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
// Only allocate a mount point once; an existing value is kept as-is.
if runner.ArvMountPoint == "" {
- runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
+ runner.ArvMountPoint, err = runner.MkTempDir(runner.parentTemp, prefix)
}
// Naked return: err is nil unless MkTempDir was called and failed.
return
}
case mnt.Kind == "tmp":
var tmpdir string
- tmpdir, err = runner.MkTempDir("", "")
+ tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
if err != nil {
return fmt.Errorf("While creating mount temp dir: %v", err)
}
if staterr != nil {
return fmt.Errorf("While Chmod temp dir: %v", err)
}
- runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
if bind == runner.Container.OutputPath {
runner.HostOutputDir = tmpdir
// can ensure the file is world-readable
// inside the container, without having to
// make it world-readable on the docker host.
- tmpdir, err := runner.MkTempDir("", "")
+ tmpdir, err := runner.MkTempDir(runner.parentTemp, "json")
if err != nil {
return fmt.Errorf("creating temp dir: %v", err)
}
- runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
tmpfn := filepath.Join(tmpdir, "mountdata.json")
err = ioutil.WriteFile(tmpfn, jsondata, 0644)
if err != nil {
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
case mnt.Kind == "git_tree":
- tmpdir, err := runner.MkTempDir("", "")
+ tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
if err != nil {
return fmt.Errorf("creating temp dir: %v", err)
}
- runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token)
if err != nil {
return err
}
for _, cp := range copyFiles {
- dir, err := os.Stat(cp.src)
+ st, err := os.Stat(cp.src)
if err != nil {
return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
}
- if dir.IsDir() {
+ if st.IsDir() {
err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
if walkerr != nil {
return walkerr
}
+ target := path.Join(cp.bind, walkpath[len(cp.src):])
if walkinfo.Mode().IsRegular() {
- return copyfile(walkpath, path.Join(cp.bind, walkpath[len(cp.src):]))
+ copyerr := copyfile(walkpath, target)
+ if copyerr != nil {
+ return copyerr
+ }
+ return os.Chmod(target, walkinfo.Mode()|0777)
} else if walkinfo.Mode().IsDir() {
- return os.MkdirAll(path.Join(cp.bind, walkpath[len(cp.src):]), 0777)
+ mkerr := os.MkdirAll(target, 0777)
+ if mkerr != nil {
+ return mkerr
+ }
+ return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777)
} else {
return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
}
})
- } else {
+ } else if st.Mode().IsRegular() {
err = copyfile(cp.src, cp.bind)
+ if err == nil {
+ err = os.Chmod(cp.bind, st.Mode()|0777)
+ }
}
if err != nil {
return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
// Copy everything from stdinRdr to the container's attached connection.
_, err := io.Copy(response.Conn, stdinRdr)
if err != nil {
// Printf (not Print): the message contains a %q verb that must be
// expanded with err; Print would emit the verb literally.
runner.CrunchLog.Printf("While writing stdin collection to docker container %q", err)
- runner.stop()
+ runner.stop(nil)
}
stdinRdr.Close()
response.CloseWrite()
// Copy the stdinJson bytes to the container's attached connection.
_, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
if err != nil {
// Printf (not Print): the message contains a %q verb that must be
// expanded with err; Print would emit the verb literally.
runner.CrunchLog.Printf("While writing stdin json to docker container %q", err)
- runner.stop()
+ runner.stop(nil)
}
response.CloseWrite()
}()
runner.ContainerConfig.Volumes = runner.Volumes
+ maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
runner.HostConfig = dockercontainer.HostConfig{
Binds: runner.Binds,
LogConfig: dockercontainer.LogConfig{
},
Resources: dockercontainer.Resources{
CgroupParent: runner.setCgroupParent,
+ NanoCPUs: int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000,
+ Memory: maxRAM, // RAM
+ MemorySwap: maxRAM, // RAM+swap
+ KernelMemory: maxRAM, // kernel portion
},
}
case <-arvMountExit:
runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
- runner.stop()
+ runner.stop(nil)
// arvMountExit will always be ready now that
// it's closed, but that doesn't interest us.
arvMountExit = nil
}
}
- for _, tmpdir := range runner.CleanupTempDir {
- if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
- runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
- }
+ if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
+ runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", runner.parentTemp, rmerr)
}
}
// CommitLogs posts the collection containing the final container logs.
func (runner *ContainerRunner) CommitLogs() error {
- runner.CrunchLog.Print(runner.finalState)
+ func() {
+ // Hold cStateLock to prevent races on CrunchLog (e.g., stop()).
+ runner.cStateLock.Lock()
+ defer runner.cStateLock.Unlock()
- if runner.arvMountLog != nil {
- runner.arvMountLog.Close()
- }
- runner.CrunchLog.Close()
+ runner.CrunchLog.Print(runner.finalState)
+
+ if runner.arvMountLog != nil {
+ runner.arvMountLog.Close()
+ }
+ runner.CrunchLog.Close()
- // Closing CrunchLog above allows them to be committed to Keep at this
- // point, but re-open crunch log with ArvClient in case there are any
- // other further errors (such as failing to write the log to Keep!)
- // while shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
- UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
- runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+ // Closing CrunchLog above allows them to be committed to Keep at this
+ // point, but re-open crunch log with ArvClient in case there are any
+ // other further errors (such as failing to write the log to Keep!)
+ // while shutting down
+ runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
+ runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+ }()
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
runner.finalState = "Queued"
defer func() {
- runner.stopSignals()
runner.CleanupDirs()
runner.CrunchLog.Printf("crunch-run finished")
os.Exit(1)
}
+ parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
+ if tmperr != nil {
+ log.Fatalf("%s: %v", containerId, tmperr)
+ }
+
+ cr.parentTemp = parentTemp
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent