10777: Close and flush logs right away instead of waiting for next tick.
[arvados.git] / services / crunch-run / crunchrun.go
index 6d8ec9c84eade9ceb2a46fbd9605bf49b308ce41..47099673e2b2133c92e6205401be508e162b94a1 100644 (file)
@@ -5,12 +5,6 @@ import (
        "errors"
        "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/lib/crunchstat"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
-       "github.com/curoverse/dockerclient"
        "io"
        "io/ioutil"
        "log"
@@ -24,6 +18,13 @@ import (
        "sync"
        "syscall"
        "time"
+
+       "git.curoverse.com/arvados.git/lib/crunchstat"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "github.com/curoverse/dockerclient"
 )
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -95,7 +96,6 @@ type ContainerRunner struct {
        SigChan        chan os.Signal
        ArvMountExit   chan error
        finalState     string
-       trashLifetime  time.Duration
 
        statLogger   io.WriteCloser
        statReporter *crunchstat.Reporter
@@ -124,20 +124,29 @@ func (runner *ContainerRunner) SetupSignals() {
        signal.Notify(runner.SigChan, syscall.SIGINT)
        signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
-       go func(sig <-chan os.Signal) {
-               for range sig {
-                       if !runner.Cancelled {
-                               runner.CancelLock.Lock()
-                               runner.Cancelled = true
-                               if runner.ContainerID != "" {
-                                       runner.Docker.StopContainer(runner.ContainerID, 10)
-                               }
-                               runner.CancelLock.Unlock()
-                       }
-               }
+       go func(sig chan os.Signal) {
+               <-sig
+               runner.stop()
+               signal.Stop(sig)
        }(runner.SigChan)
 }
 
+// stop marks the runner as cancelled and, if a container ID is set, asks Docker to stop that container; calls after the first are no-ops.
+func (runner *ContainerRunner) stop() {
+       runner.CancelLock.Lock()
+       defer runner.CancelLock.Unlock()
+       if runner.Cancelled {
+               return // already cancelled by an earlier call; nothing more to do
+       }
+       runner.Cancelled = true
+       if runner.ContainerID != "" {
+               err := runner.Docker.StopContainer(runner.ContainerID, 10) // NOTE(review): 10 is presumably a timeout in seconds - confirm against dockerclient
+               if err != nil {
+                       log.Printf("StopContainer failed: %s", err) // failure is only logged; stop has no return value
+               }
+       }
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store.  If not, it loads
 // the image from Keep.
@@ -317,7 +326,21 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                if mnt.Writable {
                                        return fmt.Errorf("Can never write to a collection specified by portable data hash")
                                }
+                               idx := strings.Index(mnt.PortableDataHash, "/")
+                               if idx > 0 {
+                                       mnt.Path = path.Clean(mnt.PortableDataHash[idx:])
+                                       mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
+                                       runner.Container.Mounts[bind] = mnt
+                               }
                                src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
+                               if mnt.Path != "" && mnt.Path != "." {
+                                       if strings.HasPrefix(mnt.Path, "./") {
+                                               mnt.Path = mnt.Path[2:]
+                                       } else if strings.HasPrefix(mnt.Path, "/") {
+                                               mnt.Path = mnt.Path[1:]
+                                       }
+                                       src += "/" + mnt.Path
+                               }
                        } else {
                                src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
                                arvMountCmd = append(arvMountCmd, "--mount-tmp")
@@ -587,12 +610,22 @@ func (runner *ContainerRunner) StartContainer() error {
 func (runner *ContainerRunner) WaitFinish() error {
        runner.CrunchLog.Print("Waiting for container to finish")
 
-       result := runner.Docker.Wait(runner.ContainerID)
-       wr := <-result
-       if wr.Error != nil {
-               return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
+       waitDocker := runner.Docker.Wait(runner.ContainerID)
+       waitMount := runner.ArvMountExit
+       for waitDocker != nil {
+               select {
+               case err := <-waitMount:
+                       runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
+                       waitMount = nil
+                       runner.stop()
+               case wr := <-waitDocker:
+                       if wr.Error != nil {
+                               return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
+                       }
+                       runner.ExitCode = &wr.ExitCode
+                       waitDocker = nil
+               }
        }
-       runner.ExitCode = &wr.ExitCode
 
        // wait for stdout/stderr to complete
        <-runner.loggingDone
@@ -674,20 +707,10 @@ func (runner *ContainerRunner) CaptureOutput() error {
                        continue
                }
 
-               if strings.HasPrefix(bindSuffix, "/") == false {
-                       bindSuffix = "/" + bindSuffix
-               }
-
                if mnt.ExcludeFromOutput == true {
                        continue
                }
 
-               idx := strings.Index(mnt.PortableDataHash, "/")
-               if idx > 0 {
-                       mnt.Path = mnt.PortableDataHash[idx:]
-                       mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
-               }
-
                // append to manifest_text
                m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
                if err != nil {
@@ -699,10 +722,12 @@ func (runner *ContainerRunner) CaptureOutput() error {
 
        // Save output
        var response arvados.Collection
+       manifest := manifest.Manifest{Text: manifestText}
+       manifestText = manifest.Extract(".", ".").Text
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
-                               "trash_at":      time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+                               "is_trashed":    true,
                                "name":          "output for " + runner.Container.UUID,
                                "manifest_text": manifestText}},
                &response)
@@ -746,78 +771,12 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
                return "", nil
        }
 
-       manifest := manifest.Manifest{Text: collection.ManifestText}
-       manifestText := manifest.NormalizedManifestForPath(mnt.Path)
-
-       if manifestText == "" {
-               // It could be denormalized manifest
-               mntPath := strings.Trim(mnt.Path, "/")
-               manifestText = strings.Replace(collection.ManifestText, "./", "."+bindSuffix+"/", -1)
-               manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1)
-               wanted := ""
-               for _, stream := range strings.Split(manifestText, "\n") {
-                       if strings.Index(stream, mntPath) == -1 {
-                               continue
-                       }
-
-                       for _, token := range strings.Split(manifestText, " ") {
-                               if strings.Index(token, ":") == -1 {
-                                       wanted += " " + token
-                               } else if strings.Index(token, ":"+mntPath) >= 0 {
-                                       wanted += " " + token + "\n"
-                                       break
-                               }
-                       }
-               }
-               return wanted, nil
-       }
-
-       if mnt.Path == "" || mnt.Path == "/" {
-               // no path specified; return the entire manifest text after making adjustments
-               manifestText = strings.Replace(manifestText, "./", "."+bindSuffix+"/", -1)
-               manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1)
-       } else {
-               // either a single stream or file from a stream is being sought
-               bindIdx := strings.LastIndex(bindSuffix, "/")
-               var bindSubdir, bindFileName string
-               if bindIdx >= 0 {
-                       bindSubdir = "." + bindSuffix[0:bindIdx]
-                       bindFileName = bindSuffix[bindIdx+1:]
-               }
-               mntPath := mnt.Path
-               if strings.HasSuffix(mntPath, "/") {
-                       mntPath = mntPath[0 : len(mntPath)-1]
-               }
-               pathIdx := strings.LastIndex(mntPath, "/")
-               var pathSubdir, pathFileName string
-               if pathIdx >= 0 {
-                       pathSubdir = "." + mntPath[0:pathIdx]
-                       pathFileName = mntPath[pathIdx+1:]
-               }
-
-               if strings.Index(manifestText, "."+mntPath+" ") != -1 {
-                       // path refers to this complete stream
-                       manifestText = strings.Replace(manifestText, "."+mntPath, "."+bindSuffix, -1)
-               } else {
-                       // look for a matching file in this stream
-                       manifestText = strings.Replace(manifestText, ":"+pathFileName, ":"+bindFileName, -1)
-                       manifestText = strings.Replace(manifestText, pathSubdir, bindSubdir, -1)
-               }
-       }
-
-       if manifestText == "" {
-               runner.CrunchLog.Printf("No manifest segment found for bind '%v' with path '%v'", bindSuffix, mnt.Path)
-       }
-
-       return manifestText, nil
-}
-
-func (runner *ContainerRunner) loadDiscoveryVars() {
-       tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
-       if err != nil {
-               log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err)
+       mft := manifest.Manifest{Text: collection.ManifestText}
+       extracted := mft.Extract(mnt.Path, bindSuffix)
+       if extracted.Err != nil {
+               return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error())
        }
-       runner.trashLifetime = time.Duration(tl.(float64)) * time.Second
+       return extracted.Text, nil
 }
 
 func (runner *ContainerRunner) CleanupDirs() {
@@ -872,7 +831,7 @@ func (runner *ContainerRunner) CommitLogs() error {
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
-                               "trash_at":      time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+                               "is_trashed":    true,
                                "name":          "logs for " + runner.Container.UUID,
                                "manifest_text": mt}},
                &response)
@@ -1067,7 +1026,6 @@ func NewContainerRunner(api IArvadosClient,
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
-       cr.loadDiscoveryVars()
        return cr
 }