X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/17800e7d4a9574035dd48b71ec4247f70525d45e..7413300e85b60d0533ec117b2175cb67fd9f10e3:/sdk/go/logger/logger.go diff --git a/sdk/go/logger/logger.go b/sdk/go/logger/logger.go index 2a4a962a0b..3b2db3a321 100644 --- a/sdk/go/logger/logger.go +++ b/sdk/go/logger/logger.go @@ -1,40 +1,45 @@ // Logger periodically writes a log to the Arvados SDK. // -// This package is useful for maintaining a log object that is built -// up over time. Every time the object is modified, it will be written -// to the log. Writes will be throttled to no more than one every -// WriteFrequencySeconds +// This package is useful for maintaining a log object that is updated +// over time. This log object will be periodically written to the log, +// as specified by WriteInterval in the Params. // // This package is safe for concurrent use as long as: -// 1. The maps returned by Edit() are only edited in the same routine -// that called Edit() -// 2. Those maps not edited after calling Record() -// An easy way to assure this is true is to place the call to Edit() -// within a short block as shown below in the Usage Example: +// The maps passed to a LogMutator are not accessed outside of the +// LogMutator // // Usage: -// arvLogger := logger.NewLogger(params) -// { -// properties, entry := arvLogger.Edit() // This will block if others are using the logger +// arvLogger, err := logger.NewLogger(params) // handle err +// arvLogger.Update(func(properties map[string]interface{}, +// entry map[string]interface{}) { -// // Modifiy properties and entry however you want +// // Modify properties and entry however you want // // properties is a shortcut for entry["properties"].(map[string]interface{}) -// // properties can take any values you want to give it, -// // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html -// } -// arvLogger.Record() // This triggers the actual log write +// // properties can take any (valid) values you want to give it, +// // entry will only take the fields listed at +// // 
http://doc.arvados.org/api/schema/Log.html +// // Valid values for properties are anything that can be json +// // encoded (i.e. will not error if you call json.Marshal() on it). +// }) package logger import ( + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "log" - "sync" "time" ) +const ( + startSuffix = "-start" + partialSuffix = "-partial" + finalSuffix = "-final" + numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & work(). +) + type LoggerParams struct { - Client arvadosclient.ArvadosClient // The client we use to write log entries - EventType string // The event type to assign to the log entry. - MinimumWriteInterval time.Duration // Wait at least this long between log writes + Client arvadosclient.ArvadosClient // The client we use to write log entries + EventTypePrefix string // The prefix we use for the event type in the log entry + WriteInterval time.Duration // Wait at least this long between log writes } // A LogMutator is a function which modifies the log entry. @@ -51,122 +56,149 @@ type LogMutator func(map[string]interface{}, map[string]interface{}) // A Logger is used to build up a log entry over time and write every // version of it. type Logger struct { - // The Data we write + // The data we write data map[string]interface{} // The entire map that we give to the api entry map[string]interface{} // Convenience shortcut into data properties map[string]interface{} // Convenience shortcut into data - lock sync.Locker // Synchronizes editing and writing params LoggerParams // Parameters we were given - lastWrite time.Time // The last time we wrote a log entry - modified bool // Has this data been modified since the last write + // Variables to coordinate updating and writing. + modified bool // Has this data been modified since the last write? + workToDo chan LogMutator // Work to do in the worker thread. + writeTicker *time.Ticker // On each tick we write the log data to arvados, if it has been modified.
+ hasWritten bool // Whether we've written at all yet. + noMoreWork chan bool // Signals that we're done writing. - writeHooks []LogMutator + writeHooks []LogMutator // Mutators we call before each write. } // Create a new logger based on the specified parameters. -func NewLogger(params LoggerParams) *Logger { - // TODO(misha): Add some params checking here. - l := &Logger{data: make(map[string]interface{}), - lock: &sync.Mutex{}, - params: params} - l.entry = make(map[string]interface{}) +func NewLogger(params LoggerParams) (l *Logger, err error) { + // sanity check parameters + if params.WriteInterval <= 0 { + err = fmt.Errorf("Non-positive WriteInterval (%v) in LoggerParams passed in to NewLogger()", params.WriteInterval) + return + } + if params.EventTypePrefix == "" { + err = fmt.Errorf("Empty event type prefix in LoggerParams passed in to NewLogger()") + return + } + + l = &Logger{ + data: make(map[string]interface{}), + entry: make(map[string]interface{}), + properties: make(map[string]interface{}), + params: params, + workToDo: make(chan LogMutator, 10), + writeTicker: time.NewTicker(params.WriteInterval), + noMoreWork: make(chan bool, numberNoMoreWorkMessages)} + l.data["log"] = l.entry - l.properties = make(map[string]interface{}) l.entry["properties"] = l.properties - return l -} -// Get access to the maps you can edit. This will hold a lock until -// you call Record. Do not edit the maps in any other goroutines or -// after calling Record. -// You don't need to edit both maps, -// properties can take any values you want to give it, -// entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html -// properties is a shortcut for entry["properties"].(map[string]interface{}) -func (l *Logger) Edit() (properties map[string]interface{}, entry map[string]interface{}) { - l.lock.Lock() - l.modified = true // We don't actually know the caller will modifiy the data, but we assume they will. + // Start the worker goroutine.
+ go l.work() - return l.properties, l.entry + return l, nil } -// function to test new api, replacing Edit() and Record() -func (l *Logger) MutateLog(mutator LogMutator) { - mutator(l.Edit()) - l.Record() -} +// Exported functions will be called from other goroutines, therefore +// all they are allowed to do is enqueue work to be done in the worker +// goroutine. -// Adds a hook which will be called every time this logger writes an entry. -// The hook takes properties and entry as arguments, in that order. -// This is useful for stuff like memory profiling. -// This must be called between Edit() and Record() (e.g. while holding the lock) -func (l *Logger) AddWriteHook(hook LogMutator) { - // TODO(misha): Acquire lock here! and erase comment about edit. - l.writeHooks = append(l.writeHooks, hook) - // TODO(misha): consider flipping the dirty bit here. +// Enqueues an update. This will happen in another goroutine after +// this method returns. +func (l *Logger) Update(mutator LogMutator) { + l.workToDo <- mutator } -// Write the log entry you've built up so far. Do not edit the maps -// returned by Edit() after calling this method. -// If you have already written within MinimumWriteInterval, then this -// will schedule a future write instead. -// In either case, the lock will be released before Record() returns. -func (l *Logger) Record() { - if l.writeAllowedNow() { - // We haven't written in the allowed interval yet, try to write. - l.write() - } else { - // TODO(misha): Only allow one outstanding write to be scheduled. - nextTimeToWrite := l.lastWrite.Add(l.params.MinimumWriteInterval) - writeAfter := nextTimeToWrite.Sub(time.Now()) - time.AfterFunc(writeAfter, l.acquireLockConsiderWriting) +// Similar to Update(), but writes the log entry as soon as possible +// (ignoring WriteInterval) and blocks until the entry has been +// written. This is useful if you know that you're about to quit +// (e.g.
if you discovered a fatal error, or you're finished), since +// go will not wait for timers (including the pending write timer) to +// go off before exiting. +func (l *Logger) FinalUpdate(mutator LogMutator) { + // TODO(misha): Consider not accepting any future updates somehow, + // since they won't get written if they come in after this. + + // Stop the periodic write ticker. We'll perform the final write + // before returning from this function. + l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { + l.writeTicker.Stop() } - l.lock.Unlock() + + // Apply the final update + l.workToDo <- mutator + + // Perform the final write and signal that we can return. + l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { + l.write(true) + for i := 0; i < numberNoMoreWorkMessages; i++ { + l.noMoreWork <- true + } + } + + // Wait until we've performed the write. + <-l.noMoreWork } -// Similar to Record, but forces a write without respecting the -// MinimumWriteInterval. This is useful if you know that you're about -// to quit (e.g. if you discovered a fatal error). -func (l *Logger) ForceRecord() { - l.write() - l.lock.Unlock() +// Adds a hook which will be called every time this logger writes an entry. +func (l *Logger) AddWriteHook(hook LogMutator) { + // We do the work in a LogMutator so that it happens in the worker + // goroutine. + l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { + l.writeHooks = append(l.writeHooks, hook) + } } -// Whether enough time has elapsed since the last write. -func (l *Logger) writeAllowedNow() bool { - return l.lastWrite.Add(l.params.MinimumWriteInterval).Before(time.Now()) +// The worker loop +func (l *Logger) work() { + for { + select { + case <-l.writeTicker.C: + if l.modified { + l.write(false) + l.modified = false + } + case mutator := <-l.workToDo: + mutator(l.properties, l.entry) + l.modified = true + case <-l.noMoreWork: + return + } + } } -// Actually writes the log entry.
This method assumes we're holding the lock. -func (l *Logger) write() { +// Actually writes the log entry. +func (l *Logger) write(isFinal bool) { // Run all our hooks for _, hook := range l.writeHooks { hook(l.properties, l.entry) } - // Update the event type in case it was modified or is missing. - l.entry["event_type"] = l.params.EventType + // Update the event type. + if isFinal { + l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix + } else if l.hasWritten { + l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix + } else { + l.entry["event_type"] = l.params.EventTypePrefix + startSuffix + } + l.hasWritten = true // Write the log entry. + // This is a network write and will take a while, which is bad + // because we're blocking all the other work on this goroutine. + // + // TODO(misha): Consider rewriting this so that we can encode l.data + // into a string, and then perform the actual write in another + // routine. This will be tricky and will require support in the + // client. err := l.params.Client.Create("logs", l.data, nil) if err != nil { - log.Printf("Attempted to log: %v", l.data) - log.Fatalf("Received error writing log: %v", err) - } - - // Update stats. - l.lastWrite = time.Now() - l.modified = false -} - -func (l *Logger) acquireLockConsiderWriting() { - l.lock.Lock() - if l.modified && l.writeAllowedNow() { - // We have something new to write and we're allowed to write. - l.write() + log.Printf("Received error writing %v: %v", l.data, err) } - l.lock.Unlock() }