From 6b39f96311c832de21bcacc3f17a611682d522a9 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 7 Oct 2021 16:22:54 -0400 Subject: [PATCH] 16347: Add test case. Flush keepstore logs in CommitLogs. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/crunchrun/crunchrun.go | 45 +++++++++++++++++------ lib/crunchrun/integration_test.go | 61 +++++++++++++++++++++++++++++-- 2 files changed, 92 insertions(+), 14 deletions(-) diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index d70dd1c428..ce060151dc 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -137,6 +137,8 @@ type ContainerRunner struct { finalState string parentTemp string + keepstoreLogger io.WriteCloser + keepstoreLogbuf *bufThenWrite statLogger io.WriteCloser statReporter *crunchstat.Reporter hoststatLogger io.WriteCloser @@ -1277,6 +1279,16 @@ func (runner *ContainerRunner) CommitLogs() error { runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0) }() + if runner.keepstoreLogger != nil { + // Flush any buffered logs from our local keepstore + // process. Discard anything logged after this point + // -- it won't end up in the log collection, so + // there's no point writing it to the collectionfs. + runner.keepstoreLogbuf.SetWriter(io.Discard) + runner.keepstoreLogger.Close() + runner.keepstoreLogger = nil + } + if runner.LogsPDH != nil { // If we have already assigned something to LogsPDH, // we must be closing the re-opened log, which won't @@ -1285,6 +1297,7 @@ func (runner *ContainerRunner) CommitLogs() error { // -- it exists only to send logs to other channels. return nil } + saved, err := runner.saveLogCollection(true) if err != nil { return fmt.Errorf("error saving log collection: %s", err) @@ -1647,6 +1660,7 @@ func NewContainerRunner(dispatcherClient *arvados.Client, } func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + log := log.New(stderr, "", 0) flags := flag.NewFlagSet(prog, flag.ContinueOnError) statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting") cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree") @@ -1702,9 +1716,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s var conf ConfigData if *stdinConfig { - err := json.NewDecoder(os.Stdin).Decode(&conf) + err := json.NewDecoder(stdin).Decode(&conf) if err != nil { - log.Print(err) + log.Printf("decode stdin: %s", err) return 1 } for k, v := range conf.Env { @@ -1730,8 +1744,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s arvadosclient.CertFiles = []string{*caCertsPath} } - var keepstoreLog bufThenWrite - keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLog, stderr)) + var keepstoreLogbuf bufThenWrite + keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr)) if err != nil { log.Print(err) return 1 @@ -1747,9 +1761,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } api.Retries = 8 - kc, kcerr := keepclient.MakeKeepClient(api) - if kcerr != nil { - log.Printf("%s: %v", containerUUID, kcerr) + kc, err := keepclient.MakeKeepClient(api) + if err != nil { + log.Printf("%s: %v", containerUUID, err) return 1 } kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} @@ -1767,11 +1781,13 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s log.Print(err) return 1 } - err = keepstoreLog.SetWriter(NewThrottledLogger(w)) + cr.keepstoreLogger = NewThrottledLogger(w) + err = keepstoreLogbuf.SetWriter(cr.keepstoreLogger) if err != nil { log.Print(err) return 1 } + cr.keepstoreLogbuf = &keepstoreLogbuf } switch *runtimeEngine { @@ -1895,13 +1911,20 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er return nil, err } cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-") + if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") { + // If we're a 'go test' process, running + // /proc/self/exe would start the test suite in a + // child process, which is not what we want. + cmd.Path, _ = exec.LookPath("go") + cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...) + cmd.Env = os.Environ() + } cmd.Stdin = &confJSON cmd.Stdout = logbuf cmd.Stderr = logbuf - cmd.Env = []string{ + cmd.Env = append(cmd.Env, "GOGC=10", - "ARVADOS_SERVICE_INTERNAL_URL=" + url, - } + "ARVADOS_SERVICE_INTERNAL_URL="+url) err = cmd.Start() if err != nil { return nil, fmt.Errorf("error starting keepstore process: %w", err) diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go index c688248c64..597490c578 100644 --- a/lib/crunchrun/integration_test.go +++ b/lib/crunchrun/integration_test.go @@ -6,6 +6,7 @@ package crunchrun import ( "bytes" + "encoding/json" "fmt" "io" "io/ioutil" @@ -13,9 +14,11 @@ import ( "os/exec" "strings" + "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/arvadostest" + "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/keepclient" . "gopkg.in/check.v1" ) @@ -33,6 +36,9 @@ type integrationSuite struct { client *arvados.Client ac *arvadosclient.ArvadosClient kc *keepclient.KeepClient + + logCollection arvados.Collection + outputCollection arvados.Collection } func (s *integrationSuite) SetUpSuite(c *C) { @@ -49,7 +55,12 @@ func (s *integrationSuite) SetUpSuite(c *C) { out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output() imageUUID := strings.TrimSpace(string(out)) c.Logf("image uuid %s", imageUUID) - c.Assert(err, IsNil) + if !c.Check(err, IsNil) { + if err, ok := err.(*exec.ExitError); ok { + c.Logf("%s", err.Stderr) + } + c.Fail() + } err = arvados.NewClientFromEnv().RequestAndDecode(&s.image, "GET", "arvados/v1/collections/"+imageUUID, nil, nil) c.Assert(err, IsNil) c.Logf("image pdh %s", s.image.PortableDataHash) @@ -76,6 +87,9 @@ func (s *integrationSuite) SetUpSuite(c *C) { }) c.Assert(err, IsNil) c.Logf("input pdh %s", s.input.PortableDataHash) + + s.logCollection = arvados.Collection{} + s.outputCollection = arvados.Collection{} } func (s *integrationSuite) TearDownSuite(c *C) { @@ -150,17 +164,56 @@ func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) { s.testRunTrivialContainer(c) } +func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) { + cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load() + c.Assert(err, IsNil) + cluster, err := cfg.GetCluster("") + c.Assert(err, IsNil) + for uuid, volume := range cluster.Volumes { + volume.AccessViaHosts = nil + volume.Replication = 2 + cluster.Volumes[uuid] = volume + } + + s.stdin.Reset() + err = json.NewEncoder(&s.stdin).Encode(ConfigData{ + Env: nil, + KeepBuffers: 1, + Cluster: cluster, + }) + c.Assert(err, IsNil) + + s.engine = "docker" + s.testRunTrivialContainer(c) + + fs, err := s.logCollection.FileSystem(s.client, s.kc) + c.Assert(err, IsNil) + f, err := fs.Open("keepstore.txt") + c.Assert(err, IsNil) + buf, err := ioutil.ReadAll(f) + c.Assert(err, IsNil) + c.Check(string(buf), Matches, `(?ms).*"reqMethod":"GET".*`) + c.Check(string(buf), Matches, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`) +} + func (s *integrationSuite) testRunTrivialContainer(c *C) { if err := exec.Command("which", s.engine).Run(); err != nil { c.Skip(fmt.Sprintf("%s: %s", s.engine, err)) } s.cr.Command = []string{"sh", "-c", "cat /mnt/in/inputfile >/mnt/out/inputfile && cat /mnt/json >/mnt/out/json && ! touch /mnt/in/shouldbereadonly && mkdir /mnt/out/emptydir"} s.setup(c) - code := command{}.RunCommand("crunch-run", []string{ + + args := []string{ "-runtime-engine=" + s.engine, "-enable-memory-limit=false", s.cr.ContainerUUID, - }, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr)) + } + if s.stdin.Len() > 0 { + args = append([]string{"-stdin-config=true"}, args...) + } + code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr)) + c.Logf("\n===== stdout =====\n%s", s.stdout.String()) + c.Logf("\n===== stderr =====\n%s", s.stderr.String()) c.Check(code, Equals, 0) err := s.client.RequestAndDecode(&s.cr, "GET", "arvados/v1/container_requests/"+s.cr.UUID, nil, nil) c.Assert(err, IsNil) @@ -185,6 +238,7 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) { c.Logf("\n===== %s =====\n%s", fi.Name(), buf) } } + s.logCollection = log var output arvados.Collection err = s.client.RequestAndDecode(&output, "GET", "arvados/v1/collections/"+s.cr.OutputUUID, nil, nil) @@ -218,4 +272,5 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) { c.Check(fi.Name(), Equals, ".keep") } } + s.outputCollection = output } -- 2.30.2