8460: Move loggedDuration from keepstore to sdk pkg as stats.Duration.
[arvados.git] / sdk / go / logger / logger.go
index 6835750812056e797b9ab01804c4e2833b85d071..6dd7fb3723ca6fa78f29141b754e52448055a15e 100644 (file)
-// Periodically writes a log to the Arvados SDK.
+// Logger periodically writes a log to the Arvados SDK.
 //
-// This package is useful for maintaining a log object that is built
-// up over time. Every time the object is modified, it will be written
-// to the log. Writes will be throttled to no more than one every
-// WriteFrequencySeconds
+// This package is useful for maintaining a log object that is updated
+// over time. This log object will be periodically written to the log,
+// as specified by WriteInterval in the Params.
 //
-// This package is safe for concurrent use.
+// This package is safe for concurrent use as long as:
+// The maps passed to a LogMutator are not accessed outside of the
+// LogMutator
 //
 // Usage:
 // arvLogger := logger.NewLogger(params)
-// logData := arvLogger.Acquire()  // This will block if others are using the logger
-// // Modify the logObject however you want here ..
-// logData = arvLogger.Release()  // This triggers the actual write, and replaces logObject with a nil pointer so you don't try to modify it when you're no longer holding the lock
-
+// arvLogger.Update(func(properties map[string]interface{},
+//     entry map[string]interface{}) {
+//   // Modifiy properties and entry however you want
+//   // properties is a shortcut for entry["properties"].(map[string]interface{})
+//   // properties can take any (valid) values you want to give it,
+//   // entry will only take the fields listed at
+//   // http://doc.arvados.org/api/schema/Log.html
+//   // Valid values for properties are anything that can be json
+//   // encoded (i.e. will not error if you call json.Marshal() on it.
+// })
 package logger
 
 import (
+       "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "log"
-       "sync"
        "time"
 )
 
 const (
-       eventTypeLabel string = "event-type"
-       propertiesLabel string = "properties"
+       startSuffix              = "-start"
+       partialSuffix            = "-partial"
+       finalSuffix              = "-final"
+       numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work().
 )
 
 type LoggerParams struct {
-       Client arvadosclient.ArvadosClient  // The client we use to write log entries
-       EventType string  // The event type to assign to the log entry.
-       MinimumWriteInterval time.Duration  // Wait at least this long between log writes
+       Client          *arvadosclient.ArvadosClient // The client we use to write log entries
+       EventTypePrefix string                       // The prefix we use for the event type in the log entry
+       WriteInterval   time.Duration                // Wait at least this long between log writes
 }
 
+// A LogMutator is a function which modifies the log entry.
+// It takes two maps as arguments, properties is the first and entry
+// is the second
+// properties is a shortcut for entry["properties"].(map[string]interface{})
+// properties can take any values you want to give it.
+// entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
+// properties and entry are only safe to access inside the LogMutator,
+// they should not be stored anywhere, otherwise you'll risk
+// concurrent access.
+type LogMutator func(map[string]interface{}, map[string]interface{})
+
+// A Logger is used to build up a log entry over time and write every
+// version of it.
 type Logger struct {
-       data map[string]interface{}
-       lock sync.Locker
-       params LoggerParams
+       // The data we write
+       data       map[string]interface{} // The entire map that we give to the api
+       entry      map[string]interface{} // Convenience shortcut into data
+       properties map[string]interface{} // Convenience shortcut into data
+
+       params LoggerParams // Parameters we were given
+
+       // Variables to coordinate updating and writing.
+       modified    bool            // Has this data been modified since the last write?
+       workToDo    chan LogMutator // Work to do in the worker thread.
+       writeTicker *time.Ticker    // On each tick we write the log data to arvados, if it has been modified.
+       hasWritten  bool            // Whether we've written at all yet.
+       noMoreWork  chan bool       // Signals that we're done writing.
+
+       writeHooks []LogMutator // Mutators we call before each write.
 }
 
-func NewLogger(params LoggerParams) *Logger {
-       l := &Logger{data: make(map[string]interface{}),
-               lock: &sync.Mutex{},
-               // TODO(misha): Consider copying the params so they're not
-               // modified after creation.
-               params: params}
-       l.data[propertiesLabel] = make(map[string]interface{})
-       return l
+// Create a new logger based on the specified parameters.
+func NewLogger(params LoggerParams) (l *Logger, err error) {
+       // sanity check parameters
+       if &params.Client == nil {
+               err = fmt.Errorf("Nil arvados client in LoggerParams passed in to NewLogger()")
+               return
+       }
+       if params.EventTypePrefix == "" {
+               err = fmt.Errorf("Empty event type prefix in LoggerParams passed in to NewLogger()")
+               return
+       }
+
+       l = &Logger{
+               data:        make(map[string]interface{}),
+               entry:       make(map[string]interface{}),
+               properties:  make(map[string]interface{}),
+               params:      params,
+               workToDo:    make(chan LogMutator, 10),
+               writeTicker: time.NewTicker(params.WriteInterval),
+               noMoreWork:  make(chan bool, numberNoMoreWorkMessages)}
+
+       l.data["log"] = l.entry
+       l.entry["properties"] = l.properties
+
+       // Start the worker goroutine.
+       go l.work()
+
+       return l, nil
 }
 
-func (l *Logger) Acquire() map[string]interface{} {
-       l.lock.Lock()
-       return l.data[propertiesLabel].(map[string]interface{})
+// Exported functions will be called from other goroutines, therefore
+// all they are allowed to do is enqueue work to be done in the worker
+// goroutine.
+
+// Enqueues an update. This will happen in another goroutine after
+// this method returns.
+func (l *Logger) Update(mutator LogMutator) {
+       l.workToDo <- mutator
 }
 
-func (l *Logger) Release() map[string]interface{} {
-       // TODO(misha): Add a check (and storage) to make sure we respect MinimumWriteInterval
-       l.write()
-       l.lock.Unlock()
-       return nil
+// Similar to Update(), but writes the log entry as soon as possible
+// (ignoring MinimumWriteInterval) and blocks until the entry has been
+// written. This is useful if you know that you're about to quit
+// (e.g. if you discovered a fatal error, or you're finished), since
+// go will not wait for timers (including the pending write timer) to
+// go off before exiting.
+func (l *Logger) FinalUpdate(mutator LogMutator) {
+       // TODO(misha): Consider not accepting any future updates somehow,
+       // since they won't get written if they come in after this.
+
+       // Stop the periodic write ticker. We'll perform the final write
+       // before returning from this function.
+       l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
+               l.writeTicker.Stop()
+       }
+
+       // Apply the final update
+       l.workToDo <- mutator
+
+       // Perform the final write and signal that we can return.
+       l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
+               l.write(true)
+               for i := 0; i < numberNoMoreWorkMessages; {
+                       l.noMoreWork <- true
+               }
+       }
+
+       // Wait until we've performed the write.
+       <-l.noMoreWork
+}
+
+// Adds a hook which will be called every time this logger writes an entry.
+func (l *Logger) AddWriteHook(hook LogMutator) {
+       // We do the work in a LogMutator so that it happens in the worker
+       // goroutine.
+       l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
+               l.writeHooks = append(l.writeHooks, hook)
+       }
+}
+
+// The worker loop
+func (l *Logger) work() {
+       for {
+               select {
+               case <-l.writeTicker.C:
+                       if l.modified {
+                               l.write(false)
+                               l.modified = false
+                       }
+               case mutator := <-l.workToDo:
+                       mutator(l.properties, l.entry)
+                       l.modified = true
+               case <-l.noMoreWork:
+                       return
+               }
+       }
 }
 
-func (l *Logger) write() {
-       // Update the event type in case it was modified or is missing.
-       // l.data[eventTypeLabel] = l.params.EventType
-       // m := make(map[string]interface{})
-       // m["body"] = l.data
-       // //err := l.params.Client.Create("logs", l.data, nil)
-       // //err := l.params.Client.Create("logs", m, nil)
-       // var results map[string]interface{}
-       err := l.params.Client.Create("logs",
-               arvadosclient.Dict{"log": arvadosclient.Dict{
-                       eventTypeLabel: l.params.EventType,
-                       propertiesLabel: l.data}}, nil)
+// Actually writes the log entry.
+func (l *Logger) write(isFinal bool) {
+
+       // Run all our hooks
+       for _, hook := range l.writeHooks {
+               hook(l.properties, l.entry)
+       }
+
+       // Update the event type.
+       if isFinal {
+               l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix
+       } else if l.hasWritten {
+               l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix
+       } else {
+               l.entry["event_type"] = l.params.EventTypePrefix + startSuffix
+       }
+       l.hasWritten = true
+
+       // Write the log entry.
+       // This is a network write and will take a while, which is bad
+       // because we're blocking all the other work on this goroutine.
+       //
+       // TODO(misha): Consider rewriting this so that we can encode l.data
+       // into a string, and then perform the actual write in another
+       // routine. This will be tricky and will require support in the
+       // client.
+       err := l.params.Client.Create("logs", l.data, nil)
        if err != nil {
-               log.Fatalf("Received error writing log: %v", err)
+               log.Printf("Received error writing %v: %v", l.data, err)
        }
 }