Updated logger to do all work in a dedicated goroutine, so we don't need to worry...
[arvados.git] / sdk / go / logger / logger.go
1 // Logger periodically writes a log to the Arvados SDK.
2 //
3 // This package is useful for maintaining a log object that is updated
4 // over time. Every time the object is updated, it will be written to
5 // the log. Writes will be throttled to no more frequent than
6 // WriteInterval.
7 //
8 // This package is safe for concurrent use as long as:
9 // The maps passed to a LogMutator are not accessed outside of the
10 // LogMutator
11 //
12 // Usage:
13 // arvLogger := logger.NewLogger(params)
14 // arvLogger.Update(func(properties map[string]interface{},
15 //      entry map[string]interface{}) {
16 //   // Modifiy properties and entry however you want
17 //   // properties is a shortcut for entry["properties"].(map[string]interface{})
18 //   // properties can take any values you want to give it,
19 //   // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
20 // })
21 package logger
22
23 import (
24         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
25         "log"
26         "time"
27 )
28
29 type LoggerParams struct {
30         Client        arvadosclient.ArvadosClient // The client we use to write log entries
31         EventType     string                      // The event type to assign to the log entry.
32         WriteInterval time.Duration               // Wait at least this long between log writes
33 }
34
35 // A LogMutator is a function which modifies the log entry.
36 // It takes two maps as arguments, properties is the first and entry
37 // is the second
38 // properties is a shortcut for entry["properties"].(map[string]interface{})
39 // properties can take any values you want to give it.
40 // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
41 // properties and entry are only safe to access inside the LogMutator,
42 // they should not be stored anywhere, otherwise you'll risk
43 // concurrent access.
44 type LogMutator func(map[string]interface{}, map[string]interface{})
45
46 // A Logger is used to build up a log entry over time and write every
47 // version of it.
48 type Logger struct {
49         // The data we write
50         data       map[string]interface{} // The entire map that we give to the api
51         entry      map[string]interface{} // Convenience shortcut into data
52         properties map[string]interface{} // Convenience shortcut into data
53
54         params LoggerParams // Parameters we were given
55
56         // Variables to coordinate updating and writing.
57         modified    bool            // Has this data been modified since the last write?
58         workToDo    chan LogMutator // Work to do in the worker thread.
59         writeTicker *time.Ticker    // On each tick we write the log data to arvados, if it has been modified.
60
61         writeHooks []LogMutator // Mutators we call before each write.
62 }
63
64 // Create a new logger based on the specified parameters.
65 func NewLogger(params LoggerParams) *Logger {
66         // sanity check parameters
67         if &params.Client == nil {
68                 log.Fatal("Nil arvados client in LoggerParams passed in to NewLogger()")
69         }
70         if params.EventType == "" {
71                 log.Fatal("Empty event type in LoggerParams passed in to NewLogger()")
72         }
73
74         l := &Logger{data: make(map[string]interface{}),
75                 params: params}
76         l.entry = make(map[string]interface{})
77         l.data["log"] = l.entry
78         l.properties = make(map[string]interface{})
79         l.entry["properties"] = l.properties
80
81         l.workToDo = make(chan LogMutator, 10)
82         l.writeTicker = time.NewTicker(params.WriteInterval)
83
84         // Start the worker goroutine.
85         go l.work()
86
87         return l
88 }
89
90 // Exported functions will be called from other goroutines, therefore
91 // all they are allowed to do is enqueue work to be done in the worker
92 // goroutine.
93
94 // Enqueues an update. This will happen in another goroutine after
95 // this method returns.
96 func (l *Logger) Update(mutator LogMutator) {
97         l.workToDo <- mutator
98 }
99
100 // Similar to Update(), but writes the log entry as soon as possible
101 // (ignoring MinimumWriteInterval) and blocks until the entry has been
102 // written. This is useful if you know that you're about to quit
103 // (e.g. if you discovered a fatal error, or you're finished), since
104 // go will not wait for timers (including the pending write timer) to
105 // go off before exiting.
106 func (l *Logger) FinalUpdate(mutator LogMutator) {
107         // Block on this channel until everything finishes
108         done := make(chan bool)
109
110         // TODO(misha): Consider not accepting any future updates somehow,
111         // since they won't get written if they come in after this.
112
113         // Stop the periodic write ticker. We'll perform the final write
114         // before returning from this function.
115         l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
116                 l.writeTicker.Stop()
117         }
118
119         // Apply the final update
120         l.workToDo <- mutator
121
122         // Perform the write and signal that we can return.
123         l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
124                 // TODO(misha): Add a boolean arg to write() to indicate that it's
125                 // final so that we can set the appropriate event type.
126                 l.write()
127                 done <- true
128         }
129
130         // Wait until we've performed the write.
131         <-done
132 }
133
134 // Adds a hook which will be called every time this logger writes an entry.
135 func (l *Logger) AddWriteHook(hook LogMutator) {
136         // We do the work in a LogMutator so that it happens in the worker
137         // goroutine.
138         l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
139                 l.writeHooks = append(l.writeHooks, hook)
140         }
141 }
142
143 // The worker loop
144 func (l *Logger) work() {
145         for {
146                 select {
147                 case <-l.writeTicker.C:
148                         if l.modified {
149                                 l.write()
150                                 l.modified = false
151                         }
152                 case mutator := <-l.workToDo:
153                         mutator(l.properties, l.entry)
154                         l.modified = true
155                 }
156         }
157 }
158
159 // Actually writes the log entry.
160 func (l *Logger) write() {
161
162         // Run all our hooks
163         for _, hook := range l.writeHooks {
164                 hook(l.properties, l.entry)
165         }
166
167         // Update the event type in case it was modified or is missing.
168         // TODO(misha): Fix this to write different event types.
169         l.entry["event_type"] = l.params.EventType
170
171         // Write the log entry.
172         // This is a network write and will take a while, which is bad
173         // because we're blocking all the other work on this goroutine.
174         //
175         // TODO(misha): Consider rewriting this so that we can encode l.data
176         // into a string, and then perform the actual write in another
177         // routine. This will be tricky and will require support in the
178         // client.
179         err := l.params.Client.Create("logs", l.data, nil)
180         if err != nil {
181                 log.Printf("Attempted to log: %v", l.data)
182                 log.Fatalf("Received error writing log: %v", err)
183         }
184 }