From 8f592b4a53b368f82ff8be375aa163728699b5a9 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 20 Sep 2018 01:04:52 -0400 Subject: [PATCH] 10181: Save log collection snapshots periodically during run. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- services/crunch-run/crunchrun.go | 141 ++++++++++++++++++-------- services/crunch-run/crunchrun_test.go | 25 +++-- services/crunch-run/logging.go | 2 + services/crunch-run/logging_test.go | 44 ++++++++ 4 files changed, 157 insertions(+), 55 deletions(-) diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 730194b825..e289c824c9 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -91,37 +91,39 @@ type PsProcess interface { // ContainerRunner is the main stateful struct used for a single execution of a // container. type ContainerRunner struct { - Docker ThinDockerClient - client *arvados.Client - ArvClient IArvadosClient - Kc IKeepClient - arvados.Container + Docker ThinDockerClient + client *arvados.Client + ArvClient IArvadosClient + Kc IKeepClient + Container arvados.Container ContainerConfig dockercontainer.Config - dockercontainer.HostConfig - token string - ContainerID string - ExitCode *int - NewLogWriter - loggingDone chan bool - CrunchLog *ThrottledLogger - Stdout io.WriteCloser - Stderr io.WriteCloser - LogCollection arvados.CollectionFileSystem - LogsPDH *string - RunArvMount - MkTempDir - ArvMount *exec.Cmd - ArvMountPoint string - HostOutputDir string - Binds []string - Volumes map[string]struct{} - OutputPDH *string - SigChan chan os.Signal - ArvMountExit chan error - SecretMounts map[string]arvados.Mount - MkArvClient func(token string) (IArvadosClient, error) - finalState string - parentTemp string + HostConfig dockercontainer.HostConfig + token string + ContainerID string + ExitCode *int + NewLogWriter NewLogWriter + loggingDone chan bool + CrunchLog *ThrottledLogger + Stdout io.WriteCloser + Stderr io.WriteCloser + logUUID string + logMtx sync.Mutex + LogCollection arvados.CollectionFileSystem + LogsPDH *string + RunArvMount RunArvMount + MkTempDir MkTempDir + ArvMount *exec.Cmd + ArvMountPoint string + HostOutputDir string + Binds []string + Volumes map[string]struct{} + OutputPDH *string + SigChan chan os.Signal + ArvMountExit chan error + SecretMounts map[string]arvados.Mount + MkArvClient func(token string) (IArvadosClient, error) + finalState string + parentTemp string ListProcesses func() ([]PsProcess, error) @@ -1175,6 +1177,35 @@ func (runner *ContainerRunner) WaitFinish() error { } } +func (runner *ContainerRunner) checkpointLogs() { + logCheckpointTicker := time.NewTicker(crunchLogCheckpointMaxDuration / 360) + defer logCheckpointTicker.Stop() + + logCheckpointTime := time.Now().Add(crunchLogCheckpointMaxDuration) + logCheckpointBytes := crunchLogCheckpointMaxBytes + var savedSize int64 + for range logCheckpointTicker.C { + runner.logMtx.Lock() + done := runner.LogsPDH != nil + runner.logMtx.Unlock() + if done { + return + } + size := runner.LogCollection.Size() + if size == savedSize || (time.Now().Before(logCheckpointTime) && size < logCheckpointBytes) { + continue + } + logCheckpointTime = time.Now().Add(crunchLogCheckpointMaxDuration) + logCheckpointBytes = runner.LogCollection.Size() + crunchLogCheckpointMaxBytes + _, err := runner.saveLogCollection() + if err != nil { + runner.CrunchLog.Printf("error updating log collection: %s", err) + continue + } + savedSize = size + } +} + // CaptureOutput saves data from the container's output directory if // needed, and updates the container output accordingly. func (runner *ContainerRunner) CaptureOutput() error { @@ -1312,26 +1343,45 @@ func (runner *ContainerRunner) CommitLogs() error { // -- it exists only to send logs to other channels. return nil } + saved, err := runner.saveLogCollection() + if err != nil { + return err + } + runner.logMtx.Lock() + defer runner.logMtx.Unlock() + runner.LogsPDH = &saved.PortableDataHash + return nil +} +func (runner *ContainerRunner) saveLogCollection() (response arvados.Collection, err error) { + runner.logMtx.Lock() + defer runner.logMtx.Unlock() + if runner.LogsPDH != nil { + // Already finalized. + return + } mt, err := runner.LogCollection.MarshalManifest(".") if err != nil { - return fmt.Errorf("While creating log manifest: %v", err) - } - - var response arvados.Collection - err = runner.ArvClient.Create("collections", - arvadosclient.Dict{ - "ensure_unique_name": true, - "collection": arvadosclient.Dict{ - "is_trashed": true, - "name": "logs for " + runner.Container.UUID, - "manifest_text": mt}}, - &response) + err = fmt.Errorf("error creating log manifest: %v", err) + return + } + reqBody := arvadosclient.Dict{ + "collection": arvadosclient.Dict{ + "is_trashed": true, + "name": "logs for " + runner.Container.UUID, + "manifest_text": mt}} + if runner.logUUID == "" { + reqBody["ensure_unique_name"] = true + err = runner.ArvClient.Create("collections", reqBody, &response) + } else { + err = runner.ArvClient.Update("collections", runner.logUUID, reqBody, &response) + } if err != nil { - return fmt.Errorf("While creating log collection: %v", err) + err = fmt.Errorf("error saving log collection: %v", err) + return } - runner.LogsPDH = &response.PortableDataHash - return nil + runner.logUUID = response.UUID + return } // UpdateContainerRunning updates the container state to "Running" @@ -1630,6 +1680,7 @@ func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClie cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) loadLogThrottleParams(api) + go cr.checkpointLogs() return cr, nil } diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go index 8d8e040000..217d4236ba 100644 --- a/services/crunch-run/crunchrun_test.go +++ b/services/crunch-run/crunchrun_test.go @@ -230,6 +230,7 @@ func (client *ArvTestClient) Create(resourceType string, mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string) outmap := output.(*arvados.Collection) outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt)) + outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%15.15x", md5.Sum([]byte(mt))) } return nil @@ -316,6 +317,10 @@ func (client *ArvTestClient) Update(resourceType string, uuid string, parameters if parameters["container"].(arvadosclient.Dict)["state"] == "Running" { client.WasSetRunning = true } + } else if resourceType == "collections" { + mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string) + output.(*arvados.Collection).UUID = uuid + output.(*arvados.Collection).PortableDataHash = fmt.Sprintf("%x", md5.Sum([]byte(mt))) } return nil } @@ -1142,7 +1147,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { cr.ArvMountPoint = "" cr.Container.Mounts = make(map[string]arvados.Mount) cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"} - cr.OutputPath = "/tmp" + cr.Container.OutputPath = "/tmp" cr.statInterval = 5 * time.Second err := cr.SetupMounts() c.Check(err, IsNil) @@ -1161,7 +1166,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { cr.Container.Mounts = make(map[string]arvados.Mount) cr.Container.Mounts["/out"] = arvados.Mount{Kind: "tmp"} cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"} - cr.OutputPath = "/out" + cr.Container.OutputPath = "/out" err := cr.SetupMounts() c.Check(err, IsNil) @@ -1179,7 +1184,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { cr.ArvMountPoint = "" cr.Container.Mounts = make(map[string]arvados.Mount) cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"} - cr.OutputPath = "/tmp" + cr.Container.OutputPath = "/tmp" apiflag := true cr.Container.RuntimeConstraints.API = &apiflag @@ -1203,7 +1208,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { cr.Container.Mounts = map[string]arvados.Mount{ "/keeptmp": {Kind: "collection", Writable: true}, } - cr.OutputPath = "/keeptmp" + cr.Container.OutputPath = "/keeptmp" os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm) @@ -1225,7 +1230,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { "/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}, "/keepout": {Kind: "collection", Writable: true}, } - cr.OutputPath = "/keepout" + cr.Container.OutputPath = "/keepout" os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm) os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm) @@ -1251,7 +1256,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { "/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}, "/keepout": {Kind: "collection", Writable: true}, } - cr.OutputPath = "/keepout" + cr.Container.OutputPath = "/keepout" os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm) os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm) @@ -1332,7 +1337,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { "/tmp": {Kind: "tmp"}, "/tmp/foo": {Kind: "collection"}, } - cr.OutputPath = "/tmp" + cr.Container.OutputPath = "/tmp" os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm) @@ -1362,7 +1367,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { Path: "baz", Writable: true}, } - cr.OutputPath = "/tmp" + cr.Container.OutputPath = "/tmp" os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm) os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz", os.ModePerm) @@ -1391,7 +1396,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { "/tmp": {Kind: "tmp"}, "/tmp/foo": {Kind: "tmp"}, } - cr.OutputPath = "/tmp" + cr.Container.OutputPath = "/tmp" err := cr.SetupMounts() c.Check(err, NotNil) @@ -1439,7 +1444,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { Path: "/", }, } - cr.OutputPath = "/tmp" + cr.Container.OutputPath = "/tmp" err := cr.SetupMounts() c.Check(err, IsNil) diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index ce0a661263..5d85bcf6c2 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -197,6 +197,8 @@ var crunchLogThrottleLines int64 = 1024 var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5 var crunchLogBytesPerEvent int64 = 4096 var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1 +var crunchLogCheckpointMaxDuration = time.Hour / 2 +var crunchLogCheckpointMaxBytes = int64(1 << 25) // ArvLogWriter is an io.WriteCloser that processes each write by // writing it through to another io.WriteCloser (typically a diff --git a/services/crunch-run/logging_test.go b/services/crunch-run/logging_test.go index 13a171ae84..f118781c67 100644 --- a/services/crunch-run/logging_test.go +++ b/services/crunch-run/logging_test.go @@ -37,6 +37,8 @@ var _ = Suite(&LoggingTestSuite{}) func (s *LoggingTestSuite) SetUpTest(c *C) { s.client = arvados.NewClientFromEnv() + crunchLogCheckpointMaxDuration = time.Hour * 24 * 365 + crunchLogCheckpointMaxBytes = 1 << 50 } func (s *LoggingTestSuite) TestWriteLogs(c *C) { @@ -129,6 +131,48 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) { c.Check(mt, Equals, ". 48f9023dc683a850b1c9b482b14c4b97+163 0:83:crunch-run.txt 83:80:stdout.txt\n") } +func (s *LoggingTestSuite) TestLogCheckpoint(c *C) { + for _, trial := range []struct { + maxBytes int64 + maxDuration time.Duration + }{ + {1000, 10 * time.Second}, + {1000000, time.Millisecond}, + } { + c.Logf("max %d bytes, %s", trial.maxBytes, trial.maxDuration) + crunchLogCheckpointMaxBytes = trial.maxBytes + crunchLogCheckpointMaxDuration = trial.maxDuration + + api := &ArvTestClient{} + kc := &KeepTestClient{} + defer kc.Close() + cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz") + c.Assert(err, IsNil) + ts := &TestTimestamper{} + cr.CrunchLog.Timestamper = ts.Timestamp + w, err := cr.NewLogWriter("stdout") + c.Assert(err, IsNil) + stdout := NewThrottledLogger(w) + stdout.Timestamper = ts.Timestamp + + c.Check(cr.logUUID, Equals, "") + cr.CrunchLog.Printf("Hello %1000s", "space") + for i, t := 0, time.NewTicker(time.Millisecond); i < 5000 && cr.logUUID == ""; i++ { + <-t.C + } + c.Check(cr.logUUID, Not(Equals), "") + cr.CrunchLog.Print("Goodbye") + fmt.Fprint(stdout, "Goodbye\n") + cr.CrunchLog.Close() + stdout.Close() + w.Close() + + mt, err := cr.LogCollection.MarshalManifest(".") + c.Check(err, IsNil) + c.Check(mt, Equals, ". 4dc76e0a212bfa30c39d76d8c16da0c0+1038 afc503bc1b9a828b4bb543cb629e936c+78 0:1077:crunch-run.txt 1077:39:stdout.txt\n") + } +} + func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) { s.testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds") } -- 2.30.2