// entry map[string]interface{}) {
// // Modify properties and entry however you want
// // properties is a shortcut for entry["properties"].(map[string]interface{})
-// // properties can take any values you want to give it,
-// // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
+// // properties can take any (valid) values you want to give it,
+// // entry will only take the fields listed at
+// // http://doc.arvados.org/api/schema/Log.html
+// // Valid values for properties are anything that can be json
+// // encoded (i.e. will not error if you call json.Marshal() on it).
// })
package logger
)
const (
- startSuffix = "-start"
- partialSuffix = "-partial"
- finalSuffix = "-final"
+ startSuffix = "-start"
+ partialSuffix = "-partial"
+ finalSuffix = "-final"
+ numberNoMoreWorkMessages = 2 // One for FinalUpdate() and one for the worker's select loop, so each can return.
)
type LoggerParams struct {
workToDo chan LogMutator // Work to do in the worker thread.
writeTicker *time.Ticker // On each tick we write the log data to arvados, if it has been modified.
hasWritten bool // Whether we've written at all yet.
+ noMoreWork chan bool // Signals that we're done writing.
writeHooks []LogMutator // Mutators we call before each write.
}
}
l := &Logger{
- data: make(map[string]interface{}),
- entry: make(map[string]interface{}),
- properties: make(map[string]interface{}),
- params: params,
- workToDo: make(chan LogMutator, 10),
- writeTicker: time.NewTicker(params.WriteInterval)}
+ data: make(map[string]interface{}),
+ entry: make(map[string]interface{}),
+ properties: make(map[string]interface{}),
+ params: params,
+ workToDo: make(chan LogMutator, 10),
+ writeTicker: time.NewTicker(params.WriteInterval),
+ noMoreWork: make(chan bool, numberNoMoreWorkMessages)}
l.data["log"] = l.entry
l.entry["properties"] = l.properties
// go will not wait for timers (including the pending write timer) to
// go off before exiting.
func (l *Logger) FinalUpdate(mutator LogMutator) {
- // Block on this channel until everything finishes
- done := make(chan bool)
-
// TODO(misha): Consider not accepting any future updates somehow,
// since they won't get written if they come in after this.
// Perform the final write and signal that we can return.
l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
l.write(true)
- done <- true
+ // Queue exactly numberNoMoreWorkMessages signals. The loop counter must
+ // advance: noMoreWork is buffered to that size, so these sends never
+ // block, whereas an unbounded loop would wedge the goroutine running
+ // this closure on a full channel.
+ for i := 0; i < numberNoMoreWorkMessages; i++ {
+ l.noMoreWork <- true
+ }
}
// Wait until we've performed the write.
- <-done
+ <-l.noMoreWork
}
// Adds a hook which will be called every time this logger writes an entry.
case mutator := <-l.workToDo:
mutator(l.properties, l.entry)
l.modified = true
+ case <-l.noMoreWork:
+ return
}
}
}