7710: update crunchrunner.upload.go to compile; this was broken after the keep-web...
[arvados.git] / sdk / go / logger / logger.go
index 361e34f9615072007678292669c48c1d910a1d86..3b2db3a321271495a3965759363e2d723ea9e082 100644 (file)
 //     entry map[string]interface{}) {
 //   // Modifiy properties and entry however you want
 //   // properties is a shortcut for entry["properties"].(map[string]interface{})
-//   // properties can take any values you want to give it,
-//   // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
+//   // properties can take any (valid) values you want to give it,
+//   // entry will only take the fields listed at
+//   // http://doc.arvados.org/api/schema/Log.html
+//   // Valid values for properties are anything that can be json
+//   // encoded (i.e. will not error if you call json.Marshal() on it.
 // })
 package logger
 
 import (
+       "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "log"
        "time"
 )
 
 const (
-       startSuffix   = "-start"
-       partialSuffix = "-partial"
-       finalSuffix   = "-final"
+       startSuffix              = "-start"
+       partialSuffix            = "-partial"
+       finalSuffix              = "-final"
+       numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work().
 )
 
 type LoggerParams struct {
@@ -63,27 +68,31 @@ type Logger struct {
        workToDo    chan LogMutator // Work to do in the worker thread.
        writeTicker *time.Ticker    // On each tick we write the log data to arvados, if it has been modified.
        hasWritten  bool            // Whether we've written at all yet.
+       noMoreWork  chan bool       // Signals that we're done writing.
 
        writeHooks []LogMutator // Mutators we call before each write.
 }
 
 // Create a new logger based on the specified parameters.
-func NewLogger(params LoggerParams) *Logger {
+func NewLogger(params LoggerParams) (l *Logger, err error) {
        // sanity check parameters
        if &params.Client == nil {
-               log.Fatal("Nil arvados client in LoggerParams passed in to NewLogger()")
+               err = fmt.Errorf("Nil arvados client in LoggerParams passed in to NewLogger()")
+               return
        }
        if params.EventTypePrefix == "" {
-               log.Fatal("Empty event type prefix in LoggerParams passed in to NewLogger()")
+               err = fmt.Errorf("Empty event type prefix in LoggerParams passed in to NewLogger()")
+               return
        }
 
-       l := &Logger{
-               data: make(map[string]interface{}),
-               entry: make(map[string]interface{}),
-               properties: make(map[string]interface{}),
-               params: params,
-               workToDo: make(chan LogMutator, 10),
-               writeTicker: time.NewTicker(params.WriteInterval)}
+       l = &Logger{
+               data:        make(map[string]interface{}),
+               entry:       make(map[string]interface{}),
+               properties:  make(map[string]interface{}),
+               params:      params,
+               workToDo:    make(chan LogMutator, 10),
+               writeTicker: time.NewTicker(params.WriteInterval),
+               noMoreWork:  make(chan bool, numberNoMoreWorkMessages)}
 
        l.data["log"] = l.entry
        l.entry["properties"] = l.properties
@@ -91,7 +100,7 @@ func NewLogger(params LoggerParams) *Logger {
        // Start the worker goroutine.
        go l.work()
 
-       return l
+       return l, nil
 }
 
 // Exported functions will be called from other goroutines, therefore
@@ -111,9 +120,6 @@ func (l *Logger) Update(mutator LogMutator) {
 // go will not wait for timers (including the pending write timer) to
 // go off before exiting.
 func (l *Logger) FinalUpdate(mutator LogMutator) {
-       // Block on this channel until everything finishes
-       done := make(chan bool)
-
        // TODO(misha): Consider not accepting any future updates somehow,
        // since they won't get written if they come in after this.
 
@@ -129,11 +135,13 @@ func (l *Logger) FinalUpdate(mutator LogMutator) {
        // Perform the final write and signal that we can return.
        l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
                l.write(true)
-               done <- true
+               for i := 0; i < numberNoMoreWorkMessages; {
+                       l.noMoreWork <- true
+               }
        }
 
        // Wait until we've performed the write.
-       <-done
+       <-l.noMoreWork
 }
 
 // Adds a hook which will be called every time this logger writes an entry.
@@ -157,6 +165,8 @@ func (l *Logger) work() {
                case mutator := <-l.workToDo:
                        mutator(l.properties, l.entry)
                        l.modified = true
+               case <-l.noMoreWork:
+                       return
                }
        }
 }
@@ -189,7 +199,6 @@ func (l *Logger) write(isFinal bool) {
        // client.
        err := l.params.Client.Create("logs", l.data, nil)
        if err != nil {
-               log.Printf("Attempted to log: %v", l.data)
-               log.Fatalf("Received error writing log: %v", err)
+               log.Printf("Received error writing %v: %v", l.data, err)
        }
 }