// Logger periodically writes a log to the Arvados SDK.
//
// This package is useful for maintaining a log object that is updated
-// over time. Every time the object is updated, it will be written to
-// the log. Writes will be throttled to no more frequent than
-// WriteInterval.
+// over time. This log object will be periodically written to the log,
+// as specified by WriteInterval in the Params.
//
// This package is safe for concurrent use as long as:
// The maps passed to a LogMutator are not accessed outside of the
// entry map[string]interface{}) {
// // Modifiy properties and entry however you want
// // properties is a shortcut for entry["properties"].(map[string]interface{})
-// // properties can take any values you want to give it,
-// // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
+// // properties can take any (valid) values you want to give it,
+// // entry will only take the fields listed at
+// // http://doc.arvados.org/api/schema/Log.html
+// // Valid values for properties are anything that can be json
+// // encoded (i.e. will not error if you call json.Marshal() on it.
// })
package logger
import (
+ "fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"log"
"time"
)
+const (
+ startSuffix = "-start"
+ partialSuffix = "-partial"
+ finalSuffix = "-final"
+ numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work().
+)
+
type LoggerParams struct {
- Client arvadosclient.ArvadosClient // The client we use to write log entries
- EventType string // The event type to assign to the log entry.
- WriteInterval time.Duration // Wait at least this long between log writes
+ Client *arvadosclient.ArvadosClient // The client we use to write log entries
+ EventTypePrefix string // The prefix we use for the event type in the log entry
+ WriteInterval time.Duration // Wait at least this long between log writes
}
// A LogMutator is a function which modifies the log entry.
modified bool // Has this data been modified since the last write?
workToDo chan LogMutator // Work to do in the worker thread.
writeTicker *time.Ticker // On each tick we write the log data to arvados, if it has been modified.
+ hasWritten bool // Whether we've written at all yet.
+ noMoreWork chan bool // Signals that we're done writing.
writeHooks []LogMutator // Mutators we call before each write.
}
// Create a new logger based on the specified parameters.
-func NewLogger(params LoggerParams) *Logger {
+func NewLogger(params LoggerParams) (l *Logger, err error) {
// sanity check parameters
if ¶ms.Client == nil {
- log.Fatal("Nil arvados client in LoggerParams passed in to NewLogger()")
+ err = fmt.Errorf("Nil arvados client in LoggerParams passed in to NewLogger()")
+ return
}
- if params.EventType == "" {
- log.Fatal("Empty event type in LoggerParams passed in to NewLogger()")
+ if params.EventTypePrefix == "" {
+ err = fmt.Errorf("Empty event type prefix in LoggerParams passed in to NewLogger()")
+ return
}
- l := &Logger{data: make(map[string]interface{}),
- params: params}
- l.entry = make(map[string]interface{})
+ l = &Logger{
+ data: make(map[string]interface{}),
+ entry: make(map[string]interface{}),
+ properties: make(map[string]interface{}),
+ params: params,
+ workToDo: make(chan LogMutator, 10),
+ writeTicker: time.NewTicker(params.WriteInterval),
+ noMoreWork: make(chan bool, numberNoMoreWorkMessages)}
+
l.data["log"] = l.entry
- l.properties = make(map[string]interface{})
l.entry["properties"] = l.properties
- l.workToDo = make(chan LogMutator, 10)
- l.writeTicker = time.NewTicker(params.WriteInterval)
-
// Start the worker goroutine.
go l.work()
- return l
+ return l, nil
}
// Exported functions will be called from other goroutines, therefore
// go will not wait for timers (including the pending write timer) to
// go off before exiting.
func (l *Logger) FinalUpdate(mutator LogMutator) {
- // Block on this channel until everything finishes
- done := make(chan bool)
-
// TODO(misha): Consider not accepting any future updates somehow,
// since they won't get written if they come in after this.
// Apply the final update
l.workToDo <- mutator
- // Perform the write and signal that we can return.
+ // Perform the final write and signal that we can return.
l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
- // TODO(misha): Add a boolean arg to write() to indicate that it's
- // final so that we can set the appropriate event type.
- l.write()
- done <- true
+ l.write(true)
+ for i := 0; i < numberNoMoreWorkMessages; {
+ l.noMoreWork <- true
+ }
}
// Wait until we've performed the write.
- <-done
+ <-l.noMoreWork
}
// Adds a hook which will be called every time this logger writes an entry.
select {
case <-l.writeTicker.C:
if l.modified {
- l.write()
+ l.write(false)
l.modified = false
}
case mutator := <-l.workToDo:
mutator(l.properties, l.entry)
l.modified = true
+ case <-l.noMoreWork:
+ return
}
}
}
// Actually writes the log entry.
-func (l *Logger) write() {
+func (l *Logger) write(isFinal bool) {
// Run all our hooks
for _, hook := range l.writeHooks {
hook(l.properties, l.entry)
}
- // Update the event type in case it was modified or is missing.
- // TODO(misha): Fix this to write different event types.
- l.entry["event_type"] = l.params.EventType
+ // Update the event type.
+ if isFinal {
+ l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix
+ } else if l.hasWritten {
+ l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix
+ } else {
+ l.entry["event_type"] = l.params.EventTypePrefix + startSuffix
+ }
+ l.hasWritten = true
// Write the log entry.
// This is a network write and will take a while, which is bad
// client.
err := l.params.Client.Create("logs", l.data, nil)
if err != nil {
- log.Printf("Attempted to log: %v", l.data)
- log.Fatalf("Received error writing log: %v", err)
+ log.Printf("Received error writing %v: %v", l.data, err)
}
}