// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
- Docker ThinDockerClient
- client *arvados.Client
- ArvClient IArvadosClient
- Kc IKeepClient
- arvados.Container
+ Docker ThinDockerClient
+ client *arvados.Client
+ ArvClient IArvadosClient
+ Kc IKeepClient
+ Container arvados.Container
ContainerConfig dockercontainer.Config
- dockercontainer.HostConfig
- token string
- ContainerID string
- ExitCode *int
- NewLogWriter
- loggingDone chan bool
- CrunchLog *ThrottledLogger
- Stdout io.WriteCloser
- Stderr io.WriteCloser
- LogCollection arvados.CollectionFileSystem
- LogsPDH *string
- RunArvMount
- MkTempDir
- ArvMount *exec.Cmd
- ArvMountPoint string
- HostOutputDir string
- Binds []string
- Volumes map[string]struct{}
- OutputPDH *string
- SigChan chan os.Signal
- ArvMountExit chan error
- SecretMounts map[string]arvados.Mount
- MkArvClient func(token string) (IArvadosClient, error)
- finalState string
- parentTemp string
+ HostConfig dockercontainer.HostConfig
+ token string
+ ContainerID string
+ ExitCode *int
+ NewLogWriter NewLogWriter
+ loggingDone chan bool
+ CrunchLog *ThrottledLogger
+ Stdout io.WriteCloser
+ Stderr io.WriteCloser
+ logUUID string
+ logMtx sync.Mutex
+ LogCollection arvados.CollectionFileSystem
+ LogsPDH *string
+ RunArvMount RunArvMount
+ MkTempDir MkTempDir
+ ArvMount *exec.Cmd
+ ArvMountPoint string
+ HostOutputDir string
+ Binds []string
+ Volumes map[string]struct{}
+ OutputPDH *string
+ SigChan chan os.Signal
+ ArvMountExit chan error
+ SecretMounts map[string]arvados.Mount
+ MkArvClient func(token string) (IArvadosClient, error)
+ finalState string
+ parentTemp string
ListProcesses func() ([]PsProcess, error)
}
}
+func (runner *ContainerRunner) checkpointLogs() {
+ logCheckpointTicker := time.NewTicker(crunchLogCheckpointMaxDuration / 360)
+ defer logCheckpointTicker.Stop()
+
+ logCheckpointTime := time.Now().Add(crunchLogCheckpointMaxDuration)
+ logCheckpointBytes := crunchLogCheckpointMaxBytes
+ var savedSize int64
+ for range logCheckpointTicker.C {
+ runner.logMtx.Lock()
+ done := runner.LogsPDH != nil
+ runner.logMtx.Unlock()
+ if done {
+ return
+ }
+ size := runner.LogCollection.Size()
+ if size == savedSize || (time.Now().Before(logCheckpointTime) && size < logCheckpointBytes) {
+ continue
+ }
+ logCheckpointTime = time.Now().Add(crunchLogCheckpointMaxDuration)
+ logCheckpointBytes = runner.LogCollection.Size() + crunchLogCheckpointMaxBytes
+ _, err := runner.saveLogCollection()
+ if err != nil {
+ runner.CrunchLog.Printf("error updating log collection: %s", err)
+ continue
+ }
+ savedSize = size
+ }
+}
+
// CaptureOutput saves data from the container's output directory if
// needed, and updates the container output accordingly.
func (runner *ContainerRunner) CaptureOutput() error {
// -- it exists only to send logs to other channels.
return nil
}
+ saved, err := runner.saveLogCollection()
+ if err != nil {
+ return err
+ }
+ runner.logMtx.Lock()
+ defer runner.logMtx.Unlock()
+ runner.LogsPDH = &saved.PortableDataHash
+ return nil
+}
+func (runner *ContainerRunner) saveLogCollection() (response arvados.Collection, err error) {
+ runner.logMtx.Lock()
+ defer runner.logMtx.Unlock()
+ if runner.LogsPDH != nil {
+ // Already finalized.
+ return
+ }
mt, err := runner.LogCollection.MarshalManifest(".")
if err != nil {
- return fmt.Errorf("While creating log manifest: %v", err)
- }
-
- var response arvados.Collection
- err = runner.ArvClient.Create("collections",
- arvadosclient.Dict{
- "ensure_unique_name": true,
- "collection": arvadosclient.Dict{
- "is_trashed": true,
- "name": "logs for " + runner.Container.UUID,
- "manifest_text": mt}},
- &response)
+ err = fmt.Errorf("error creating log manifest: %v", err)
+ return
+ }
+ reqBody := arvadosclient.Dict{
+ "collection": arvadosclient.Dict{
+ "is_trashed": true,
+ "name": "logs for " + runner.Container.UUID,
+ "manifest_text": mt}}
+ if runner.logUUID == "" {
+ reqBody["ensure_unique_name"] = true
+ err = runner.ArvClient.Create("collections", reqBody, &response)
+ } else {
+ err = runner.ArvClient.Update("collections", runner.logUUID, reqBody, &response)
+ }
if err != nil {
- return fmt.Errorf("While creating log collection: %v", err)
+ err = fmt.Errorf("error saving log collection: %v", err)
+ return
}
- runner.LogsPDH = &response.PortableDataHash
- return nil
+ runner.logUUID = response.UUID
+ return
}
// UpdateContainerRunning updates the container state to "Running"
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
loadLogThrottleParams(api)
+ go cr.checkpointLogs()
return cr, nil
}
mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
outmap := output.(*arvados.Collection)
outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
+ outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%15.15x", md5.Sum([]byte(mt)))
}
return nil
if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
client.WasSetRunning = true
}
+ } else if resourceType == "collections" {
+ mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
+ output.(*arvados.Collection).UUID = uuid
+ output.(*arvados.Collection).PortableDataHash = fmt.Sprintf("%x", md5.Sum([]byte(mt)))
}
return nil
}
cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
cr.statInterval = 5 * time.Second
err := cr.SetupMounts()
c.Check(err, IsNil)
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts["/out"] = arvados.Mount{Kind: "tmp"}
cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
- cr.OutputPath = "/out"
+ cr.Container.OutputPath = "/out"
err := cr.SetupMounts()
c.Check(err, IsNil)
cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
apiflag := true
cr.Container.RuntimeConstraints.API = &apiflag
cr.Container.Mounts = map[string]arvados.Mount{
"/keeptmp": {Kind: "collection", Writable: true},
}
- cr.OutputPath = "/keeptmp"
+ cr.Container.OutputPath = "/keeptmp"
os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
"/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
"/keepout": {Kind: "collection", Writable: true},
}
- cr.OutputPath = "/keepout"
+ cr.Container.OutputPath = "/keepout"
os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
"/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
"/keepout": {Kind: "collection", Writable: true},
}
- cr.OutputPath = "/keepout"
+ cr.Container.OutputPath = "/keepout"
os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
"/tmp": {Kind: "tmp"},
"/tmp/foo": {Kind: "collection"},
}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
Path: "baz",
Writable: true},
}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz", os.ModePerm)
"/tmp": {Kind: "tmp"},
"/tmp/foo": {Kind: "tmp"},
}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
err := cr.SetupMounts()
c.Check(err, NotNil)
Path: "/",
},
}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
err := cr.SetupMounts()
c.Check(err, IsNil)
// Tunables for crunch-run logging. They are variables rather than constants
// so that tests can adjust them.
var (
	crunchLogPartialLineThrottlePeriod = 5 * time.Second
	crunchLogBytesPerEvent             = int64(4096)
	crunchLogSecondsBetweenEvents      = time.Second

	// crunchLogCheckpointMaxDuration is the longest checkpointLogs waits
	// before saving a log collection that has grown. checkpointLogs also
	// polls at 1/360 of this interval.
	crunchLogCheckpointMaxDuration = time.Hour / 2

	// crunchLogCheckpointMaxBytes is how much the log collection may grow
	// (32 MiB by default) before checkpointLogs saves it early.
	crunchLogCheckpointMaxBytes = int64(1 << 25)
)
// ArvLogWriter is an io.WriteCloser that processes each write by
// writing it through to another io.WriteCloser (typically a
// SetUpTest runs before each LoggingTestSuite test. It raises the log
// checkpoint thresholds far out of reach, so runners created by a test do not
// checkpoint their logs unless that test lowers the thresholds. It then
// builds an API client from the environment.
func (s *LoggingTestSuite) SetUpTest(c *C) {
	crunchLogCheckpointMaxBytes = 1 << 50
	crunchLogCheckpointMaxDuration = 365 * 24 * time.Hour
	s.client = arvados.NewClientFromEnv()
}
func (s *LoggingTestSuite) TestWriteLogs(c *C) {
c.Check(mt, Equals, ". 48f9023dc683a850b1c9b482b14c4b97+163 0:83:crunch-run.txt 83:80:stdout.txt\n")
}
+func (s *LoggingTestSuite) TestLogCheckpoint(c *C) {
+ for _, trial := range []struct {
+ maxBytes int64
+ maxDuration time.Duration
+ }{
+ {1000, 10 * time.Second},
+ {1000000, time.Millisecond},
+ } {
+ c.Logf("max %d bytes, %s", trial.maxBytes, trial.maxDuration)
+ crunchLogCheckpointMaxBytes = trial.maxBytes
+ crunchLogCheckpointMaxDuration = trial.maxDuration
+
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ defer kc.Close()
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
+ ts := &TestTimestamper{}
+ cr.CrunchLog.Timestamper = ts.Timestamp
+ w, err := cr.NewLogWriter("stdout")
+ c.Assert(err, IsNil)
+ stdout := NewThrottledLogger(w)
+ stdout.Timestamper = ts.Timestamp
+
+ c.Check(cr.logUUID, Equals, "")
+ cr.CrunchLog.Printf("Hello %1000s", "space")
+ for i, t := 0, time.NewTicker(time.Millisecond); i < 5000 && cr.logUUID == ""; i++ {
+ <-t.C
+ }
+ c.Check(cr.logUUID, Not(Equals), "")
+ cr.CrunchLog.Print("Goodbye")
+ fmt.Fprint(stdout, "Goodbye\n")
+ cr.CrunchLog.Close()
+ stdout.Close()
+ w.Close()
+
+ mt, err := cr.LogCollection.MarshalManifest(".")
+ c.Check(err, IsNil)
+ c.Check(mt, Equals, ". 4dc76e0a212bfa30c39d76d8c16da0c0+1038 afc503bc1b9a828b4bb543cb629e936c+78 0:1077:crunch-run.txt 1077:39:stdout.txt\n")
+ }
+}
+
// TestWriteLogsWithRateLimitThrottleBytes runs the shared
// testWriteLogsWithRateLimit check for the crunchLogThrottleBytes setting.
// It passes the two numeric arguments and the expected message below; see
// that helper for how they are used.
func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) {
	const expected = "Exceeded rate 50 bytes per 60 seconds"
	s.testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, expected)
}