8005: Document installing Git on CentOS 6 from RepoForge.
[arvados.git] / sdk / go / logger / logger.go
1 // Logger periodically writes a log to the Arvados SDK.
2 //
3 // This package is useful for maintaining a log object that is updated
4 // over time. This log object will be periodically written to the log,
5 // as specified by WriteInterval in the Params.
6 //
7 // This package is safe for concurrent use as long as:
8 // The maps passed to a LogMutator are not accessed outside of the
9 // LogMutator
10 //
11 // Usage:
12 // arvLogger := logger.NewLogger(params)
13 // arvLogger.Update(func(properties map[string]interface{},
14 //      entry map[string]interface{}) {
15 //   // Modifiy properties and entry however you want
16 //   // properties is a shortcut for entry["properties"].(map[string]interface{})
17 //   // properties can take any (valid) values you want to give it,
18 //   // entry will only take the fields listed at
19 //   // http://doc.arvados.org/api/schema/Log.html
20 //   // Valid values for properties are anything that can be json
21 //   // encoded (i.e. will not error if you call json.Marshal() on it.
22 // })
23 package logger
24
25 import (
26         "fmt"
27         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
28         "log"
29         "time"
30 )
31
32 const (
33         startSuffix              = "-start"
34         partialSuffix            = "-partial"
35         finalSuffix              = "-final"
36         numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work().
37 )
38
39 type LoggerParams struct {
40         Client          arvadosclient.ArvadosClient // The client we use to write log entries
41         EventTypePrefix string                      // The prefix we use for the event type in the log entry
42         WriteInterval   time.Duration               // Wait at least this long between log writes
43 }
44
45 // A LogMutator is a function which modifies the log entry.
46 // It takes two maps as arguments, properties is the first and entry
47 // is the second
48 // properties is a shortcut for entry["properties"].(map[string]interface{})
49 // properties can take any values you want to give it.
50 // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
51 // properties and entry are only safe to access inside the LogMutator,
52 // they should not be stored anywhere, otherwise you'll risk
53 // concurrent access.
54 type LogMutator func(map[string]interface{}, map[string]interface{})
55
56 // A Logger is used to build up a log entry over time and write every
57 // version of it.
58 type Logger struct {
59         // The data we write
60         data       map[string]interface{} // The entire map that we give to the api
61         entry      map[string]interface{} // Convenience shortcut into data
62         properties map[string]interface{} // Convenience shortcut into data
63
64         params LoggerParams // Parameters we were given
65
66         // Variables to coordinate updating and writing.
67         modified    bool            // Has this data been modified since the last write?
68         workToDo    chan LogMutator // Work to do in the worker thread.
69         writeTicker *time.Ticker    // On each tick we write the log data to arvados, if it has been modified.
70         hasWritten  bool            // Whether we've written at all yet.
71         noMoreWork  chan bool       // Signals that we're done writing.
72
73         writeHooks []LogMutator // Mutators we call before each write.
74 }
75
76 // Create a new logger based on the specified parameters.
77 func NewLogger(params LoggerParams) (l *Logger, err error) {
78         // sanity check parameters
79         if &params.Client == nil {
80                 err = fmt.Errorf("Nil arvados client in LoggerParams passed in to NewLogger()")
81                 return
82         }
83         if params.EventTypePrefix == "" {
84                 err = fmt.Errorf("Empty event type prefix in LoggerParams passed in to NewLogger()")
85                 return
86         }
87
88         l = &Logger{
89                 data:        make(map[string]interface{}),
90                 entry:       make(map[string]interface{}),
91                 properties:  make(map[string]interface{}),
92                 params:      params,
93                 workToDo:    make(chan LogMutator, 10),
94                 writeTicker: time.NewTicker(params.WriteInterval),
95                 noMoreWork:  make(chan bool, numberNoMoreWorkMessages)}
96
97         l.data["log"] = l.entry
98         l.entry["properties"] = l.properties
99
100         // Start the worker goroutine.
101         go l.work()
102
103         return l, nil
104 }
105
106 // Exported functions will be called from other goroutines, therefore
107 // all they are allowed to do is enqueue work to be done in the worker
108 // goroutine.
109
110 // Enqueues an update. This will happen in another goroutine after
111 // this method returns.
112 func (l *Logger) Update(mutator LogMutator) {
113         l.workToDo <- mutator
114 }
115
116 // Similar to Update(), but writes the log entry as soon as possible
117 // (ignoring MinimumWriteInterval) and blocks until the entry has been
118 // written. This is useful if you know that you're about to quit
119 // (e.g. if you discovered a fatal error, or you're finished), since
120 // go will not wait for timers (including the pending write timer) to
121 // go off before exiting.
122 func (l *Logger) FinalUpdate(mutator LogMutator) {
123         // TODO(misha): Consider not accepting any future updates somehow,
124         // since they won't get written if they come in after this.
125
126         // Stop the periodic write ticker. We'll perform the final write
127         // before returning from this function.
128         l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
129                 l.writeTicker.Stop()
130         }
131
132         // Apply the final update
133         l.workToDo <- mutator
134
135         // Perform the final write and signal that we can return.
136         l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
137                 l.write(true)
138                 for i := 0; i < numberNoMoreWorkMessages; {
139                         l.noMoreWork <- true
140                 }
141         }
142
143         // Wait until we've performed the write.
144         <-l.noMoreWork
145 }
146
147 // Adds a hook which will be called every time this logger writes an entry.
148 func (l *Logger) AddWriteHook(hook LogMutator) {
149         // We do the work in a LogMutator so that it happens in the worker
150         // goroutine.
151         l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
152                 l.writeHooks = append(l.writeHooks, hook)
153         }
154 }
155
156 // The worker loop
157 func (l *Logger) work() {
158         for {
159                 select {
160                 case <-l.writeTicker.C:
161                         if l.modified {
162                                 l.write(false)
163                                 l.modified = false
164                         }
165                 case mutator := <-l.workToDo:
166                         mutator(l.properties, l.entry)
167                         l.modified = true
168                 case <-l.noMoreWork:
169                         return
170                 }
171         }
172 }
173
174 // Actually writes the log entry.
175 func (l *Logger) write(isFinal bool) {
176
177         // Run all our hooks
178         for _, hook := range l.writeHooks {
179                 hook(l.properties, l.entry)
180         }
181
182         // Update the event type.
183         if isFinal {
184                 l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix
185         } else if l.hasWritten {
186                 l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix
187         } else {
188                 l.entry["event_type"] = l.params.EventTypePrefix + startSuffix
189         }
190         l.hasWritten = true
191
192         // Write the log entry.
193         // This is a network write and will take a while, which is bad
194         // because we're blocking all the other work on this goroutine.
195         //
196         // TODO(misha): Consider rewriting this so that we can encode l.data
197         // into a string, and then perform the actual write in another
198         // routine. This will be tricky and will require support in the
199         // client.
200         err := l.params.Client.Create("logs", l.data, nil)
201         if err != nil {
202                 log.Printf("Received error writing %v: %v", l.data, err)
203         }
204 }