8363: Report arv-mount error messages as runtime_status warnings.
author Tom Clegg <tom@curii.com>
Tue, 21 Sep 2021 18:16:32 +0000 (14:16 -0400)
committer Tom Clegg <tom@curii.com>
Tue, 21 Sep 2021 18:16:32 +0000 (14:16 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go
lib/crunchrun/logscanner.go [new file with mode: 0644]
lib/crunchrun/logscanner_test.go [new file with mode: 0644]

index 01141674a6b329fc6ef4f0cb2feb6b60628e96e8..4dd953a3a480a0cde374a3c60cb7a63f03c1ee14 100644 (file)
@@ -66,7 +66,7 @@ type IKeepClient interface {
 // NewLogWriter is a factory function to create a new log writer.
 type NewLogWriter func(name string) (io.WriteCloser, error)
 
-type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
+type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error) // cmdline[0] is the program to run, e.g. "arv-mount"
 
 type MkTempDir func(string, string) (string, error)
 
@@ -273,8 +273,8 @@ func (runner *ContainerRunner) LoadImage() (string, error) {
        return imageID, nil
 }
 
-func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
-       c = exec.Command("arv-mount", arvMountCmd...)
+func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) {
+       c = exec.Command(cmdline[0], cmdline[1:]...)
 
        // Copy our environment, but override ARVADOS_API_TOKEN with
        // the container auth token.
@@ -291,8 +291,16 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
                return nil, err
        }
        runner.arvMountLog = NewThrottledLogger(w)
+       scanner := logScanner{
+               Patterns: []string{
+                       "Keep write error",
+                       "Block not found error",
+                       "Unhandled exception during FUSE operation",
+               },
+               ReportFunc: runner.reportArvMountWarning,
+       }
        c.Stdout = runner.arvMountLog
-       c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr)
+       c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
 
        runner.CrunchLog.Printf("Running %v", c.Args)
 
@@ -392,6 +400,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        pdhOnly := true
        tmpcount := 0
        arvMountCmd := []string{
+               "arv-mount",
                "--foreground",
                "--allow-other",
                "--read-write",
@@ -1100,6 +1109,20 @@ func (runner *ContainerRunner) updateLogs() {
        }
 }
 
+// reportArvMountWarning stores "arv-mount: "+message as the container's
+// runtime_status warning. A failed API update is only logged.
+func (runner *ContainerRunner) reportArvMountWarning(message string) {
+       status := arvadosclient.Dict{"warning": "arv-mount: " + message}
+       attrs := arvadosclient.Dict{"runtime_status": status}
+       var updated arvados.Container
+       err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
+               arvadosclient.Dict{"container": attrs}, &updated)
+       if err == nil {
+               return
+       }
+       runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
+}
+
 // CaptureOutput saves data from the container's output directory if
 // needed, and updates the container output accordingly.
 func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
index bb982cdee76c32cb9321ce88e8fa47fa0588f2f1..8434e53114f1fe4f94f4e2b8e69ff3e3eb9c7cbd 100644 (file)
@@ -1533,6 +1533,36 @@ func (s *TestSuite) TestFullRunSetOutput(c *C) {
        c.Check(s.api.CalledWith("container.output", arvadostest.DockerImage112PDH), NotNil)
 }
 
+func (s *TestSuite) TestArvMountRuntimeStatusWarning(c *C) {
+       s.runner.RunArvMount = func([]string, string) (*exec.Cmd, error) {
+               os.Mkdir(s.runner.ArvMountPoint+"/by_id", 0777) // 0777: non-root needs the dir's x bit to create README
+               ioutil.WriteFile(s.runner.ArvMountPoint+"/by_id/README", nil, 0666)
+               return s.runner.ArvMountCmd([]string{"bash", "-c", "echo >&2 $(date) Keep write error: I am a teapot; sleep 3"}, "")
+       }
+       s.executor.runFunc = func() {
+               time.Sleep(time.Second)
+               s.executor.exit <- 0
+       }
+       record := `{
+    "command": ["sleep", "1"],
+    "container_image": "` + arvadostest.DockerImage112PDH + `",
+    "cwd": "/bin",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {"API": true},
+    "state": "Locked"
+}`
+       err := json.Unmarshal([]byte(record), &s.api.Container)
+       c.Assert(err, IsNil)
+       err = s.runner.Run()
+       c.Assert(err, IsNil)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.runtime_status.warning", "arv-mount: Keep write error"), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+}
+
 func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C) {
        helperRecord := `{
                "command": ["/bin/sh", "-c", "echo $FROBIZ"],
diff --git a/lib/crunchrun/logscanner.go b/lib/crunchrun/logscanner.go
new file mode 100644 (file)
index 0000000..9cf60b6
--- /dev/null
@@ -0,0 +1,47 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "bytes"
+       "strings"
+)
+
+// logScanner is an io.Writer that calls ReportFunc(pattern) the first
+// time one of the Patterns appears in a complete (newline-terminated)
+// line of the data. Patterns must not contain newlines.
+type logScanner struct {
+       Patterns   []string     // substrings to search for
+       ReportFunc func(string) // called at most once, with the matching pattern
+       reported   bool         // true once ReportFunc has been called
+       buf        bytes.Buffer // partial line carried over between Writes
+}
+
+// Write implements io.Writer. Only complete lines are searched. If the lines
+// completed by one Write match several Patterns, the first listed wins.
+func (s *logScanner) Write(p []byte) (int, error) {
+       if s.reported {
+               // ReportFunc was already called: skip buffering/searching.
+               return len(p), nil
+       }
+       split := bytes.LastIndexByte(p, '\n')
+       if split < 0 {
+               return s.buf.Write(p) // no complete line yet
+       }
+       s.buf.Write(p[:split+1]) // bytes.Buffer.Write never returns an error
+       txt := s.buf.String()
+       for _, pattern := range s.Patterns {
+               if strings.Contains(txt, pattern) {
+                       s.ReportFunc(pattern)
+                       s.reported = true
+                       return len(p), nil
+               }
+       }
+       s.buf.Reset()
+       // Keep the (possibly empty) partial line after the last newline
+       // so it can be searched once the rest of it arrives.
+       n, err := s.buf.Write(p[split+1:])
+       return n + split + 1, err
+}
diff --git a/lib/crunchrun/logscanner_test.go b/lib/crunchrun/logscanner_test.go
new file mode 100644 (file)
index 0000000..26b1281
--- /dev/null
@@ -0,0 +1,28 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&logScannerSuite{})
+
+type logScannerSuite struct {
+}
+
+func (s *logScannerSuite) TestCallReportFuncOnce(c *check.C) {
+       var reported []string
+       ls := logScanner{
+               Patterns:   []string{"foobar", "barbaz"},
+               ReportFunc: func(pattern string) { reported = append(reported, pattern) },
+       }
+       for _, chunk := range []string{"foo\nbar\nbar", "baz\nwaz\nqux", "\nfoobar\n"} {
+               n, err := ls.Write([]byte(chunk))
+               c.Check(err, check.IsNil)
+               c.Check(n, check.Equals, len(chunk))
+       }
+       c.Check(reported, check.DeepEquals, []string{"barbaz"}) // "barbaz" spans two writes; later "foobar" is ignored
+}