X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3594ad790c998c1b1711ea65057fcae9477986d4..15aca78766bda903480e327340d67d3d882ee69f:/sdk/go/logger/logger.go diff --git a/sdk/go/logger/logger.go b/sdk/go/logger/logger.go index 6835750812..6dd7fb3723 100644 --- a/sdk/go/logger/logger.go +++ b/sdk/go/logger/logger.go @@ -1,79 +1,204 @@ -// Periodically writes a log to the Arvados SDK. +// Logger periodically writes a log to the Arvados SDK. // -// This package is useful for maintaining a log object that is built -// up over time. Every time the object is modified, it will be written -// to the log. Writes will be throttled to no more than one every -// WriteFrequencySeconds +// This package is useful for maintaining a log object that is updated +// over time. This log object will be periodically written to the log, +// as specified by WriteInterval in the Params. // -// This package is safe for concurrent use. +// This package is safe for concurrent use as long as: +// The maps passed to a LogMutator are not accessed outside of the +// LogMutator // // Usage: // arvLogger := logger.NewLogger(params) -// logData := arvLogger.Acquire() // This will block if others are using the logger -// // Modify the logObject however you want here .. -// logData = arvLogger.Release() // This triggers the actual write, and replaces logObject with a nil pointer so you don't try to modify it when you're no longer holding the lock - +// arvLogger.Update(func(properties map[string]interface{}, +// entry map[string]interface{}) { +// // Modify properties and entry however you want +// // properties is a shortcut for entry["properties"].(map[string]interface{}) +// // properties can take any (valid) values you want to give it, +// // entry will only take the fields listed at +// // http://doc.arvados.org/api/schema/Log.html +// // Valid values for properties are anything that can be json +// // encoded (i.e. will not error if you call json.Marshal() on it).
+// }) package logger import ( + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "log" - "sync" "time" ) const ( - eventTypeLabel string = "event-type" - propertiesLabel string = "properties" + startSuffix = "-start" + partialSuffix = "-partial" + finalSuffix = "-final" + numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work(). ) type LoggerParams struct { - Client arvadosclient.ArvadosClient // The client we use to write log entries - EventType string // The event type to assign to the log entry. - MinimumWriteInterval time.Duration // Wait at least this long between log writes + Client *arvadosclient.ArvadosClient // The client we use to write log entries + EventTypePrefix string // The prefix we use for the event type in the log entry + WriteInterval time.Duration // Wait at least this long between log writes } +// A LogMutator is a function which modifies the log entry. +// It takes two maps as arguments, properties is the first and entry +// is the second +// properties is a shortcut for entry["properties"].(map[string]interface{}) +// properties can take any values you want to give it. +// entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html +// properties and entry are only safe to access inside the LogMutator, +// they should not be stored anywhere, otherwise you'll risk +// concurrent access. +type LogMutator func(map[string]interface{}, map[string]interface{}) + +// A Logger is used to build up a log entry over time and write every +// version of it. type Logger struct { - data map[string]interface{} - lock sync.Locker - params LoggerParams + // The data we write + data map[string]interface{} // The entire map that we give to the api + entry map[string]interface{} // Convenience shortcut into data + properties map[string]interface{} // Convenience shortcut into data + + params LoggerParams // Parameters we were given + + // Variables to coordinate updating and writing. 
+ modified bool // Has this data been modified since the last write? + workToDo chan LogMutator // Work to do in the worker thread. + writeTicker *time.Ticker // On each tick we write the log data to arvados, if it has been modified. + hasWritten bool // Whether we've written at all yet. + noMoreWork chan bool // Signals that we're done writing. + + writeHooks []LogMutator // Mutators we call before each write. } -func NewLogger(params LoggerParams) *Logger { - l := &Logger{data: make(map[string]interface{}), - lock: &sync.Mutex{}, - // TODO(misha): Consider copying the params so they're not - // modified after creation. - params: params} - l.data[propertiesLabel] = make(map[string]interface{}) - return l +// Create a new logger based on the specified parameters. +func NewLogger(params LoggerParams) (l *Logger, err error) { + // sanity check parameters + if ¶ms.Client == nil { + err = fmt.Errorf("Nil arvados client in LoggerParams passed in to NewLogger()") + return + } + if params.EventTypePrefix == "" { + err = fmt.Errorf("Empty event type prefix in LoggerParams passed in to NewLogger()") + return + } + + l = &Logger{ + data: make(map[string]interface{}), + entry: make(map[string]interface{}), + properties: make(map[string]interface{}), + params: params, + workToDo: make(chan LogMutator, 10), + writeTicker: time.NewTicker(params.WriteInterval), + noMoreWork: make(chan bool, numberNoMoreWorkMessages)} + + l.data["log"] = l.entry + l.entry["properties"] = l.properties + + // Start the worker goroutine. + go l.work() + + return l, nil } -func (l *Logger) Acquire() map[string]interface{} { - l.lock.Lock() - return l.data[propertiesLabel].(map[string]interface{}) +// Exported functions will be called from other goroutines, therefore +// all they are allowed to do is enqueue work to be done in the worker +// goroutine. + +// Enqueues an update. This will happen in another goroutine after +// this method returns. 
+func (l *Logger) Update(mutator LogMutator) { + l.workToDo <- mutator } -func (l *Logger) Release() map[string]interface{} { - // TODO(misha): Add a check (and storage) to make sure we respect MinimumWriteInterval - l.write() - l.lock.Unlock() - return nil +// Similar to Update(), but writes the log entry as soon as possible +// (ignoring MinimumWriteInterval) and blocks until the entry has been +// written. This is useful if you know that you're about to quit +// (e.g. if you discovered a fatal error, or you're finished), since +// go will not wait for timers (including the pending write timer) to +// go off before exiting. +func (l *Logger) FinalUpdate(mutator LogMutator) { + // TODO(misha): Consider not accepting any future updates somehow, + // since they won't get written if they come in after this. + + // Stop the periodic write ticker. We'll perform the final write + // before returning from this function. + l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { + l.writeTicker.Stop() + } + + // Apply the final update + l.workToDo <- mutator + + // Perform the final write and signal that we can return. + l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { + l.write(true) + for i := 0; i < numberNoMoreWorkMessages; { + l.noMoreWork <- true + } + } + + // Wait until we've performed the write. + <-l.noMoreWork +} + +// Adds a hook which will be called every time this logger writes an entry. +func (l *Logger) AddWriteHook(hook LogMutator) { + // We do the work in a LogMutator so that it happens in the worker + // goroutine. 
+ l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { + l.writeHooks = append(l.writeHooks, hook) + } +} + +// The worker loop +func (l *Logger) work() { + for { + select { + case <-l.writeTicker.C: + if l.modified { + l.write(false) + l.modified = false + } + case mutator := <-l.workToDo: + mutator(l.properties, l.entry) + l.modified = true + case <-l.noMoreWork: + return + } + } } -func (l *Logger) write() { - // Update the event type in case it was modified or is missing. - // l.data[eventTypeLabel] = l.params.EventType - // m := make(map[string]interface{}) - // m["body"] = l.data - // //err := l.params.Client.Create("logs", l.data, nil) - // //err := l.params.Client.Create("logs", m, nil) - // var results map[string]interface{} - err := l.params.Client.Create("logs", - arvadosclient.Dict{"log": arvadosclient.Dict{ - eventTypeLabel: l.params.EventType, - propertiesLabel: l.data}}, nil) +// Actually writes the log entry. +func (l *Logger) write(isFinal bool) { + + // Run all our hooks + for _, hook := range l.writeHooks { + hook(l.properties, l.entry) + } + + // Update the event type. + if isFinal { + l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix + } else if l.hasWritten { + l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix + } else { + l.entry["event_type"] = l.params.EventTypePrefix + startSuffix + } + l.hasWritten = true + + // Write the log entry. + // This is a network write and will take a while, which is bad + // because we're blocking all the other work on this goroutine. + // + // TODO(misha): Consider rewriting this so that we can encode l.data + // into a string, and then perform the actual write in another + // routine. This will be tricky and will require support in the + // client. + err := l.params.Client.Create("logs", l.data, nil) if err != nil { - log.Fatalf("Received error writing log: %v", err) + log.Printf("Received error writing %v: %v", l.data, err) } }