1 // Logger periodically writes a log to the Arvados SDK.
3 // This package is useful for maintaining a log object that is updated
4 // over time. This log object will be periodically written to the log,
5 // as specified by WriteInterval in the Params.
7 // This package is safe for concurrent use as long as:
8 // The maps passed to a LogMutator are not accessed outside of the
12 // arvLogger := logger.NewLogger(params)
13 // arvLogger.Update(func(properties map[string]interface{},
14 // entry map[string]interface{}) {
15 // // Modifiy properties and entry however you want
16 // // properties is a shortcut for entry["properties"].(map[string]interface{})
17 // // properties can take any (valid) values you want to give it,
18 // // entry will only take the fields listed at
19 // // http://doc.arvados.org/api/schema/Log.html
20 // // Valid values for properties are anything that can be json
21 // // encoded (i.e. will not error if you call json.Marshal() on it.
26 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
32 startSuffix = "-start"
33 partialSuffix = "-partial"
34 finalSuffix = "-final"
35 numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work().
38 type LoggerParams struct {
39 Client arvadosclient.ArvadosClient // The client we use to write log entries
40 EventTypePrefix string // The prefix we use for the event type in the log entry
41 WriteInterval time.Duration // Wait at least this long between log writes
44 // A LogMutator is a function which modifies the log entry.
45 // It takes two maps as arguments, properties is the first and entry
47 // properties is a shortcut for entry["properties"].(map[string]interface{})
48 // properties can take any values you want to give it.
49 // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
50 // properties and entry are only safe to access inside the LogMutator,
51 // they should not be stored anywhere, otherwise you'll risk
53 type LogMutator func(map[string]interface{}, map[string]interface{})
55 // A Logger is used to build up a log entry over time and write every
59 data map[string]interface{} // The entire map that we give to the api
60 entry map[string]interface{} // Convenience shortcut into data
61 properties map[string]interface{} // Convenience shortcut into data
63 params LoggerParams // Parameters we were given
65 // Variables to coordinate updating and writing.
66 modified bool // Has this data been modified since the last write?
67 workToDo chan LogMutator // Work to do in the worker thread.
68 writeTicker *time.Ticker // On each tick we write the log data to arvados, if it has been modified.
69 hasWritten bool // Whether we've written at all yet.
70 noMoreWork chan bool // Signals that we're done writing.
72 writeHooks []LogMutator // Mutators we call before each write.
75 // Create a new logger based on the specified parameters.
76 func NewLogger(params LoggerParams) *Logger {
77 // sanity check parameters
78 if ¶ms.Client == nil {
79 log.Fatal("Nil arvados client in LoggerParams passed in to NewLogger()")
81 if params.EventTypePrefix == "" {
82 log.Fatal("Empty event type prefix in LoggerParams passed in to NewLogger()")
86 data: make(map[string]interface{}),
87 entry: make(map[string]interface{}),
88 properties: make(map[string]interface{}),
90 workToDo: make(chan LogMutator, 10),
91 writeTicker: time.NewTicker(params.WriteInterval),
92 noMoreWork: make(chan bool, numberNoMoreWorkMessages)}
94 l.data["log"] = l.entry
95 l.entry["properties"] = l.properties
97 // Start the worker goroutine.
103 // Exported functions will be called from other goroutines, therefore
104 // all they are allowed to do is enqueue work to be done in the worker
107 // Enqueues an update. This will happen in another goroutine after
108 // this method returns.
109 func (l *Logger) Update(mutator LogMutator) {
110 l.workToDo <- mutator
113 // Similar to Update(), but writes the log entry as soon as possible
114 // (ignoring MinimumWriteInterval) and blocks until the entry has been
115 // written. This is useful if you know that you're about to quit
116 // (e.g. if you discovered a fatal error, or you're finished), since
117 // go will not wait for timers (including the pending write timer) to
118 // go off before exiting.
119 func (l *Logger) FinalUpdate(mutator LogMutator) {
120 // TODO(misha): Consider not accepting any future updates somehow,
121 // since they won't get written if they come in after this.
123 // Stop the periodic write ticker. We'll perform the final write
124 // before returning from this function.
125 l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
129 // Apply the final update
130 l.workToDo <- mutator
132 // Perform the final write and signal that we can return.
133 l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
135 for i := 0; i < numberNoMoreWorkMessages; {
140 // Wait until we've performed the write.
144 // Adds a hook which will be called every time this logger writes an entry.
145 func (l *Logger) AddWriteHook(hook LogMutator) {
146 // We do the work in a LogMutator so that it happens in the worker
148 l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
149 l.writeHooks = append(l.writeHooks, hook)
154 func (l *Logger) work() {
157 case <-l.writeTicker.C:
162 case mutator := <-l.workToDo:
163 mutator(l.properties, l.entry)
171 // Actually writes the log entry.
172 func (l *Logger) write(isFinal bool) {
175 for _, hook := range l.writeHooks {
176 hook(l.properties, l.entry)
179 // Update the event type.
181 l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix
182 } else if l.hasWritten {
183 l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix
185 l.entry["event_type"] = l.params.EventTypePrefix + startSuffix
189 // Write the log entry.
190 // This is a network write and will take a while, which is bad
191 // because we're blocking all the other work on this goroutine.
193 // TODO(misha): Consider rewriting this so that we can encode l.data
194 // into a string, and then perform the actual write in another
195 // routine. This will be tricky and will require support in the
197 err := l.params.Client.Create("logs", l.data, nil)
199 log.Printf("Attempted to log: %v", l.data)
200 log.Fatalf("Received error writing log: %v", err)