16347: Add test case. Flush keepstore logs in CommitLogs.
author Tom Clegg <tom@curii.com>
Thu, 7 Oct 2021 20:22:54 +0000 (16:22 -0400)
committer Tom Clegg <tom@curii.com>
Fri, 8 Oct 2021 15:21:03 +0000 (11:21 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/crunchrun/crunchrun.go
lib/crunchrun/integration_test.go

index d70dd1c42855e65d779a48fa66e6ef4711050d59..ce060151dc133f232ea9bac237bc59de49612e21 100644 (file)
@@ -137,6 +137,8 @@ type ContainerRunner struct {
        finalState    string
        parentTemp    string
 
+       keepstoreLogger  io.WriteCloser
+       keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
        hoststatLogger   io.WriteCloser
@@ -1277,6 +1279,16 @@ func (runner *ContainerRunner) CommitLogs() error {
                runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
        }()
 
+       if runner.keepstoreLogger != nil {
+               // Flush any buffered logs from our local keepstore
+               // process.  Discard anything logged after this point
+               // -- it won't end up in the log collection, so
+               // there's no point writing it to the collectionfs.
+               runner.keepstoreLogbuf.SetWriter(io.Discard)
+               runner.keepstoreLogger.Close()
+               runner.keepstoreLogger = nil
+       }
+
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
                // we must be closing the re-opened log, which won't
@@ -1285,6 +1297,7 @@ func (runner *ContainerRunner) CommitLogs() error {
                // -- it exists only to send logs to other channels.
                return nil
        }
+
        saved, err := runner.saveLogCollection(true)
        if err != nil {
                return fmt.Errorf("error saving log collection: %s", err)
@@ -1647,6 +1660,7 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 }
 
 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       log := log.New(stderr, "", 0)
        flags := flag.NewFlagSet(prog, flag.ContinueOnError)
        statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
        cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
@@ -1702,9 +1716,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        var conf ConfigData
        if *stdinConfig {
-               err := json.NewDecoder(os.Stdin).Decode(&conf)
+               err := json.NewDecoder(stdin).Decode(&conf)
                if err != nil {
-                       log.Print(err)
+                       log.Printf("decode stdin: %s", err)
                        return 1
                }
                for k, v := range conf.Env {
@@ -1730,8 +1744,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                arvadosclient.CertFiles = []string{*caCertsPath}
        }
 
-       var keepstoreLog bufThenWrite
-       keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLog, stderr))
+       var keepstoreLogbuf bufThenWrite
+       keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
        if err != nil {
                log.Print(err)
                return 1
@@ -1747,9 +1761,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
        api.Retries = 8
 
-       kc, kcerr := keepclient.MakeKeepClient(api)
-       if kcerr != nil {
-               log.Printf("%s: %v", containerUUID, kcerr)
+       kc, err := keepclient.MakeKeepClient(api)
+       if err != nil {
+               log.Printf("%s: %v", containerUUID, err)
                return 1
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
@@ -1767,11 +1781,13 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        log.Print(err)
                        return 1
                }
-               err = keepstoreLog.SetWriter(NewThrottledLogger(w))
+               cr.keepstoreLogger = NewThrottledLogger(w)
+               err = keepstoreLogbuf.SetWriter(cr.keepstoreLogger)
                if err != nil {
                        log.Print(err)
                        return 1
                }
+               cr.keepstoreLogbuf = &keepstoreLogbuf
        }
 
        switch *runtimeEngine {
@@ -1895,13 +1911,20 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
                return nil, err
        }
        cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+       if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+               // If we're a 'go test' process, running
+               // /proc/self/exe would start the test suite in a
+               // child process, which is not what we want.
+               cmd.Path, _ = exec.LookPath("go")
+               cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+               cmd.Env = os.Environ()
+       }
        cmd.Stdin = &confJSON
        cmd.Stdout = logbuf
        cmd.Stderr = logbuf
-       cmd.Env = []string{
+       cmd.Env = append(cmd.Env,
                "GOGC=10",
-               "ARVADOS_SERVICE_INTERNAL_URL=" + url,
-       }
+               "ARVADOS_SERVICE_INTERNAL_URL="+url)
        err = cmd.Start()
        if err != nil {
                return nil, fmt.Errorf("error starting keepstore process: %w", err)
index c688248c64fbdc26330144ebeafb3abf8c42edc6..597490c5788d6a3ddf0e351023d81f8413baef62 100644 (file)
@@ -6,6 +6,7 @@ package crunchrun
 
 import (
        "bytes"
+       "encoding/json"
        "fmt"
        "io"
        "io/ioutil"
@@ -13,9 +14,11 @@ import (
        "os/exec"
        "strings"
 
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        . "gopkg.in/check.v1"
 )
@@ -33,6 +36,9 @@ type integrationSuite struct {
        client *arvados.Client
        ac     *arvadosclient.ArvadosClient
        kc     *keepclient.KeepClient
+
+       logCollection    arvados.Collection
+       outputCollection arvados.Collection
 }
 
 func (s *integrationSuite) SetUpSuite(c *C) {
@@ -49,7 +55,12 @@ func (s *integrationSuite) SetUpSuite(c *C) {
        out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
        imageUUID := strings.TrimSpace(string(out))
        c.Logf("image uuid %s", imageUUID)
-       c.Assert(err, IsNil)
+       if !c.Check(err, IsNil) {
+               if err, ok := err.(*exec.ExitError); ok {
+                       c.Logf("%s", err.Stderr)
+               }
+               c.Fail()
+       }
        err = arvados.NewClientFromEnv().RequestAndDecode(&s.image, "GET", "arvados/v1/collections/"+imageUUID, nil, nil)
        c.Assert(err, IsNil)
        c.Logf("image pdh %s", s.image.PortableDataHash)
@@ -76,6 +87,9 @@ func (s *integrationSuite) SetUpSuite(c *C) {
        })
        c.Assert(err, IsNil)
        c.Logf("input pdh %s", s.input.PortableDataHash)
+
+       s.logCollection = arvados.Collection{}
+       s.outputCollection = arvados.Collection{}
 }
 
 func (s *integrationSuite) TearDownSuite(c *C) {
@@ -150,17 +164,56 @@ func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
        s.testRunTrivialContainer(c)
 }
 
+func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, IsNil)
+       cluster, err := cfg.GetCluster("")
+       c.Assert(err, IsNil)
+       for uuid, volume := range cluster.Volumes {
+               volume.AccessViaHosts = nil
+               volume.Replication = 2
+               cluster.Volumes[uuid] = volume
+       }
+
+       s.stdin.Reset()
+       err = json.NewEncoder(&s.stdin).Encode(ConfigData{
+               Env:         nil,
+               KeepBuffers: 1,
+               Cluster:     cluster,
+       })
+       c.Assert(err, IsNil)
+
+       s.engine = "docker"
+       s.testRunTrivialContainer(c)
+
+       fs, err := s.logCollection.FileSystem(s.client, s.kc)
+       c.Assert(err, IsNil)
+       f, err := fs.Open("keepstore.txt")
+       c.Assert(err, IsNil)
+       buf, err := ioutil.ReadAll(f)
+       c.Assert(err, IsNil)
+       c.Check(string(buf), Matches, `(?ms).*"reqMethod":"GET".*`)
+       c.Check(string(buf), Matches, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+}
+
 func (s *integrationSuite) testRunTrivialContainer(c *C) {
        if err := exec.Command("which", s.engine).Run(); err != nil {
                c.Skip(fmt.Sprintf("%s: %s", s.engine, err))
        }
        s.cr.Command = []string{"sh", "-c", "cat /mnt/in/inputfile >/mnt/out/inputfile && cat /mnt/json >/mnt/out/json && ! touch /mnt/in/shouldbereadonly && mkdir /mnt/out/emptydir"}
        s.setup(c)
-       code := command{}.RunCommand("crunch-run", []string{
+
+       args := []string{
                "-runtime-engine=" + s.engine,
                "-enable-memory-limit=false",
                s.cr.ContainerUUID,
-       }, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+       }
+       if s.stdin.Len() > 0 {
+               args = append([]string{"-stdin-config=true"}, args...)
+       }
+       code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+       c.Logf("\n===== stdout =====\n%s", s.stdout.String())
+       c.Logf("\n===== stderr =====\n%s", s.stderr.String())
        c.Check(code, Equals, 0)
        err := s.client.RequestAndDecode(&s.cr, "GET", "arvados/v1/container_requests/"+s.cr.UUID, nil, nil)
        c.Assert(err, IsNil)
@@ -185,6 +238,7 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
                        c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
                }
        }
+       s.logCollection = log
 
        var output arvados.Collection
        err = s.client.RequestAndDecode(&output, "GET", "arvados/v1/collections/"+s.cr.OutputUUID, nil, nil)
@@ -218,4 +272,5 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
                        c.Check(fi.Name(), Equals, ".keep")
                }
        }
+       s.outputCollection = output
 }