X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8a504ad561c1ffbafee8a7bc8da551f9d4b9a29e..refs/heads/9374-go-sdk:/sdk/go/logger/logger.go diff --git a/sdk/go/logger/logger.go b/sdk/go/logger/logger.go index e7bc8d14c3..3b2db3a321 100644 --- a/sdk/go/logger/logger.go +++ b/sdk/go/logger/logger.go @@ -14,21 +14,26 @@ // entry map[string]interface{}) { // // Modifiy properties and entry however you want // // properties is a shortcut for entry["properties"].(map[string]interface{}) -// // properties can take any values you want to give it, -// // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html +// // properties can take any (valid) values you want to give it, +// // entry will only take the fields listed at +// // http://doc.arvados.org/api/schema/Log.html +// // Valid values for properties are anything that can be json +// // encoded (i.e. will not error if you call json.Marshal() on it. // }) package logger import ( + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "log" "time" ) const ( - startSuffix = "-start" - partialSuffix = "-partial" - finalSuffix = "-final" + startSuffix = "-start" + partialSuffix = "-partial" + finalSuffix = "-final" + numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work(). ) type LoggerParams struct { @@ -63,34 +68,39 @@ type Logger struct { workToDo chan LogMutator // Work to do in the worker thread. writeTicker *time.Ticker // On each tick we write the log data to arvados, if it has been modified. hasWritten bool // Whether we've written at all yet. + noMoreWork chan bool // Signals that we're done writing. writeHooks []LogMutator // Mutators we call before each write. } // Create a new logger based on the specified parameters. 
-func NewLogger(params LoggerParams) *Logger { +func NewLogger(params LoggerParams) (l *Logger, err error) { // sanity check parameters if &params.Client == nil { - log.Fatal("Nil arvados client in LoggerParams passed in to NewLogger()") + err = fmt.Errorf("Nil arvados client in LoggerParams passed in to NewLogger()") + return } if params.EventTypePrefix == "" { - log.Fatal("Empty event type prefix in LoggerParams passed in to NewLogger()") + err = fmt.Errorf("Empty event type prefix in LoggerParams passed in to NewLogger()") + return } - l := &Logger{data: make(map[string]interface{}), - params: params} - l.entry = make(map[string]interface{}) + l = &Logger{ + data: make(map[string]interface{}), + entry: make(map[string]interface{}), + properties: make(map[string]interface{}), + params: params, + workToDo: make(chan LogMutator, 10), + writeTicker: time.NewTicker(params.WriteInterval), + noMoreWork: make(chan bool, numberNoMoreWorkMessages)} + l.data["log"] = l.entry - l.properties = make(map[string]interface{}) l.entry["properties"] = l.properties - l.workToDo = make(chan LogMutator, 10) - l.writeTicker = time.NewTicker(params.WriteInterval) - // Start the worker goroutine. go l.work() - return l + return l, nil } // Exported functions will be called from other goroutines, therefore @@ -110,9 +120,6 @@ func (l *Logger) Update(mutator LogMutator) { // go will not wait for timers (including the pending write timer) to // go off before exiting. func (l *Logger) FinalUpdate(mutator LogMutator) { - // Block on this channel until everything finishes - done := make(chan bool) - // TODO(misha): Consider not accepting any future updates somehow, // since they won't get written if they come in after this. @@ -128,11 +135,13 @@ func (l *Logger) FinalUpdate(mutator LogMutator) { // Perform the final write and signal that we can return. 
l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) { l.write(true) - done <- true + for i := 0; i < numberNoMoreWorkMessages; { + l.noMoreWork <- true + } } // Wait until we've performed the write. - <-done + <-l.noMoreWork } // Adds a hook which will be called every time this logger writes an entry. @@ -156,6 +165,8 @@ func (l *Logger) work() { case mutator := <-l.workToDo: mutator(l.properties, l.entry) l.modified = true + case <-l.noMoreWork: + return } } } @@ -188,7 +199,6 @@ func (l *Logger) write(isFinal bool) { // client. err := l.params.Client.Create("logs", l.data, nil) if err != nil { - log.Printf("Attempted to log: %v", l.data) - log.Fatalf("Received error writing log: %v", err) + log.Printf("Received error writing %v: %v", l.data, err) } }