16347: Add test case. Flush keepstore logs in CommitLogs.
[arvados.git] / lib / crunchrun / crunchrun.go
index d70dd1c42855e65d779a48fa66e6ef4711050d59..ce060151dc133f232ea9bac237bc59de49612e21 100644 (file)
@@ -137,6 +137,8 @@ type ContainerRunner struct {
        finalState    string
        parentTemp    string
 
+       keepstoreLogger  io.WriteCloser
+       keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
        hoststatLogger   io.WriteCloser
@@ -1277,6 +1279,16 @@ func (runner *ContainerRunner) CommitLogs() error {
                runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
        }()
 
+       if runner.keepstoreLogger != nil {
+               // Flush any buffered logs from our local keepstore
+               // process.  Discard anything logged after this point
+               // -- it won't end up in the log collection, so
+               // there's no point writing it to the collectionfs.
+               runner.keepstoreLogbuf.SetWriter(io.Discard)
+               runner.keepstoreLogger.Close()
+               runner.keepstoreLogger = nil
+       }
+
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
                // we must be closing the re-opened log, which won't
@@ -1285,6 +1297,7 @@ func (runner *ContainerRunner) CommitLogs() error {
                // -- it exists only to send logs to other channels.
                return nil
        }
+
        saved, err := runner.saveLogCollection(true)
        if err != nil {
                return fmt.Errorf("error saving log collection: %s", err)
@@ -1647,6 +1660,7 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 }
 
 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       log := log.New(stderr, "", 0)
        flags := flag.NewFlagSet(prog, flag.ContinueOnError)
        statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
        cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
@@ -1702,9 +1716,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        var conf ConfigData
        if *stdinConfig {
-               err := json.NewDecoder(os.Stdin).Decode(&conf)
+               err := json.NewDecoder(stdin).Decode(&conf)
                if err != nil {
-                       log.Print(err)
+                       log.Printf("decode stdin: %s", err)
                        return 1
                }
                for k, v := range conf.Env {
@@ -1730,8 +1744,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                arvadosclient.CertFiles = []string{*caCertsPath}
        }
 
-       var keepstoreLog bufThenWrite
-       keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLog, stderr))
+       var keepstoreLogbuf bufThenWrite
+       keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
        if err != nil {
                log.Print(err)
                return 1
@@ -1747,9 +1761,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
        api.Retries = 8
 
-       kc, kcerr := keepclient.MakeKeepClient(api)
-       if kcerr != nil {
-               log.Printf("%s: %v", containerUUID, kcerr)
+       kc, err := keepclient.MakeKeepClient(api)
+       if err != nil {
+               log.Printf("%s: %v", containerUUID, err)
                return 1
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
@@ -1767,11 +1781,13 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        log.Print(err)
                        return 1
                }
-               err = keepstoreLog.SetWriter(NewThrottledLogger(w))
+               cr.keepstoreLogger = NewThrottledLogger(w)
+               err = keepstoreLogbuf.SetWriter(cr.keepstoreLogger)
                if err != nil {
                        log.Print(err)
                        return 1
                }
+               cr.keepstoreLogbuf = &keepstoreLogbuf
        }
 
        switch *runtimeEngine {
@@ -1895,13 +1911,20 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
                return nil, err
        }
        cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+       if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+               // If we're a 'go test' process, running
+               // /proc/self/exe would start the test suite in a
+               // child process, which is not what we want.
+               cmd.Path, _ = exec.LookPath("go")
+               cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+               cmd.Env = os.Environ()
+       }
        cmd.Stdin = &confJSON
        cmd.Stdout = logbuf
        cmd.Stderr = logbuf
-       cmd.Env = []string{
+       cmd.Env = append(cmd.Env,
                "GOGC=10",
-               "ARVADOS_SERVICE_INTERNAL_URL=" + url,
-       }
+               "ARVADOS_SERVICE_INTERNAL_URL="+url)
        err = cmd.Start()
        if err != nil {
                return nil, fmt.Errorf("error starting keepstore process: %w", err)