11170: Treat the squeue/scancel calls as files instead of treating them as processes...
[arvados.git] / services / crunch-run / crunchrun.go
index 0b59f7df91d12781b80659db7ea0be3c162a2703..3b3cdf1c14a872dacbc76b5679db9970609907dd 100644 (file)
@@ -5,12 +5,6 @@ import (
        "errors"
        "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/lib/crunchstat"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
-       "github.com/curoverse/dockerclient"
        "io"
        "io/ioutil"
        "log"
@@ -24,6 +18,13 @@ import (
        "sync"
        "syscall"
        "time"
+
+       "git.curoverse.com/arvados.git/lib/crunchstat"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "github.com/curoverse/dockerclient"
 )
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -123,20 +124,29 @@ func (runner *ContainerRunner) SetupSignals() {
        signal.Notify(runner.SigChan, syscall.SIGINT)
        signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
-       go func(sig <-chan os.Signal) {
-               for range sig {
-                       if !runner.Cancelled {
-                               runner.CancelLock.Lock()
-                               runner.Cancelled = true
-                               if runner.ContainerID != "" {
-                                       runner.Docker.StopContainer(runner.ContainerID, 10)
-                               }
-                               runner.CancelLock.Unlock()
-                       }
-               }
+       go func(sig chan os.Signal) {
+               <-sig
+               runner.stop()
+               signal.Stop(sig)
        }(runner.SigChan)
 }
 
+// stop marks the runner as cancelled and, if a container ID is set, asks Docker to stop that container; repeat calls are no-ops.
+func (runner *ContainerRunner) stop() {
+       runner.CancelLock.Lock()
+       defer runner.CancelLock.Unlock()
+       if runner.Cancelled { // an earlier call already handled cancellation
+               return
+       }
+       runner.Cancelled = true
+       if runner.ContainerID != "" { // empty ID: there is no container to stop
+               err := runner.Docker.StopContainer(runner.ContainerID, 10)
+               if err != nil {
+                       log.Printf("StopContainer failed: %s", err) // logged only; the error is not returned to the caller
+               }
+       }
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store.  If not, it loads
 // the image from Keep.
@@ -600,12 +610,22 @@ func (runner *ContainerRunner) StartContainer() error {
 func (runner *ContainerRunner) WaitFinish() error {
        runner.CrunchLog.Print("Waiting for container to finish")
 
-       result := runner.Docker.Wait(runner.ContainerID)
-       wr := <-result
-       if wr.Error != nil {
-               return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
+       waitDocker := runner.Docker.Wait(runner.ContainerID)
+       waitMount := runner.ArvMountExit
+       for waitDocker != nil {
+               select {
+               case err := <-waitMount:
+                       runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
+                       waitMount = nil
+                       runner.stop()
+               case wr := <-waitDocker:
+                       if wr.Error != nil {
+                               return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
+                       }
+                       runner.ExitCode = &wr.ExitCode
+                       waitDocker = nil
+               }
        }
-       runner.ExitCode = &wr.ExitCode
 
        // wait for stdout/stderr to complete
        <-runner.loggingDone
@@ -649,7 +669,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
        _, err = os.Stat(collectionMetafile)
        if err != nil {
                // Regular directory
-               cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
+               cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
                manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
                if err != nil {
                        return fmt.Errorf("While uploading output files: %v", err)
@@ -706,6 +726,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
        manifestText = manifest.Extract(".", ".").Text
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
+                       "ensure_unique_name": true,
                        "collection": arvadosclient.Dict{
                                "is_trashed":    true,
                                "name":          "output for " + runner.Container.UUID,
@@ -810,6 +831,7 @@ func (runner *ContainerRunner) CommitLogs() error {
        var response arvados.Collection
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
+                       "ensure_unique_name": true,
                        "collection": arvadosclient.Dict{
                                "is_trashed":    true,
                                "name":          "logs for " + runner.Container.UUID,
@@ -1002,7 +1024,7 @@ func NewContainerRunner(api IArvadosClient,
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
        cr.MkTempDir = ioutil.TempDir
-       cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
+       cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)