# in memory for the duration of the container.
LocalKeepBlobBuffersPerVCPU: 0
+ # When running a dedicated keepstore process for a container
+ # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+ # messages to keepstore.txt in the container's log collection.
+ #
+ # These log messages can reveal some volume configuration
+ # details, error messages from the cloud storage provider, etc.,
+ # which are not otherwise visible to users.
+ #
+ # Accepted values:
+ # * "none" -- no keepstore.txt file
+ # * "all" -- all logs, including request and response lines
+ # * "errors" -- all logs except "response" logs with 2xx
+ # response codes and "request" logs
+ LocalKeepLogsToContainerLog: none
+
Logging:
# When you run the db:delete_old_container_logs task, it will find
# containers that have been finished for at least this many seconds,
"Containers.JobsAPI.Enable": true,
"Containers.JobsAPI.GitInternalDir": false,
"Containers.LocalKeepBlobBuffersPerVCPU": false,
+ "Containers.LocalKeepLogsToContainerLog": false,
"Containers.Logging": false,
"Containers.LogReuseDecisions": false,
"Containers.LSF": false,
# in memory for the duration of the container.
LocalKeepBlobBuffersPerVCPU: 0
+ # When running a dedicated keepstore process for a container
+ # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+ # messages to keepstore.txt in the container's log collection.
+ #
+ # These log messages can reveal some volume configuration
+ # details, error messages from the cloud storage provider, etc.,
+ # which are not otherwise visible to users.
+ #
+ # Accepted values:
+ # * "none" -- no keepstore.txt file
+ # * "all" -- all logs, including request and response lines
+ # * "errors" -- all logs except "response" logs with 2xx
+ # response codes and "request" logs
+ LocalKeepLogsToContainerLog: none
+
Logging:
# When you run the db:delete_old_container_logs task, it will find
# containers that have been finished for at least this many seconds,
ldr.checkToken(fmt.Sprintf("Clusters.%s.SystemRootToken", id), cc.SystemRootToken),
ldr.checkToken(fmt.Sprintf("Clusters.%s.Collections.BlobSigningKey", id), cc.Collections.BlobSigningKey),
checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection),
+ ldr.checkEnum("Containers.LocalKeepLogsToContainerLog", cc.Containers.LocalKeepLogsToContainerLog, "none", "all", "errors"),
ldr.checkEmptyKeepstores(cc),
ldr.checkUnlistedKeepstores(cc),
ldr.checkLocalKeepstoreVolumes(cc),
return nil
}
+// checkEnum returns nil if value is one of the accepted strings;
+// otherwise it returns an error naming the config key (label), the
+// rejected value, and the full accepted set.
+func (ldr *Loader) checkEnum(label, value string, accepted ...string) error {
+ for _, s := range accepted {
+ if s == value {
+ return nil
+ }
+ }
+ return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
+}
+
func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error {
cluster:
for id, cc := range cfg.Clusters {
return 1
}
- if keepstore != nil {
- w, err := cr.NewLogWriter("keepstore")
+ if keepstore == nil {
+ // Nothing is written to keepstoreLogbuf, no need to
+ // call SetWriter.
+ } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+ keepstoreLogbuf.SetWriter(io.Discard)
+ } else {
+ logwriter, err := cr.NewLogWriter("keepstore")
if err != nil {
log.Print(err)
return 1
}
- cr.keepstoreLogger = NewThrottledLogger(w)
- err = keepstoreLogbuf.SetWriter(cr.keepstoreLogger)
+ cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+ var writer io.WriteCloser = cr.keepstoreLogger
+ if logWhat == "errors" {
+ writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+ } else if logWhat != "all" {
+ // should have been caught earlier by
+ // dispatcher's config loader
+ log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+ return 1
+ }
+ err = keepstoreLogbuf.SetWriter(writer)
if err != nil {
log.Print(err)
return 1
})
c.Assert(err, IsNil)
c.Logf("input pdh %s", s.input.PortableDataHash)
-
- s.logCollection = arvados.Collection{}
- s.outputCollection = arvados.Collection{}
}
func (s *integrationSuite) TearDownSuite(c *C) {
s.stdin = bytes.Buffer{}
s.stdout = bytes.Buffer{}
s.stderr = bytes.Buffer{}
+ s.logCollection = arvados.Collection{}
+ s.outputCollection = arvados.Collection{}
s.cr = arvados.ContainerRequest{
Priority: 1,
State: "Committed",
}
func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
+ // Run the trivial container once per LocalKeepLogsToContainerLog
+ // setting, and check that keepstore.txt is absent ("none") or
+ // contains/omits request logs as configured.
+ // NOTE(review): trial.matchStartupMessage is populated below but
+ // never asserted against anything -- add a check on the startup
+ // log line or drop the field.
- cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
- c.Assert(err, IsNil)
- cluster, err := cfg.GetCluster("")
- c.Assert(err, IsNil)
- for uuid, volume := range cluster.Volumes {
- volume.AccessViaHosts = nil
- volume.Replication = 2
- cluster.Volumes[uuid] = volume
- }
+ for _, trial := range []struct {
+ logConfig string
+ matchGetReq Checker
+ matchPutReq Checker
+ matchStartupMessage Checker
+ }{
+ {"none", Not(Matches), Not(Matches), Not(Matches)},
+ {"all", Matches, Matches, Matches},
+ {"errors", Not(Matches), Not(Matches), Matches},
+ } {
+ c.Logf("=== testing with Containers.LocalKeepLogsToContainerLog: %q", trial.logConfig)
+ // Reset fixtures (buffers, collections) between trials.
+ s.SetUpTest(c)
- s.stdin.Reset()
- err = json.NewEncoder(&s.stdin).Encode(ConfigData{
- Env: nil,
- KeepBuffers: 1,
- Cluster: cluster,
- })
- c.Assert(err, IsNil)
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, IsNil)
+ cluster, err := cfg.GetCluster("")
+ c.Assert(err, IsNil)
+ for uuid, volume := range cluster.Volumes {
+ volume.AccessViaHosts = nil
+ volume.Replication = 2
+ cluster.Volumes[uuid] = volume
+ }
+ cluster.Containers.LocalKeepLogsToContainerLog = trial.logConfig
- s.engine = "docker"
- s.testRunTrivialContainer(c)
+ s.stdin.Reset()
+ err = json.NewEncoder(&s.stdin).Encode(ConfigData{
+ Env: nil,
+ KeepBuffers: 1,
+ Cluster: cluster,
+ })
+ c.Assert(err, IsNil)
- fs, err := s.logCollection.FileSystem(s.client, s.kc)
- c.Assert(err, IsNil)
- f, err := fs.Open("keepstore.txt")
- c.Assert(err, IsNil)
- buf, err := ioutil.ReadAll(f)
- c.Assert(err, IsNil)
- c.Check(string(buf), Matches, `(?ms).*"reqMethod":"GET".*`)
- c.Check(string(buf), Matches, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+ s.engine = "docker"
+ s.testRunTrivialContainer(c)
+
+ fs, err := s.logCollection.FileSystem(s.client, s.kc)
+ c.Assert(err, IsNil)
+ f, err := fs.Open("keepstore.txt")
+ if trial.logConfig == "none" {
+ // With logging disabled there must be no keepstore.txt at all.
+ c.Check(err, NotNil)
+ c.Check(os.IsNotExist(err), Equals, true)
+ } else {
+ c.Assert(err, IsNil)
+ buf, err := ioutil.ReadAll(f)
+ c.Assert(err, IsNil)
+ c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
+ c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+ }
+ }
}
func (s *integrationSuite) testRunTrivialContainer(c *C) {
import (
"bufio"
"bytes"
+ "encoding/json"
"fmt"
"io"
"log"
loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
}
+
+// filterKeepstoreErrorsOnly is an io.WriteCloser that forwards only
+// "interesting" keepstore log lines (see check) to the wrapped
+// WriteCloser. buf holds any trailing partial line between Write
+// calls.
+type filterKeepstoreErrorsOnly struct {
+ io.WriteCloser
+ buf []byte
+}
+
+// Write appends p to the internal buffer, then forwards each
+// complete (newline-terminated) line that passes check() to the
+// embedded WriteCloser. A trailing partial line stays in f.buf
+// until a later Write completes it. On success it reports len(p)
+// consumed, including bytes belonging to filtered-out lines.
+func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
+ f.buf = append(f.buf, p...)
+ start := 0
+ // Scan only the newly appended region for newlines; earlier
+ // bytes were already scanned by previous calls.
+ for i := len(f.buf) - len(p); i < len(f.buf); i++ {
+ if f.buf[i] == '\n' {
+ if f.check(f.buf[start:i]) {
+ _, err := f.WriteCloser.Write(f.buf[start : i+1])
+ if err != nil {
+ return 0, err
+ }
+ }
+ start = i + 1
+ }
+ }
+ if start > 0 {
+ // Discard forwarded/suppressed lines, keep the partial tail.
+ copy(f.buf, f.buf[start:])
+ f.buf = f.buf[:len(f.buf)-start]
+ }
+ return len(p), nil
+}
+
+// check reports whether a single log line (without its trailing
+// newline) should be passed through when the "errors" log level is
+// selected. Lines that are not JSON objects, or fail to parse, are
+// kept verbatim; "request" logs and "response" logs with a 2xx
+// status code are suppressed.
+func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
+ if len(line) == 0 {
+ // suppress blank lines
+ return false
+ }
+ if line[0] != '{' {
+ // not a JSON object -- keep as-is
+ return true
+ }
+ var m map[string]interface{}
+ err := json.Unmarshal(line, &m)
+ if err != nil {
+ // malformed JSON -- keep as-is
+ return true
+ }
+ if m["msg"] == "request" {
+ return false
+ }
+ if m["msg"] == "response" {
+ // JSON numbers unmarshal into float64, hence the assertion.
+ if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
+ return false
+ }
+ }
+ return true
+}
package crunchrun
import (
+ "bytes"
"fmt"
+ "io"
"strings"
"testing"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
. "gopkg.in/check.v1"
+ check "gopkg.in/check.v1"
)
type LoggingTestSuite struct {
c.Check(true, Equals, strings.Contains(stderrLog, expected))
c.Check(string(kc.Content), Equals, logtext)
}
+
+// filterSuite exercises filterKeepstoreErrorsOnly on its own,
+// without the heavier LoggingTestSuite fixtures.
+type filterSuite struct{}
+
+var _ = Suite(&filterSuite{})
+
+// TestFilterKeepstoreErrorsOnly feeds log data to the filter in
+// deliberately awkward chunks (a line split across two Writes,
+// blank lines, non-JSON text) and checks that only non-"request",
+// non-2xx-"response" lines reach the underlying buffer.
+func (*filterSuite) TestFilterKeepstoreErrorsOnly(c *check.C) {
+ var buf bytes.Buffer
+ f := filterKeepstoreErrorsOnly{WriteCloser: nopCloser{&buf}}
+ for _, s := range []string{
+ "not j",
+ "son\n" + `{"msg":"foo"}` + "\n{}\n" + `{"msg":"request"}` + "\n" + `{"msg":1234}` + "\n\n",
+ "\n[\n",
+ `{"msg":"response","respStatusCode":404,"foo": "bar"}` + "\n",
+ `{"msg":"response","respStatusCode":206}` + "\n",
+ } {
+ f.Write([]byte(s))
+ }
+ c.Check(buf.String(), check.Equals, `not json
+{"msg":"foo"}
+{}
+{"msg":1234}
+[
+{"msg":"response","respStatusCode":404,"foo": "bar"}
+`)
+}
+
+// nopCloser adds a no-op Close method to an io.Writer so a plain
+// bytes.Buffer can stand in for an io.WriteCloser in tests.
+type nopCloser struct {
+ io.Writer
+}
+
+func (nopCloser) Close() error { return nil }
UsePreemptibleInstances bool
RuntimeEngine string
LocalKeepBlobBuffersPerVCPU int
+ LocalKeepLogsToContainerLog string
JobsAPI struct {
Enable string