X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4df1175e30c21850af394fcd60c9bb7ca3d981a5..dffb8b5eccb3af901512effe562a8b8944b14b89:/sdk/go/logger/logger.go diff --git a/sdk/go/logger/logger.go b/sdk/go/logger/logger.go index ca344be045..ce18e90ecf 100644 --- a/sdk/go/logger/logger.go +++ b/sdk/go/logger/logger.go @@ -1,9 +1,8 @@ // Logger periodically writes a log to the Arvados SDK. // // This package is useful for maintaining a log object that is updated -// over time. Every time the object is updated, it will be written to -// the log. Writes will be throttled to no more than one every -// WriteFrequencySeconds +// over time. This log object will be periodically written to the log, +// as specified by WriteInterval in the Params. // // This package is safe for concurrent use as long as: // The maps passed to a LogMutator are not accessed outside of the @@ -23,14 +22,20 @@ package logger import ( "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "log" - "sync" "time" ) +const ( + startSuffix = "-start" + partialSuffix = "-partial" + finalSuffix = "-final" + numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work(). +) + type LoggerParams struct { - Client arvadosclient.ArvadosClient // The client we use to write log entries - EventType string // The event type to assign to the log entry. - MinimumWriteInterval time.Duration // Wait at least this long between log writes + Client arvadosclient.ArvadosClient // The client we use to write log entries + EventTypePrefix string // The prefix we use for the event type in the log entry + WriteInterval time.Duration // Wait at least this long between log writes } // A LogMutator is a function which modifies the log entry. 
@@ -52,13 +57,14 @@ type Logger struct { entry map[string]interface{} // Convenience shortcut into data properties map[string]interface{} // Convenience shortcut into data - lock sync.Locker // Synchronizes access to this struct params LoggerParams // Parameters we were given - // Variables used to determine when and if we write to the log. - nextWriteAllowed time.Time // The next time we can write, respecting MinimumWriteInterval - modified bool // Has this data been modified since the last write? - writeScheduled bool // Is a write been scheduled for the future? + // Variables to coordinate updating and writing. + modified bool // Has this data been modified since the last write? + workToDo chan LogMutator // Work to do in the worker thread. + writeTicker *time.Ticker // On each tick we write the log data to arvados, if it has been modified. + hasWritten bool // Whether we've written at all yet. + noMoreWork chan bool // Signals that we're done writing. writeHooks []LogMutator // Mutators we call before each write. 
} @@ -69,117 +75,125 @@ func NewLogger(params LoggerParams) *Logger { if &params.Client == nil { log.Fatal("Nil arvados client in LoggerParams passed in to NewLogger()") } - if params.EventType == "" { - log.Fatal("Empty event type in LoggerParams passed in to NewLogger()") + if params.EventTypePrefix == "" { + log.Fatal("Empty event type prefix in LoggerParams passed in to NewLogger()") } - l := &Logger{data: make(map[string]interface{}), - lock: &sync.Mutex{}, - params: params} - l.entry = make(map[string]interface{}) + l := &Logger{ + data: make(map[string]interface{}), + entry: make(map[string]interface{}), + properties: make(map[string]interface{}), + params: params, + workToDo: make(chan LogMutator, 10), + writeTicker: time.NewTicker(params.WriteInterval), + noMoreWork: make(chan bool, numberNoMoreWorkMessages)} + l.data["log"] = l.entry - l.properties = make(map[string]interface{}) l.entry["properties"] = l.properties - return l -} - -// Updates the log data and then writes it to the api server. If the -// log has been recently written then the write will be postponed to -// respect MinimumWriteInterval and this function will return before -// the write occurs. -func (l *Logger) Update(mutator LogMutator) { - l.lock.Lock() - - mutator(l.properties, l.entry) - l.modified = true // We assume the mutator modified the log, even though we don't know for sure. - l.considerWriting() + // Start the worker goroutine. + go l.work() - l.lock.Unlock() + return l } -// Similar to Update(), but forces a write without respecting the -// MinimumWriteInterval. This is useful if you know that you're about -// to quit (e.g. if you discovered a fatal error, or you're finished), -// since go will not wait for timers (including the pending write -// timer) to go off before exiting. -func (l *Logger) ForceUpdate(mutator LogMutator) { - l.lock.Lock() - - mutator(l.properties, l.entry) - l.modified = true // We assume the mutator modified the log, even though we don't know for sure. 
- - l.write() - l.lock.Unlock() -} +// Exported functions will be called from other goroutines, therefore +// all they are allowed to do is enqueue work to be done in the worker +// goroutine. -// Adds a hook which will be called every time this logger writes an entry. -func (l *Logger) AddWriteHook(hook LogMutator) { - l.lock.Lock() - l.writeHooks = append(l.writeHooks, hook) - // TODO(misha): Consider setting modified and attempting a write. - l.lock.Unlock() +// Enqueues an update. This will happen in another goroutine after +// this method returns. +func (l *Logger) Update(mutator LogMutator) { + l.workToDo <- mutator } -// This function is called on a timer when we have something to write, -// but need to schedule the write for the future to respect -// MinimumWriteInterval. -func (l *Logger) acquireLockConsiderWriting() { - l.lock.Lock() +// Similar to Update(), but writes the log entry as soon as possible +// (ignoring WriteInterval) and blocks until the entry has been +// written. This is useful if you know that you're about to quit +// (e.g. if you discovered a fatal error, or you're finished), since +// go will not wait for timers (including the pending write timer) to +// go off before exiting. +func (l *Logger) FinalUpdate(mutator LogMutator) { + // TODO(misha): Consider not accepting any future updates somehow, + // since they won't get written if they come in after this. + + // Stop the periodic write ticker. We'll perform the final write + // before returning from this function. + l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { + l.writeTicker.Stop() + } - // We are the scheduled write, so there are no longer future writes - // scheduled. - l.writeScheduled = false + // Apply the final update + l.workToDo <- mutator - l.considerWriting() + // Perform the final write and signal that we can return. 
+ l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { + l.write(true) + for i := 0; i < numberNoMoreWorkMessages; { + l.noMoreWork <- true + } + } - l.lock.Unlock() + // Wait until we've performed the write. + <-l.noMoreWork } -// The above methods each acquire the lock and release it. -// ======================================================= -// The below methods all assume we're holding a lock. - -// Check whether we have anything to write. If we do, then either -// write it now or later, based on what we're allowed. -func (l *Logger) considerWriting() { - if !l.modified { - // Nothing to write - } else if l.writeAllowedNow() { - l.write() - } else if l.writeScheduled { - // A future write is already scheduled, we don't need to do anything. - } else { - writeAfter := l.nextWriteAllowed.Sub(time.Now()) - time.AfterFunc(writeAfter, l.acquireLockConsiderWriting) - l.writeScheduled = true +// Adds a hook which will be called every time this logger writes an entry. +func (l *Logger) AddWriteHook(hook LogMutator) { + // We do the work in a LogMutator so that it happens in the worker + // goroutine. + l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { + l.writeHooks = append(l.writeHooks, hook) } } -// Whether writing now would respect MinimumWriteInterval -func (l *Logger) writeAllowedNow() bool { - return l.nextWriteAllowed.Before(time.Now()) +// The worker loop +func (l *Logger) work() { + for { + select { + case <-l.writeTicker.C: + if l.modified { + l.write(false) + l.modified = false + } + case mutator := <-l.workToDo: + mutator(l.properties, l.entry) + l.modified = true + case <-l.noMoreWork: + return + } + } } // Actually writes the log entry. -func (l *Logger) write() { +func (l *Logger) write(isFinal bool) { // Run all our hooks for _, hook := range l.writeHooks { hook(l.properties, l.entry) } - // Update the event type in case it was modified or is missing. 
- l.entry["event_type"] = l.params.EventType + // Update the event type. + if isFinal { + l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix + } else if l.hasWritten { + l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix + } else { + l.entry["event_type"] = l.params.EventTypePrefix + startSuffix + } + l.hasWritten = true // Write the log entry. + // This is a network write and will take a while, which is bad + // because we're blocking all the other work on this goroutine. + // + // TODO(misha): Consider rewriting this so that we can encode l.data + // into a string, and then perform the actual write in another + // routine. This will be tricky and will require support in the + // client. err := l.params.Client.Create("logs", l.data, nil) if err != nil { log.Printf("Attempted to log: %v", l.data) log.Fatalf("Received error writing log: %v", err) } - - // Update stats. - l.nextWriteAllowed = time.Now().Add(l.params.MinimumWriteInterval) - l.modified = false }