Merge branch 'master' into 3408-production-datamanager
[arvados.git] / sdk / go / logger / logger.go
1 // Logger periodically writes a log to the Arvados SDK.
2 //
3 // This package is useful for maintaining a log object that is updated
4 // over time. This log object will be periodically written to the log,
5 // as specified by WriteInterval in the Params.
6 //
7 // This package is safe for concurrent use as long as:
8 // The maps passed to a LogMutator are not accessed outside of the
9 // LogMutator
10 //
11 // Usage:
12 // arvLogger := logger.NewLogger(params)
13 // arvLogger.Update(func(properties map[string]interface{},
14 //      entry map[string]interface{}) {
15 //   // Modifiy properties and entry however you want
16 //   // properties is a shortcut for entry["properties"].(map[string]interface{})
17 //   // properties can take any values you want to give it,
18 //   // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
19 // })
20 package logger
21
22 import (
23         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
24         "log"
25         "time"
26 )
27
28 const (
29         startSuffix   = "-start"
30         partialSuffix = "-partial"
31         finalSuffix   = "-final"
32 )
33
34 type LoggerParams struct {
35         Client          arvadosclient.ArvadosClient // The client we use to write log entries
36         EventTypePrefix string                      // The prefix we use for the event type in the log entry
37         WriteInterval   time.Duration               // Wait at least this long between log writes
38 }
39
40 // A LogMutator is a function which modifies the log entry.
41 // It takes two maps as arguments, properties is the first and entry
42 // is the second
43 // properties is a shortcut for entry["properties"].(map[string]interface{})
44 // properties can take any values you want to give it.
45 // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
46 // properties and entry are only safe to access inside the LogMutator,
47 // they should not be stored anywhere, otherwise you'll risk
48 // concurrent access.
49 type LogMutator func(map[string]interface{}, map[string]interface{})
50
51 // A Logger is used to build up a log entry over time and write every
52 // version of it.
53 type Logger struct {
54         // The data we write
55         data       map[string]interface{} // The entire map that we give to the api
56         entry      map[string]interface{} // Convenience shortcut into data
57         properties map[string]interface{} // Convenience shortcut into data
58
59         params LoggerParams // Parameters we were given
60
61         // Variables to coordinate updating and writing.
62         modified    bool            // Has this data been modified since the last write?
63         workToDo    chan LogMutator // Work to do in the worker thread.
64         writeTicker *time.Ticker    // On each tick we write the log data to arvados, if it has been modified.
65         hasWritten  bool            // Whether we've written at all yet.
66
67         writeHooks []LogMutator // Mutators we call before each write.
68 }
69
70 // Create a new logger based on the specified parameters.
71 func NewLogger(params LoggerParams) *Logger {
72         // sanity check parameters
73         if &params.Client == nil {
74                 log.Fatal("Nil arvados client in LoggerParams passed in to NewLogger()")
75         }
76         if params.EventTypePrefix == "" {
77                 log.Fatal("Empty event type prefix in LoggerParams passed in to NewLogger()")
78         }
79
80         l := &Logger{data: make(map[string]interface{}),
81                 params: params}
82         l.entry = make(map[string]interface{})
83         l.data["log"] = l.entry
84         l.properties = make(map[string]interface{})
85         l.entry["properties"] = l.properties
86
87         l.workToDo = make(chan LogMutator, 10)
88         l.writeTicker = time.NewTicker(params.WriteInterval)
89
90         // Start the worker goroutine.
91         go l.work()
92
93         return l
94 }
95
96 // Exported functions will be called from other goroutines, therefore
97 // all they are allowed to do is enqueue work to be done in the worker
98 // goroutine.
99
100 // Enqueues an update. This will happen in another goroutine after
101 // this method returns.
102 func (l *Logger) Update(mutator LogMutator) {
103         l.workToDo <- mutator
104 }
105
106 // Similar to Update(), but writes the log entry as soon as possible
107 // (ignoring MinimumWriteInterval) and blocks until the entry has been
108 // written. This is useful if you know that you're about to quit
109 // (e.g. if you discovered a fatal error, or you're finished), since
110 // go will not wait for timers (including the pending write timer) to
111 // go off before exiting.
112 func (l *Logger) FinalUpdate(mutator LogMutator) {
113         // Block on this channel until everything finishes
114         done := make(chan bool)
115
116         // TODO(misha): Consider not accepting any future updates somehow,
117         // since they won't get written if they come in after this.
118
119         // Stop the periodic write ticker. We'll perform the final write
120         // before returning from this function.
121         l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
122                 l.writeTicker.Stop()
123         }
124
125         // Apply the final update
126         l.workToDo <- mutator
127
128         // Perform the final write and signal that we can return.
129         l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
130                 l.write(true)
131                 done <- true
132         }
133
134         // Wait until we've performed the write.
135         <-done
136 }
137
138 // Adds a hook which will be called every time this logger writes an entry.
139 func (l *Logger) AddWriteHook(hook LogMutator) {
140         // We do the work in a LogMutator so that it happens in the worker
141         // goroutine.
142         l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
143                 l.writeHooks = append(l.writeHooks, hook)
144         }
145 }
146
147 // The worker loop
148 func (l *Logger) work() {
149         for {
150                 select {
151                 case <-l.writeTicker.C:
152                         if l.modified {
153                                 l.write(false)
154                                 l.modified = false
155                         }
156                 case mutator := <-l.workToDo:
157                         mutator(l.properties, l.entry)
158                         l.modified = true
159                 }
160         }
161 }
162
163 // Actually writes the log entry.
164 func (l *Logger) write(isFinal bool) {
165
166         // Run all our hooks
167         for _, hook := range l.writeHooks {
168                 hook(l.properties, l.entry)
169         }
170
171         // Update the event type.
172         if isFinal {
173                 l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix
174         } else if l.hasWritten {
175                 l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix
176         } else {
177                 l.entry["event_type"] = l.params.EventTypePrefix + startSuffix
178         }
179         l.hasWritten = true
180
181         // Write the log entry.
182         // This is a network write and will take a while, which is bad
183         // because we're blocking all the other work on this goroutine.
184         //
185         // TODO(misha): Consider rewriting this so that we can encode l.data
186         // into a string, and then perform the actual write in another
187         // routine. This will be tricky and will require support in the
188         // client.
189         err := l.params.Client.Create("logs", l.data, nil)
190         if err != nil {
191                 log.Printf("Attempted to log: %v", l.data)
192                 log.Fatalf("Received error writing log: %v", err)
193         }
194 }