"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
+ arvadosVersion "git.curoverse.com/arvados.git/sdk/go/version"
dockertypes "github.com/docker/docker/api/types"
dockercontainer "github.com/docker/docker/api/types/container"
enableNetwork string // one of "default" or "always"
networkMode string // passed through to HostConfig.NetworkMode
+ arvMountLog *ThrottledLogger
}
// setupSignals sets up signal handling to gracefully terminate the underlying
signal.Notify(runner.SigChan, syscall.SIGQUIT)
go func(sig chan os.Signal) {
- <-sig
+ s := <-sig
+ if s != nil {
+ runner.CrunchLog.Printf("Caught signal %v", s)
+ }
runner.stop()
}(runner.SigChan)
}
timeout := time.Duration(10)
err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
if err != nil {
- log.Printf("StopContainer failed: %s", err)
+ runner.CrunchLog.Printf("StopContainer failed: %s", err)
}
+ // Suppress multiple calls to stop()
+ runner.cStarted = false
}
}
-func (runner *ContainerRunner) teardown() {
+func (runner *ContainerRunner) stopSignals() {
if runner.SigChan != nil {
signal.Stop(runner.SigChan)
close(runner.SigChan)
}
c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
- nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
- c.Stdout = nt
- c.Stderr = nt
+ runner.arvMountLog = NewThrottledLogger(runner.NewLogWriter("arv-mount"))
+ c.Stdout = runner.arvMountLog
+ c.Stderr = runner.arvMountLog
+
+ runner.CrunchLog.Printf("Running %v", c.Args)
err = c.Start()
if err != nil {
}()
go func() {
- runner.ArvMountExit <- c.Wait()
+ mnterr := c.Wait()
+ if mnterr != nil {
+ runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
+ }
+ runner.ArvMountExit <- mnterr
close(runner.ArvMountExit)
}()
return fmt.Errorf("While creating keep mount temp dir: %v", err)
}
- runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
-
pdhOnly := true
tmpcount := 0
- arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
+ arvMountCmd := []string{
+ "--foreground",
+ "--allow-other",
+ "--read-write",
+ fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+ go func() {
+ <-runner.ArvMountExit
+ if runner.cStarted {
+ runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
+ runner.stop()
+ }
+ }()
+
var waitBody dockercontainer.ContainerWaitOKBody
select {
case waitBody = <-waitOk:
case err = <-waitErr:
}
+ // Container isn't running any more
+ runner.cStarted = false
+
if err != nil {
return fmt.Errorf("container wait: %v", err)
}
code := int(waitBody.StatusCode)
runner.ExitCode = &code
- waitMount := runner.ArvMountExit
- select {
- case err = <-waitMount:
- runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
- waitMount = nil
- runner.stop()
- default:
- }
-
// wait for stdout/stderr to complete
<-runner.loggingDone
func (runner *ContainerRunner) CleanupDirs() {
if runner.ArvMount != nil {
- umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
- umnterr := umount.Run()
+ var delay int64 = 8
+ umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
+ umount.Stdout = runner.CrunchLog
+ umount.Stderr = runner.CrunchLog
+ runner.CrunchLog.Printf("Running %v", umount.Args)
+ umnterr := umount.Start()
+
if umnterr != nil {
- runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
+ runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
+ } else {
+ // If arv-mount --unmount gets stuck for any reason, we
+ // don't want to wait for it forever. Do Wait() in a goroutine
+ // so it doesn't block crunch-run.
+ umountExit := make(chan error)
+ go func() {
+ mnterr := umount.Wait()
+ if mnterr != nil {
+ runner.CrunchLog.Printf("Error unmounting: %v", mnterr)
+ }
+ umountExit <- mnterr
+ }()
+
+ for again := true; again; {
+ again = false
+ select {
+ case <-umountExit:
+ umount = nil
+ again = true
+ case <-runner.ArvMountExit:
+ break
+ case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
+ runner.CrunchLog.Printf("Timed out waiting for unmount")
+ if umount != nil {
+ umount.Process.Kill()
+ }
+ runner.ArvMount.Process.Kill()
+ }
+ }
}
+ }
- mnterr := <-runner.ArvMountExit
- if mnterr != nil {
- runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
+ if runner.ArvMountPoint != "" {
+ if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
+ runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
}
}
for _, tmpdir := range runner.CleanupTempDir {
- rmerr := os.RemoveAll(tmpdir)
- if rmerr != nil {
+ if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
}
}
// CommitLogs posts the collection containing the final container logs.
func (runner *ContainerRunner) CommitLogs() error {
runner.CrunchLog.Print(runner.finalState)
+
+ if runner.arvMountLog != nil {
+ runner.arvMountLog.Close()
+ }
runner.CrunchLog.Close()
- // Closing CrunchLog above allows it to be committed to Keep at this
+ // Closing CrunchLog above allows them to be committed to Keep at this
// point, but re-open crunch log with ArvClient in case there are any
- // other further (such as failing to write the log to Keep!) while
- // shutting down
+ // other further errors (such as failing to write the log to Keep!)
+ // while shutting down
runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
+ runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
// NewArvLogWriter creates an ArvLogWriter
func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
- return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name,
- writeCloser: runner.LogCollection.Open(name + ".txt")}
+ return &ArvLogWriter{
+ ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID,
+ loggingStream: name,
+ writeCloser: runner.LogCollection.Open(name + ".txt")}
}
// Run the full container lifecycle.
runner.CrunchLog.Printf("Executing on host '%s'", hostname)
}
- // Clean up temporary directories _after_ finalizing
- // everything (if we've made any by then)
- defer runner.CleanupDirs()
-
runner.finalState = "Queued"
+ defer func() {
+ runner.stopSignals()
+ runner.CleanupDirs()
+
+ runner.CrunchLog.Printf("crunch-run finished")
+ runner.CrunchLog.Close()
+ }()
+
defer func() {
// checkErr prints e (unless it's nil) and sets err to
// e (unless err is already non-nil). Thus, if err
checkErr(err)
if runner.finalState == "Queued" {
- runner.CrunchLog.Close()
runner.UpdateContainerFinal()
return
}
checkErr(runner.CaptureOutput())
checkErr(runner.CommitLogs())
checkErr(runner.UpdateContainerFinal())
-
- // The real log is already closed, but then we opened
- // a new one in case we needed to log anything while
- // finalizing.
- runner.CrunchLog.Close()
-
- runner.teardown()
}()
err = runner.fetchContainerRecord()
`Set networking mode for container. Corresponds to Docker network mode (--net).
`)
memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
+ getVersion := flag.Bool("version", false, "Print version information and exit.")
flag.Parse()
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("Version: %s\n", arvadosVersion.GetVersion())
+ os.Exit(0)
+ }
+
+ log.Printf("crunch-run %q started", arvadosVersion.GetVersion())
+
containerId := flag.Arg(0)
if *caCertsPath != "" {