Updated logger to do all work in a dedicated goroutine, so we don't need to worry...
authormishaz <misha@curoverse.com>
Tue, 10 Feb 2015 01:55:37 +0000 (01:55 +0000)
committerTom Clegg <tom@curoverse.com>
Fri, 13 Feb 2015 21:25:31 +0000 (16:25 -0500)
sdk/go/logger/logger.go
services/datamanager/collection/collection.go
services/datamanager/datamanager.go
services/datamanager/keep/keep.go
services/datamanager/loggerutil/loggerutil.go

index a53ab3cd8d2b4d1f792e75d2b211a4a75811d654..e6e1ed64df3a17988c236231147c961c9282c261 100644 (file)
@@ -2,8 +2,8 @@
 //
 // This package is useful for maintaining a log object that is updated
 // over time. Every time the object is updated, it will be written to
-// the log. Writes will be throttled to no more than one every
-// WriteFrequencySeconds
+// the log. Writes will be throttled to no more frequent than
+// WriteInterval.
 //
 // This package is safe for concurrent use as long as:
 // The maps passed to a LogMutator are not accessed outside of the
@@ -23,14 +23,13 @@ package logger
 import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "log"
-       "sync"
        "time"
 )
 
 type LoggerParams struct {
-       Client               arvadosclient.ArvadosClient // The client we use to write log entries
-       EventType            string                      // The event type to assign to the log entry.
-       MinimumWriteInterval time.Duration               // Wait at least this long between log writes
+       Client        arvadosclient.ArvadosClient // The client we use to write log entries
+       EventType     string                      // The event type to assign to the log entry.
+       WriteInterval time.Duration               // Wait at least this long between log writes
 }
 
 // A LogMutator is a function which modifies the log entry.
@@ -52,13 +51,12 @@ type Logger struct {
        entry      map[string]interface{} // Convenience shortcut into data
        properties map[string]interface{} // Convenience shortcut into data
 
-       lock   sync.Locker  // Synchronizes access to this struct
        params LoggerParams // Parameters we were given
 
-       // Variables used to determine when and if we write to the log.
-       nextWriteAllowed time.Time // The next time we can write, respecting MinimumWriteInterval
-       modified         bool      // Has this data been modified since the last write?
-       writeScheduled   bool      // Is a write been scheduled for the future?
+       // Variables to coordinate updating and writing.
+       modified    bool            // Has this data been modified since the last write?
+       workToDo    chan LogMutator // Work to do in the worker thread.
+       writeTicker *time.Ticker    // On each tick we write the log data to arvados, if it has been modified.
 
        writeHooks []LogMutator // Mutators we call before each write.
 }
@@ -74,91 +72,88 @@ func NewLogger(params LoggerParams) *Logger {
        }
 
        l := &Logger{data: make(map[string]interface{}),
-               lock:   &sync.Mutex{},
                params: params}
        l.entry = make(map[string]interface{})
        l.data["log"] = l.entry
        l.properties = make(map[string]interface{})
        l.entry["properties"] = l.properties
-       return l
-}
 
-// Updates the log data and then writes it to the api server. If the
-// log has been recently written then the write will be postponed to
-// respect MinimumWriteInterval and this function will return before
-// the write occurs.
-func (l *Logger) Update(mutator LogMutator) {
-       l.lock.Lock()
+       l.workToDo = make(chan LogMutator, 10)
+       l.writeTicker = time.NewTicker(params.WriteInterval)
 
-       mutator(l.properties, l.entry)
-       l.modified = true // We assume the mutator modified the log, even though we don't know for sure.
+       // Start the worker goroutine.
+       go l.work()
 
-       l.considerWriting()
-
-       l.lock.Unlock()
+       return l
 }
 
-// Similar to Update(), but forces a write without respecting the
-// MinimumWriteInterval. This is useful if you know that you're about
-// to quit (e.g. if you discovered a fatal error, or you're finished),
-// since go will not wait for timers (including the pending write
-// timer) to go off before exiting.
-func (l *Logger) ForceUpdate(mutator LogMutator) {
-       l.lock.Lock()
+// Exported functions will be called from other goroutines, therefore
+// all they are allowed to do is enqueue work to be done in the worker
+// goroutine.
 
-       mutator(l.properties, l.entry)
-       l.modified = true // We assume the mutator modified the log, even though we don't know for sure.
-
-       l.write()
-       l.lock.Unlock()
-}
-
-// Adds a hook which will be called every time this logger writes an entry.
-func (l *Logger) AddWriteHook(hook LogMutator) {
-       l.lock.Lock()
-       l.writeHooks = append(l.writeHooks, hook)
-       // TODO(misha): Consider setting modified and attempting a write.
-       l.lock.Unlock()
+// Enqueues an update. This will happen in another goroutine after
+// this method returns.
+func (l *Logger) Update(mutator LogMutator) {
+       l.workToDo <- mutator
 }
 
-// This function is called on a timer when we have something to write,
-// but need to schedule the write for the future to respect
-// MinimumWriteInterval.
-func (l *Logger) acquireLockConsiderWriting() {
-       l.lock.Lock()
+// Similar to Update(), but writes the log entry as soon as possible
+// (ignoring MinimumWriteInterval) and blocks until the entry has been
+// written. This is useful if you know that you're about to quit
+// (e.g. if you discovered a fatal error, or you're finished), since
+// go will not wait for timers (including the pending write timer) to
+// go off before exiting.
+func (l *Logger) FinalUpdate(mutator LogMutator) {
+       // Block on this channel until everything finishes
+       done := make(chan bool)
+
+       // TODO(misha): Consider not accepting any future updates somehow,
+       // since they won't get written if they come in after this.
+
+       // Stop the periodic write ticker. We'll perform the final write
+       // before returning from this function.
+       l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
+               l.writeTicker.Stop()
+       }
 
-       // We are the scheduled write, so there are no longer future writes
-       // scheduled.
-       l.writeScheduled = false
+       // Apply the final update
+       l.workToDo <- mutator
 
-       l.considerWriting()
+       // Perform the write and signal that we can return.
+       l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
+               // TODO(misha): Add a boolean arg to write() to indicate that it's
+               // final so that we can set the appropriate event type.
+               l.write()
+               done <- true
+       }
 
-       l.lock.Unlock()
+       // Wait until we've performed the write.
+       <-done
 }
 
-// The above methods each acquire the lock and release it.
-// =======================================================
-// The below methods all assume we're holding a lock.
-
-// Check whether we have anything to write. If we do, then either
-// write it now or later, based on what we're allowed.
-func (l *Logger) considerWriting() {
-       if !l.modified {
-               // Nothing to write
-       } else if l.writeAllowedNow() {
-               l.write()
-       } else if l.writeScheduled {
-               // A future write is already scheduled, we don't need to do anything.
-       } else {
-               writeAfter := l.nextWriteAllowed.Sub(time.Now())
-               time.AfterFunc(writeAfter, l.acquireLockConsiderWriting)
-               l.writeScheduled = true
+// Adds a hook which will be called every time this logger writes an entry.
+func (l *Logger) AddWriteHook(hook LogMutator) {
+       // We do the work in a LogMutator so that it happens in the worker
+       // goroutine.
+       l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
+               l.writeHooks = append(l.writeHooks, hook)
        }
 }
 
-// Whether writing now would respect MinimumWriteInterval
-func (l *Logger) writeAllowedNow() bool {
-       return l.nextWriteAllowed.Before(time.Now())
+// The worker loop
+func (l *Logger) work() {
+       for {
+               select {
+               case <-l.writeTicker.C:
+                       if l.modified {
+                               l.write()
+                               l.modified = false
+                       }
+               case mutator := <-l.workToDo:
+                       mutator(l.properties, l.entry)
+                       l.modified = true
+               }
+       }
 }
 
 // Actually writes the log entry.
@@ -170,24 +165,20 @@ func (l *Logger) write() {
        }
 
        // Update the event type in case it was modified or is missing.
+       // TODO(misha): Fix this to write different event types.
        l.entry["event_type"] = l.params.EventType
 
        // Write the log entry.
        // This is a network write and will take a while, which is bad
-       // because we're holding a lock and all other goroutines will back
-       // up behind it.
+       // because we're blocking all the other work on this goroutine.
        //
        // TODO(misha): Consider rewriting this so that we can encode l.data
-       // into a string, release the lock, write the string, and then
-       // acquire the lock again to note that we succeeded in writing. This
-       // will be tricky and will require support in the client.
+       // into a string, and then perform the actual write in another
+       // routine. This will be tricky and will require support in the
+       // client.
        err := l.params.Client.Create("logs", l.data, nil)
        if err != nil {
                log.Printf("Attempted to log: %v", l.data)
                log.Fatalf("Received error writing log: %v", err)
        }
-
-       // Update stats.
-       l.nextWriteAllowed = time.Now().Add(l.params.MinimumWriteInterval)
-       l.modified = false
 }
index 424db835629ea8e8b604adebce5b30431a31b7ca..9a7a838f1b9db71eea07bf88cd98316f5e132fd3 100644 (file)
@@ -65,15 +65,6 @@ func init() {
                "File to write the heap profiles to. Leave blank to skip profiling.")
 }
 
-// // Methods to implement util.SdkListResponse Interface
-// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
-//     return s.ItemsAvailable, nil
-// }
-
-// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
-//     return len(s.Items), nil
-// }
-
 // Write the heap profile to a file for later review.
 // Since a file is expected to only contain a single heap profile this
 // function overwrites the previously written profile, so it is safe
index 398c8770929c70a665ca99a94deb428c7e4a5fca..f63f4628a0cf9aec06078507ab495651a113a160 100644 (file)
@@ -48,14 +48,15 @@ func main() {
        var arvLogger *logger.Logger
        if logEventType != "" {
                arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv,
-                       EventType:            logEventType,
-                       MinimumWriteInterval: time.Second * time.Duration(logFrequencySeconds)})
+                       EventType:     logEventType,
+                       WriteInterval: time.Second * time.Duration(logFrequencySeconds)})
        }
 
        if arvLogger != nil {
+               now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        runInfo := make(map[string]interface{})
-                       runInfo["time_started"] = time.Now()
+                       runInfo["time_started"] = now
                        runInfo["args"] = os.Args
                        hostname, err := os.Hostname()
                        if err != nil {
@@ -90,7 +91,7 @@ func main() {
        // Log that we're finished. We force the recording, since go will
        // not wait for the timer before exiting.
        if arvLogger != nil {
-               arvLogger.ForceUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+               arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
                        p["run_info"].(map[string]interface{})["time_finished"] = time.Now()
                })
        }
index 20a59316b727699d3715367e3881af2a8a8394a3..dcd6c4948810395a1d03578b108c393897905144 100644 (file)
@@ -69,15 +69,6 @@ type KeepServiceList struct {
        KeepServers    []ServerAddress `json:"items"`
 }
 
-// Methods to implement util.SdkListResponse Interface
-func (k KeepServiceList) NumItemsAvailable() (numAvailable int, err error) {
-       return k.ItemsAvailable, nil
-}
-
-func (k KeepServiceList) NumItemsContained() (numContained int, err error) {
-       return len(k.KeepServers), nil
-}
-
 var (
        // Don't access the token directly, use getDataManagerToken() to
        // make sure it's been read.
@@ -244,10 +235,11 @@ func GetServerStatus(arvLogger *logger.Logger,
                keepServer.Port)
 
        if arvLogger != nil {
+               now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := p["keep_info"].(map[string]interface{})
                        serverInfo := make(map[string]interface{})
-                       serverInfo["time_status_request_sent"] = time.Now()
+                       serverInfo["time_status_request_sent"] = now
 
                        keepInfo[keepServer.String()] = serverInfo
                })
@@ -274,10 +266,11 @@ func GetServerStatus(arvLogger *logger.Logger,
        }
 
        if arvLogger != nil {
+               now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := p["keep_info"].(map[string]interface{})
                        serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
-                       serverInfo["time_status_response_processed"] = time.Now()
+                       serverInfo["time_status_response_processed"] = now
                        serverInfo["status"] = keepStatus
                })
        }
@@ -289,10 +282,11 @@ func CreateIndexRequest(arvLogger *logger.Logger,
        log.Println("About to fetch keep server contents from " + url)
 
        if arvLogger != nil {
+               now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := p["keep_info"].(map[string]interface{})
                        serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
-                       serverInfo["time_index_request_sent"] = time.Now()
+                       serverInfo["time_index_request_sent"] = now
                })
        }
 
@@ -319,11 +313,11 @@ func ReadServerResponse(arvLogger *logger.Logger,
        }
 
        if arvLogger != nil {
+               now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := p["keep_info"].(map[string]interface{})
                        serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
-
-                       serverInfo["time_index_response_received"] = time.Now()
+                       serverInfo["time_index_response_received"] = now
                })
        }
 
@@ -393,11 +387,12 @@ func ReadServerResponse(arvLogger *logger.Logger,
                        numSizeDisagreements)
 
                if arvLogger != nil {
+                       now := time.Now()
                        arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                                keepInfo := p["keep_info"].(map[string]interface{})
                                serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
 
-                               serverInfo["time_processing_finished"] = time.Now()
+                               serverInfo["time_processing_finished"] = now
                                serverInfo["lines_received"] = numLines
                                serverInfo["duplicates_seen"] = numDuplicates
                                serverInfo["size_disagreements_seen"] = numSizeDisagreements
index fa876d4598bd288c99383f44992b9b74d403d771..c19a7abba2708a2e0afd15f08e57abccfec2f650 100644 (file)
@@ -8,12 +8,9 @@ import (
        "time"
 )
 
-// Assumes you haven't already called arvLogger.Edit()!
-// If you have called arvLogger.Edit() this method will hang waiting
-// for the lock you're already holding.
 func FatalWithMessage(arvLogger *logger.Logger, message string) {
        if arvLogger != nil {
-               arvLogger.ForceUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+               arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
                        p["FATAL"] = message
                        p["run_info"].(map[string]interface{})["time_finished"] = time.Now()
                })