- for pqEvent := range listener.Notify {
- if pqEvent.Channel != "logs" {
- continue
- }
- serial++
- e := &event{
- LogUUID: pqEvent.Extra,
- Received: time.Now(),
- Serial: serial,
+ ticker := time.NewTicker(time.Minute)
+ defer ticker.Stop()
+ for {
+ select {
+ case err, ok := <-ps.shutdown:
+ if ok {
+ debugLogf("shutdown on error: %s", err)
+ }
+ close(eventQueue)
+ return
+
+ case <-ticker.C:
+ debugLogf("pgEventSource listener ping")
+ ps.pqListener.Ping()
+
+ case pqEvent, ok := <-ps.pqListener.Notify:
+ if !ok {
+ close(eventQueue)
+ return
+ }
+ if pqEvent.Channel != "logs" {
+ continue
+ }
+ logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+ if err != nil {
+ log.Printf("bad notify payload: %+v", pqEvent)
+ continue
+ }
+ serial++
+ e := &event{
+ LogID: logID,
+ Received: time.Now(),
+ Serial: serial,
+ db: ps.db,
+ }
+ debugLogf("event %d %+v", e.Serial, e)
+ eventQueue <- e
+ go e.Detail()