Merge branch 'master' into 3408-production-datamanager
[arvados.git] / sdk / go / logger / logger.go
index e6e1ed64df3a17988c236231147c961c9282c261..a989afcf26cb7a6d7b7bad6e4b1589d30e06520d 100644 (file)
@@ -1,9 +1,8 @@
 // Logger periodically writes a log to the Arvados SDK.
 //
 // This package is useful for maintaining a log object that is updated
-// over time. Every time the object is updated, it will be written to
-// the log. Writes will be throttled to no more frequent than
-// WriteInterval.
+// over time. This log object will be periodically written to the log,
+// as specified by WriteInterval in the Params.
 //
 // This package is safe for concurrent use as long as:
 // The maps passed to a LogMutator are not accessed outside of the
 //     entry map[string]interface{}) {
 //   // Modifiy properties and entry however you want
 //   // properties is a shortcut for entry["properties"].(map[string]interface{})
-//   // properties can take any values you want to give it,
-//   // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
+//   // properties can take any (valid) values you want to give it,
+//   // entry will only take the fields listed at
+//   // http://doc.arvados.org/api/schema/Log.html
+//   // Valid values for properties are anything that can be json
+//   // encoded (i.e. will not error if you call json.Marshal() on it.
 // })
 package logger
 
@@ -26,10 +28,17 @@ import (
        "time"
 )
 
+const (
+       startSuffix              = "-start"
+       partialSuffix            = "-partial"
+       finalSuffix              = "-final"
+       numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work().
+)
+
 type LoggerParams struct {
-       Client        arvadosclient.ArvadosClient // The client we use to write log entries
-       EventType     string                      // The event type to assign to the log entry.
-       WriteInterval time.Duration               // Wait at least this long between log writes
+       Client          arvadosclient.ArvadosClient // The client we use to write log entries
+       EventTypePrefix string                      // The prefix we use for the event type in the log entry
+       WriteInterval   time.Duration               // Wait at least this long between log writes
 }
 
 // A LogMutator is a function which modifies the log entry.
@@ -57,6 +66,8 @@ type Logger struct {
        modified    bool            // Has this data been modified since the last write?
        workToDo    chan LogMutator // Work to do in the worker thread.
        writeTicker *time.Ticker    // On each tick we write the log data to arvados, if it has been modified.
+       hasWritten  bool            // Whether we've written at all yet.
+       noMoreWork  chan bool       // Signals that we're done writing.
 
        writeHooks []LogMutator // Mutators we call before each write.
 }
@@ -67,20 +78,22 @@ func NewLogger(params LoggerParams) *Logger {
        if &params.Client == nil {
                log.Fatal("Nil arvados client in LoggerParams passed in to NewLogger()")
        }
-       if params.EventType == "" {
-               log.Fatal("Empty event type in LoggerParams passed in to NewLogger()")
+       if params.EventTypePrefix == "" {
+               log.Fatal("Empty event type prefix in LoggerParams passed in to NewLogger()")
        }
 
-       l := &Logger{data: make(map[string]interface{}),
-               params: params}
-       l.entry = make(map[string]interface{})
+       l := &Logger{
+               data:        make(map[string]interface{}),
+               entry:       make(map[string]interface{}),
+               properties:  make(map[string]interface{}),
+               params:      params,
+               workToDo:    make(chan LogMutator, 10),
+               writeTicker: time.NewTicker(params.WriteInterval),
+               noMoreWork:  make(chan bool, numberNoMoreWorkMessages)}
+
        l.data["log"] = l.entry
-       l.properties = make(map[string]interface{})
        l.entry["properties"] = l.properties
 
-       l.workToDo = make(chan LogMutator, 10)
-       l.writeTicker = time.NewTicker(params.WriteInterval)
-
        // Start the worker goroutine.
        go l.work()
 
@@ -104,9 +117,6 @@ func (l *Logger) Update(mutator LogMutator) {
 // go will not wait for timers (including the pending write timer) to
 // go off before exiting.
 func (l *Logger) FinalUpdate(mutator LogMutator) {
-       // Block on this channel until everything finishes
-       done := make(chan bool)
-
        // TODO(misha): Consider not accepting any future updates somehow,
        // since they won't get written if they come in after this.
 
@@ -119,16 +129,16 @@ func (l *Logger) FinalUpdate(mutator LogMutator) {
        // Apply the final update
        l.workToDo <- mutator
 
-       // Perform the write and signal that we can return.
+       // Perform the final write and signal that we can return.
        l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
-               // TODO(misha): Add a boolean arg to write() to indicate that it's
-               // final so that we can set the appropriate event type.
-               l.write()
-               done <- true
+               l.write(true)
+               for i := 0; i < numberNoMoreWorkMessages; {
+                       l.noMoreWork <- true
+               }
        }
 
        // Wait until we've performed the write.
-       <-done
+       <-l.noMoreWork
 }
 
 // Adds a hook which will be called every time this logger writes an entry.
@@ -146,27 +156,35 @@ func (l *Logger) work() {
                select {
                case <-l.writeTicker.C:
                        if l.modified {
-                               l.write()
+                               l.write(false)
                                l.modified = false
                        }
                case mutator := <-l.workToDo:
                        mutator(l.properties, l.entry)
                        l.modified = true
+               case <-l.noMoreWork:
+                       return
                }
        }
 }
 
 // Actually writes the log entry.
-func (l *Logger) write() {
+func (l *Logger) write(isFinal bool) {
 
        // Run all our hooks
        for _, hook := range l.writeHooks {
                hook(l.properties, l.entry)
        }
 
-       // Update the event type in case it was modified or is missing.
-       // TODO(misha): Fix this to write different event types.
-       l.entry["event_type"] = l.params.EventType
+       // Update the event type.
+       if isFinal {
+               l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix
+       } else if l.hasWritten {
+               l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix
+       } else {
+               l.entry["event_type"] = l.params.EventTypePrefix + startSuffix
+       }
+       l.hasWritten = true
 
        // Write the log entry.
        // This is a network write and will take a while, which is bad