// Package logger periodically writes a log entry to Arvados using the
// Arvados SDK.
//
// This package is useful for maintaining a log object that is updated
// over time. This log object will be periodically written to the log,
// as specified by WriteInterval in the LoggerParams.
//
// This package is safe for concurrent use as long as the maps passed
// to a LogMutator are not accessed outside of the LogMutator.
//
// Usage:
//
//	arvLogger, err := logger.NewLogger(params)
//	if err != nil {
//		// handle invalid params
//	}
//	arvLogger.Update(func(properties map[string]interface{},
//		entry map[string]interface{}) {
//		// Modify properties and entry however you want.
//		// properties is a shortcut for entry["properties"].(map[string]interface{})
//		// properties can take any (valid) values you want to give it,
//		// entry will only take the fields listed at
//		// http://doc.arvados.org/api/schema/Log.html
//		// Valid values for properties are anything that can be json
//		// encoded (i.e. will not error if you call json.Marshal() on it).
//	})
//
// Use FinalUpdate instead of Update for the last change before the
// program exits; it blocks until the final entry has been written.
package logger
import (
	"fmt"
	"log"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
)

// Suffixes appended to LoggerParams.EventTypePrefix to build the
// event_type of each written entry.
const (
	startSuffix   = "-start"   // non-final writes before the first write
	partialSuffix = "-partial" // non-final writes after the first write
	finalSuffix   = "-final"   // the write performed by FinalUpdate()
)

// numberNoMoreWorkMessages is how many signals are sent on noMoreWork
// after the final write: one lets FinalUpdate() return and one makes
// work() return.
const numberNoMoreWorkMessages = 2
+
type LoggerParams struct {
- Client arvadosclient.ArvadosClient // The client we use to write log entries
- EventType string // The event type to assign to the log entry.
- MinimumWriteInterval time.Duration // Wait at least this long between log writes
+ Client *arvadosclient.ArvadosClient // The client we use to write log entries
+ EventTypePrefix string // The prefix we use for the event type in the log entry
+ WriteInterval time.Duration // Wait at least this long between log writes
}
// LogMutator is a function that modifies the log entry.
//
// It receives two maps: properties first, then entry. properties is a
// shortcut for entry["properties"].(map[string]interface{}) and can take
// any values you want to give it; entry will only take the fields listed
// at http://doc.arvados.org/api/schema/Log.html.
//
// Both maps are only safe to access inside the LogMutator. Do not store
// them anywhere, otherwise you risk concurrent access.
type LogMutator func(properties, entry map[string]interface{})
+
// A Logger is used to build up a log entry over time and write every
// version of it.
type Logger struct {
- data map[string]interface{} // The entire map that we give to the api
- entry map[string]interface{} // convenience shortcut into data
- properties map[string]interface{} // convenience shortcut into data
- lock sync.Locker // Synchronizes editing and writing
- params LoggerParams // parameters we were given
+ // The data we write
+ data map[string]interface{} // The entire map that we give to the api
+ entry map[string]interface{} // Convenience shortcut into data
+ properties map[string]interface{} // Convenience shortcut into data
+
+ params LoggerParams // Parameters we were given
+
+ // Variables to coordinate updating and writing.
+ modified bool // Has this data been modified since the last write?
+ workToDo chan LogMutator // Work to do in the worker thread.
+ writeTicker *time.Ticker // On each tick we write the log data to arvados, if it has been modified.
+ hasWritten bool // Whether we've written at all yet.
+ noMoreWork chan bool // Signals that we're done writing.
+
+ writeHooks []LogMutator // Mutators we call before each write.
}
// Create a new logger based on the specified parameters.
-func NewLogger(params LoggerParams) *Logger {
- // TODO(misha): Add some params checking here.
- l := &Logger{data: make(map[string]interface{}),
- lock: &sync.Mutex{},
- // TODO(misha): Consider copying the params so they're not
- // modified after creation.
- params: params}
- l.entry = make(map[string]interface{})
+func NewLogger(params LoggerParams) (l *Logger, err error) {
+ // sanity check parameters
+ if ¶ms.Client == nil {
+ err = fmt.Errorf("Nil arvados client in LoggerParams passed in to NewLogger()")
+ return
+ }
+ if params.EventTypePrefix == "" {
+ err = fmt.Errorf("Empty event type prefix in LoggerParams passed in to NewLogger()")
+ return
+ }
+
+ l = &Logger{
+ data: make(map[string]interface{}),
+ entry: make(map[string]interface{}),
+ properties: make(map[string]interface{}),
+ params: params,
+ workToDo: make(chan LogMutator, 10),
+ writeTicker: time.NewTicker(params.WriteInterval),
+ noMoreWork: make(chan bool, numberNoMoreWorkMessages)}
+
l.data["log"] = l.entry
- l.properties = make(map[string]interface{})
l.entry["properties"] = l.properties
- return l
+
+ // Start the worker goroutine.
+ go l.work()
+
+ return l, nil
}
-// Get access to the maps you can edit. This will hold a lock until
-// you call Record. Do not edit the maps in any other goroutines or
-// after calling Record.
-// You don't need to edit both maps,
-// properties can take any values you want to give it,
-// entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
-// properties is a shortcut for entry["properties"].(map[string]interface{})
-func (l *Logger) Edit() (properties map[string]interface{}, entry map[string]interface{}) {
- l.lock.Lock()
- return l.properties, l.entry
+// Exported functions will be called from other goroutines, therefore
+// all they are allowed to do is enqueue work to be done in the worker
+// goroutine.
+
+// Enqueues an update. This will happen in another goroutine after
+// this method returns.
+func (l *Logger) Update(mutator LogMutator) {
+ l.workToDo <- mutator
}
-// Write the log entry you've built up so far. Do not edit the maps returned by Edit() after calling this method.
-func (l *Logger) Record() {
- // TODO(misha): Add a check (and storage) to make sure we respect MinimumWriteInterval
- l.write()
- l.lock.Unlock()
+// Similar to Update(), but writes the log entry as soon as possible
+// (ignoring MinimumWriteInterval) and blocks until the entry has been
+// written. This is useful if you know that you're about to quit
+// (e.g. if you discovered a fatal error, or you're finished), since
+// go will not wait for timers (including the pending write timer) to
+// go off before exiting.
+func (l *Logger) FinalUpdate(mutator LogMutator) {
+ // TODO(misha): Consider not accepting any future updates somehow,
+ // since they won't get written if they come in after this.
+
+ // Stop the periodic write ticker. We'll perform the final write
+ // before returning from this function.
+ l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
+ l.writeTicker.Stop()
+ }
+
+ // Apply the final update
+ l.workToDo <- mutator
+
+ // Perform the final write and signal that we can return.
+ l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
+ l.write(true)
+ for i := 0; i < numberNoMoreWorkMessages; {
+ l.noMoreWork <- true
+ }
+ }
+
+ // Wait until we've performed the write.
+ <-l.noMoreWork
+}
+
+// Adds a hook which will be called every time this logger writes an entry.
+func (l *Logger) AddWriteHook(hook LogMutator) {
+ // We do the work in a LogMutator so that it happens in the worker
+ // goroutine.
+ l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
+ l.writeHooks = append(l.writeHooks, hook)
+ }
+}
+
+// The worker loop
+func (l *Logger) work() {
+ for {
+ select {
+ case <-l.writeTicker.C:
+ if l.modified {
+ l.write(false)
+ l.modified = false
+ }
+ case mutator := <-l.workToDo:
+ mutator(l.properties, l.entry)
+ l.modified = true
+ case <-l.noMoreWork:
+ return
+ }
+ }
}
// write runs all write hooks, sets the entry's event_type and sends the
// entry to Arvados.
//
// The event_type is EventTypePrefix plus "-final" when isFinal is true,
// "-start" until a write has succeeded, and "-partial" after that.
// API errors are logged, not returned.
func (l *Logger) write(isFinal bool) {
	// Run all our hooks.
	for _, hook := range l.writeHooks {
		hook(l.properties, l.entry)
	}

	// Update the event type.
	switch {
	case isFinal:
		l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix
	case l.hasWritten:
		l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix
	default:
		l.entry["event_type"] = l.params.EventTypePrefix + startSuffix
	}

	// Write the log entry.
	// This is a network write and will take a while, which is bad
	// because we're blocking all the other work on this goroutine.
	//
	// TODO(misha): Consider rewriting this so that we can encode l.data
	// into a string, and then perform the actual write in another
	// routine. This will be tricky and will require support in the
	// client.
	err := l.params.Client.Create("logs", l.data, nil)
	if err != nil {
		log.Printf("Received error writing %v: %v", l.data, err)
		// Leave hasWritten unchanged: if the first write failed, the next
		// write must still be recorded as the start event.
		return
	}
	l.hasWritten = true
}