X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/970095751e2e836ed296152ae3e9ccb6caa62f62..refs/heads/9374-go-sdk:/sdk/go/logger/logger.go diff --git a/sdk/go/logger/logger.go b/sdk/go/logger/logger.go index fd40cce29d..3b2db3a321 100644 --- a/sdk/go/logger/logger.go +++ b/sdk/go/logger/logger.go @@ -1,145 +1,204 @@ // Logger periodically writes a log to the Arvados SDK. // -// This package is useful for maintaining a log object that is built -// up over time. Every time the object is modified, it will be written -// to the log. Writes will be throttled to no more than one every -// WriteFrequencySeconds +// This package is useful for maintaining a log object that is updated +// over time. This log object will be periodically written to the log, +// as specified by WriteInterval in the Params. // // This package is safe for concurrent use as long as: -// 1. The maps returned by Edit() are only edited in the same routine -// that called Edit() -// 2. Those maps not edited after calling Record() -// An easy way to assure this is true is to place the call to Edit() -// within a short block as shown below in the Usage Example: +// The maps passed to a LogMutator are not accessed outside of the +// LogMutator // // Usage: // arvLogger := logger.NewLogger(params) -// { -// properties, entry := arvLogger.Edit() // This will block if others are using the logger +// arvLogger.Update(func(properties map[string]interface{}, +// entry map[string]interface{}) { // // Modifiy properties and entry however you want // // properties is a shortcut for entry["properties"].(map[string]interface{}) -// // properties can take any values you want to give it, -// // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html -// } -// arvLogger.Record() // This triggers the actual log write +// // properties can take any (valid) values you want to give it, +// // entry will only take the fields listed at +// // http://doc.arvados.org/api/schema/Log.html +// // Valid values for properties are anything that can be json +// // encoded (i.e. will not error if you call json.Marshal() on it. +// }) package logger import ( + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "log" - "sync" "time" ) +const ( + startSuffix = "-start" + partialSuffix = "-partial" + finalSuffix = "-final" + numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work(). +) + type LoggerParams struct { - Client arvadosclient.ArvadosClient // The client we use to write log entries - EventType string // The event type to assign to the log entry. - MinimumWriteInterval time.Duration // Wait at least this long between log writes + Client arvadosclient.ArvadosClient // The client we use to write log entries + EventTypePrefix string // The prefix we use for the event type in the log entry + WriteInterval time.Duration // Wait at least this long between log writes } +// A LogMutator is a function which modifies the log entry. +// It takes two maps as arguments, properties is the first and entry +// is the second +// properties is a shortcut for entry["properties"].(map[string]interface{}) +// properties can take any values you want to give it. +// entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html +// properties and entry are only safe to access inside the LogMutator, +// they should not be stored anywhere, otherwise you'll risk +// concurrent access. +type LogMutator func(map[string]interface{}, map[string]interface{}) + // A Logger is used to build up a log entry over time and write every // version of it. type Logger struct { - // The Data we write - data map[string]interface{} // The entire map that we give to the api - entry map[string]interface{} // Convenience shortcut into data - properties map[string]interface{} // Convenience shortcut into data + // The data we write + data map[string]interface{} // The entire map that we give to the api + entry map[string]interface{} // Convenience shortcut into data + properties map[string]interface{} // Convenience shortcut into data - lock sync.Locker // Synchronizes editing and writing - params LoggerParams // Parameters we were given + params LoggerParams // Parameters we were given - lastWrite time.Time // The last time we wrote a log entry - modified bool // Has this data been modified since the last write + // Variables to coordinate updating and writing. + modified bool // Has this data been modified since the last write? + workToDo chan LogMutator // Work to do in the worker thread. + writeTicker *time.Ticker // On each tick we write the log data to arvados, if it has been modified. + hasWritten bool // Whether we've written at all yet. + noMoreWork chan bool // Signals that we're done writing. - editHooks []func(map[string]interface{},map[string]interface{}) + writeHooks []LogMutator // Mutators we call before each write. } // Create a new logger based on the specified parameters. -func NewLogger(params LoggerParams) *Logger { - // TODO(misha): Add some params checking here. - l := &Logger{data: make(map[string]interface{}), - lock: &sync.Mutex{}, - params: params} - l.entry = make(map[string]interface{}) +func NewLogger(params LoggerParams) (l *Logger, err error) { + // sanity check parameters + if ¶ms.Client == nil { + err = fmt.Errorf("Nil arvados client in LoggerParams passed in to NewLogger()") + return + } + if params.EventTypePrefix == "" { + err = fmt.Errorf("Empty event type prefix in LoggerParams passed in to NewLogger()") + return + } + + l = &Logger{ + data: make(map[string]interface{}), + entry: make(map[string]interface{}), + properties: make(map[string]interface{}), + params: params, + workToDo: make(chan LogMutator, 10), + writeTicker: time.NewTicker(params.WriteInterval), + noMoreWork: make(chan bool, numberNoMoreWorkMessages)} + l.data["log"] = l.entry - l.properties = make(map[string]interface{}) l.entry["properties"] = l.properties - return l -} - -// Get access to the maps you can edit. This will hold a lock until -// you call Record. Do not edit the maps in any other goroutines or -// after calling Record. -// You don't need to edit both maps, -// properties can take any values you want to give it, -// entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html -// properties is a shortcut for entry["properties"].(map[string]interface{}) -func (l *Logger) Edit() (properties map[string]interface{}, entry map[string]interface{}) { - l.lock.Lock() - l.modified = true // We don't actually know the caller will modifiy the data, but we assume they will. - // Run all our hooks - for _, hook := range l.editHooks { - hook(l.properties, l.entry) - } + // Start the worker goroutine. + go l.work() - return l.properties, l.entry + return l, nil } -// Adds a hook which will be called everytime Edit() is called. -// The hook takes properties and entry as arguments, in that order. -// This is useful for stuff like memory profiling. -// This must be called between Edit() and Record(). -// For convenience AddEditHook will call hook when it is added as well. -func (l *Logger) AddEditHook(hook func(map[string]interface{}, - map[string]interface{})) { - l.editHooks = append(l.editHooks, hook) - hook(l.properties, l.entry) +// Exported functions will be called from other goroutines, therefore +// all they are allowed to do is enqueue work to be done in the worker +// goroutine. + +// Enqueues an update. This will happen in another goroutine after +// this method returns. +func (l *Logger) Update(mutator LogMutator) { + l.workToDo <- mutator } -// Write the log entry you've built up so far. Do not edit the maps -// returned by Edit() after calling this method. -// If you have already written within MinimumWriteInterval, then this -// will schedule a future write instead. -// In either case, the lock will be released before Record() returns. -func (l *Logger) Record() { - if l.writeAllowedNow() { - // We haven't written in the allowed interval yet, try to write. - l.write() - } else { - nextTimeToWrite := l.lastWrite.Add(l.params.MinimumWriteInterval) - writeAfter := nextTimeToWrite.Sub(time.Now()) - time.AfterFunc(writeAfter, l.acquireLockConsiderWriting) +// Similar to Update(), but writes the log entry as soon as possible +// (ignoring MinimumWriteInterval) and blocks until the entry has been +// written. This is useful if you know that you're about to quit +// (e.g. if you discovered a fatal error, or you're finished), since +// go will not wait for timers (including the pending write timer) to +// go off before exiting. +func (l *Logger) FinalUpdate(mutator LogMutator) { + // TODO(misha): Consider not accepting any future updates somehow, + // since they won't get written if they come in after this. + + // Stop the periodic write ticker. We'll perform the final write + // before returning from this function. + l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { + l.writeTicker.Stop() } - l.lock.Unlock() -} + // Apply the final update + l.workToDo <- mutator + + // Perform the final write and signal that we can return. + l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { + l.write(true) + for i := 0; i < numberNoMoreWorkMessages; { + l.noMoreWork <- true + } + } -// Whether enough time has elapsed since the last write. -func (l *Logger) writeAllowedNow() bool { - return l.lastWrite.Add(l.params.MinimumWriteInterval).Before(time.Now()) + // Wait until we've performed the write. + <-l.noMoreWork } +// Adds a hook which will be called every time this logger writes an entry. +func (l *Logger) AddWriteHook(hook LogMutator) { + // We do the work in a LogMutator so that it happens in the worker + // goroutine. + l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { + l.writeHooks = append(l.writeHooks, hook) + } +} -// Actually writes the log entry. This method assumes we're holding the lock. -func (l *Logger) write() { - // Update the event type in case it was modified or is missing. - l.entry["event_type"] = l.params.EventType - err := l.params.Client.Create("logs", l.data, nil) - if err != nil { - log.Printf("Attempted to log: %v", l.data) - log.Fatalf("Received error writing log: %v", err) +// The worker loop +func (l *Logger) work() { + for { + select { + case <-l.writeTicker.C: + if l.modified { + l.write(false) + l.modified = false + } + case mutator := <-l.workToDo: + mutator(l.properties, l.entry) + l.modified = true + case <-l.noMoreWork: + return + } } - l.lastWrite = time.Now() - l.modified = false } +// Actually writes the log entry. +func (l *Logger) write(isFinal bool) { -func (l *Logger) acquireLockConsiderWriting() { - l.lock.Lock() - if l.modified && l.writeAllowedNow() { - // We have something new to write and we're allowed to write. - l.write() + // Run all our hooks + for _, hook := range l.writeHooks { + hook(l.properties, l.entry) + } + + // Update the event type. + if isFinal { + l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix + } else if l.hasWritten { + l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix + } else { + l.entry["event_type"] = l.params.EventTypePrefix + startSuffix + } + l.hasWritten = true + + // Write the log entry. + // This is a network write and will take a while, which is bad + // because we're blocking all the other work on this goroutine. + // + // TODO(misha): Consider rewriting this so that we can encode l.data + // into a string, and then perform the actual write in another + // routine. This will be tricky and will require support in the + // client. + err := l.params.Client.Create("logs", l.data, nil) + if err != nil { + log.Printf("Received error writing %v: %v", l.data, err) } - l.lock.Unlock() }