APIHost: {}
Insecure: true
Listen: :{}
+LogLevel: {}
Postgres:
host: {}
dbname: {}
sslmode: require
""".format(os.environ['ARVADOS_API_HOST'],
port,
+ ('info' if os.environ.get('ARVADOS_DEBUG', '') in ['','0'] else 'debug'),
_dbconfig('host'),
_dbconfig('database'),
_dbconfig('username'),
)
type Config struct {
- Client arvados.Client
- Postgres pgConfig
- Listen string
- Debug bool
+ Client arvados.Client
+ Postgres pgConfig
+ Listen string
+ LogLevel string
+ LogFormat string
PingTimeout arvados.Duration
ClientEventQueue int
"connect_timeout": "30",
"sslmode": "require",
},
+ LogLevel: "info",
+ LogFormat: "json",
PingTimeout: arvados.Duration(time.Minute),
ClientEventQueue: 64,
ServerEventQueue: 4,
import (
"database/sql"
- "log"
"sync"
"time"
&logRow.CreatedAt,
&propYAML)
if e.err != nil {
- log.Printf("retrieving log row %d: %s", e.LogID, e.err)
+ logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
return nil
}
e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
if e.err != nil {
- log.Printf("decoding yaml for log row %d: %s", e.LogID, e.err)
+ logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
return nil
}
e.logRow = &logRow
import (
"encoding/json"
"io"
- "log"
- "net/http"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ log "github.com/Sirupsen/logrus"
)
-type wsConn interface {
- io.ReadWriter
- Request() *http.Request
- SetReadDeadline(time.Time) error
- SetWriteDeadline(time.Time) error
-}
-
type handler struct {
Client arvados.Client
PingTimeout time.Duration
}
type handlerStats struct {
- QueueDelay time.Duration
- WriteDelay time.Duration
- EventBytes uint64
- EventCount uint64
+ QueueDelayNs time.Duration
+ WriteDelayNs time.Duration
+ EventBytes uint64
+ EventCount uint64
}
func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
+ ctx := contextWithLogger(ws.Request().Context(), log.WithFields(log.Fields{
+ "RemoteAddr": ws.Request().RemoteAddr,
+ }))
sess, err := h.NewSession(ws)
if err != nil {
- log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
+ logger(ctx).WithError(err).Error("NewSession failed")
return
}
}
ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
n, err := ws.Read(buf)
- sess.debugLogf("received frame: %q", buf[:n])
+ logger(ctx).WithField("frame", string(buf[:n])).Debug("received frame")
if err == nil && n == len(buf) {
err = errFrameTooBig
}
if err != nil {
if err != io.EOF {
- sess.debugLogf("handler: read: %s", err)
+ logger(ctx).WithError(err).Info("read error")
}
stop <- err
return
msg := make(map[string]interface{})
err = json.Unmarshal(buf[:n], &msg)
if err != nil {
- sess.debugLogf("handler: unmarshal: %s", err)
+ logger(ctx).WithError(err).Info("invalid json from client")
stop <- err
return
}
for _, buf := range sess.Receive(msg, buf[:n]) {
- sess.debugLogf("handler: to queue: %s", string(buf))
+ logger(ctx).WithField("frame", string(buf)).Debug("queued message from sess.Receive")
queue <- buf
}
}
for e := range queue {
if buf, ok := e.([]byte); ok {
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
- sess.debugLogf("handler: send msg: %s", string(buf))
+ logger(ctx).WithField("frame", string(buf)).Debug("send msg buf")
_, err := ws.Write(buf)
if err != nil {
- sess.debugLogf("handler: write {}: %s", err)
+ logger(ctx).WithError(err).Error("write failed")
stop <- err
break
}
continue
}
e := e.(*event)
+ log := logger(ctx).WithField("serial", e.Serial)
buf, err := sess.EventMessage(e)
if err != nil {
- sess.debugLogf("EventMessage %d: err %s", err)
+ log.WithError(err).Error("EventMessage failed")
stop <- err
break
} else if len(buf) == 0 {
- sess.debugLogf("EventMessage %d: skip", e.Serial)
+ log.Debug("skip")
continue
}
- sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
+ log.WithField("frame", string(buf)).Debug("send event")
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
- sess.debugLogf("handler: write: %s", err)
+ log.WithError(err).Error("write failed")
stop <- err
break
}
- sess.debugLogf("handler: sent event %d", e.Serial)
- stats.WriteDelay += time.Since(t0)
- stats.QueueDelay += t0.Sub(e.Received)
+ log.Debug("sent")
+
+ // e cannot be nil here: the type assertion `e.(*event)` above would
+ // have panicked on a non-*event, and e.Serial has already been
+ // dereferenced -- so the nil guard that was here was dead code.
+ stats.QueueDelayNs += t0.Sub(e.Received)
+ stats.WriteDelayNs += time.Since(t0)
stats.EventBytes += uint64(len(buf))
stats.EventCount++
}
package main
import (
- "encoding/json"
- "fmt"
- "log"
- "time"
-)
-
-func init() {
- log.SetFlags(0)
-}
+ "context"
-func errorLogf(f string, args ...interface{}) {
- log.Print(`{"error":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
-}
+ log "github.com/Sirupsen/logrus"
+)
-var debugLogf = func(f string, args ...interface{}) {
- log.Print(`{"debug":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
-}
+// loggerCtxKey is the context key under which contextWithLogger stores
+// a *log.Entry; new(int) yields a unique pointer so it cannot collide
+// with keys from other packages.
+var loggerCtxKey = new(int)
-func mustMarshal(v interface{}) []byte {
- buf, err := json.Marshal(v)
- if err != nil {
- panic(err)
- }
- return buf
+// contextWithLogger returns a child context carrying the given
+// log.Entry, for later retrieval via logger(ctx).
+func contextWithLogger(ctx context.Context, logger *log.Entry) context.Context {
+ return context.WithValue(ctx, loggerCtxKey, logger)
}
-func logj(args ...interface{}) {
- m := map[string]interface{}{"Time": time.Now().UTC()}
- for i := 0; i < len(args)-1; i += 2 {
- m[fmt.Sprintf("%s", args[i])] = args[i+1]
- }
- buf, err := json.Marshal(m)
- if err != nil {
- errorLogf("logj: %s", err)
- return
+func logger(ctx context.Context) *log.Entry {
+ if ctx != nil {
+ if logger, ok := ctx.Value(loggerCtxKey).(*log.Entry); ok {
+ return logger
+ }
}
- log.Print(string(buf))
+ return log.WithFields(nil)
}
import (
"flag"
"fmt"
- "log"
"net/http"
"time"
"git.curoverse.com/arvados.git/sdk/go/config"
+ log "github.com/Sirupsen/logrus"
)
func main() {
if err != nil {
log.Fatal(err)
}
- if !cfg.Debug {
- debugLogf = func(string, ...interface{}) {}
+
+ lvl, err := log.ParseLevel(cfg.LogLevel)
+ if err != nil {
+ log.Fatal(err)
+ }
+ log.SetLevel(lvl)
+ switch cfg.LogFormat {
+ case "text":
+ log.SetFormatter(&log.TextFormatter{
+ FullTimestamp: true,
+ TimestampFormat: time.RFC3339Nano,
+ })
+ case "json":
+ log.SetFormatter(&log.JSONFormatter{
+ TimestampFormat: time.RFC3339Nano,
+ })
+ default:
+ log.WithField("LogFormat", cfg.LogFormat).Fatal("unknown log format")
}
if *dumpConfig {
}
eventSource.NewSink().Stop()
- log.Printf("listening at %s", srv.Addr)
+ log.WithField("Listen", srv.Addr).Info("listening")
log.Fatal(srv.ListenAndServe())
}
}
func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+ // NOTE(review): this attaches the caller's full API token to every
+ // log entry emitted below; confirm that is acceptable, or truncate /
+ // redact the token before this ships.
+ logger := logger(nil).
+ WithField("token", pc.Client.AuthToken).
+ WithField("uuid", uuid)
pc.tidy()
now := time.Now()
if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
- debugLogf("perm (cached): %+q %+q ...%v", pc.Client.AuthToken, uuid, perm.allowed)
+ logger.WithField("allowed", perm.allowed).Debug("cache hit")
return perm.allowed, nil
}
var buf map[string]interface{}
var allowed bool
if err == nil {
allowed = true
- } else if txErr, ok := err.(arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
+ } else if txErr, ok := err.(*arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
allowed = false
} else {
- errorLogf("perm err: %+q %+q: %T %s", pc.Client.AuthToken, uuid, err, err)
+ logger.WithError(err).Error("lookup error")
return false, err
}
- debugLogf("perm: %+q %+q ...%v", pc.Client.AuthToken, uuid, allowed)
+ logger.WithField("allowed", allowed).Debug("cache miss")
pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
return allowed, nil
}
import (
"database/sql"
- "fmt"
- "log"
"strconv"
"strings"
"sync"
db, err := sql.Open("postgres", ps.DataSource)
if err != nil {
- log.Fatalf("sql.Open: %s", err)
+ logger(nil).WithError(err).Fatal("sql.Open failed")
}
if err = db.Ping(); err != nil {
- log.Fatalf("db.Ping: %s", err)
+ logger(nil).WithError(err).Fatal("db.Ping failed")
}
ps.db = db
// on missed events, we cannot recover from a
// dropped connection without breaking our
// promises to clients.
- ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
+ logger(nil).WithError(err).Error("listener problem")
+ ps.shutdown <- err
}
})
err = ps.pqListener.Listen("logs")
if err != nil {
- log.Fatal(err)
+ logger(nil).WithError(err).Fatal("pq Listen failed")
}
- debugLogf("pgEventSource listening")
+ logger(nil).Debug("pgEventSource listening")
go ps.run()
}
// concurrent queries would be bounded by
// client_count X client_queue_size.
e.Detail()
- debugLogf("event %d detail %+v", e.Serial, e.Detail())
+
+ logger(nil).
+ WithField("serial", e.Serial).
+ WithField("detail", e.Detail()).
+ Debug("event ready")
+
ps.mtx.Lock()
for sink := range ps.sinks {
sink.channel <- e
select {
case err, ok := <-ps.shutdown:
if ok {
- debugLogf("shutdown on error: %s", err)
+ logger(nil).WithError(err).Info("shutdown")
}
close(eventQueue)
return
case <-ticker.C:
- debugLogf("pgEventSource listener ping")
+ logger(nil).Debug("listener ping")
ps.pqListener.Ping()
case pqEvent, ok := <-ps.pqListener.Notify:
}
logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
if err != nil {
- log.Printf("bad notify payload: %+v", pqEvent)
+ logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
continue
}
serial++
Serial: serial,
db: ps.db,
}
- debugLogf("event %d %+v", e.Serial, e)
+ logger(nil).WithField("event", e).Debug("incoming")
eventQueue <- e
go e.Detail()
}
import (
"database/sql"
- "encoding/json"
- "log"
+ "io"
"net/http"
+ "strconv"
"sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ log "github.com/Sirupsen/logrus"
"golang.org/x/net/websocket"
)
+type wsConn interface {
+ io.ReadWriter
+ Request() *http.Request
+ SetReadDeadline(time.Time) error
+ SetWriteDeadline(time.Time) error
+}
+
type router struct {
Config *Config
eventSource eventSource
mux *http.ServeMux
setupOnce sync.Once
+
+ lastReqID int64
+ lastReqMtx sync.Mutex
}
func (rtr *router) setup() {
handler := &handler{
PingTimeout: rtr.Config.PingTimeout.Duration(),
QueueSize: rtr.Config.ClientEventQueue,
- NewSession: func(ws wsConn) (session, error) {
+ NewSession: func(ws wsConn) (session, error) {
return newSession(ws, rtr.Config.Client, rtr.eventSource.DB())
},
}
return nil
},
Handler: websocket.Handler(func(ws *websocket.Conn) {
- logj("Type", "connect",
- "RemoteAddr", ws.Request().RemoteAddr)
t0 := time.Now()
-
sink := rtr.eventSource.NewSink()
+ logger(ws.Request().Context()).Info("connected")
+
stats := handler.Handle(ws, sink.Channel())
- logj("Type", "disconnect",
- "RemoteAddr", ws.Request().RemoteAddr,
- "Elapsed", time.Now().Sub(t0).Seconds(),
- "Stats", stats)
+ logger(ws.Request().Context()).WithFields(log.Fields{
+ "Elapsed": time.Now().Sub(t0).Seconds(),
+ "Stats": stats,
+ }).Info("disconnect")
sink.Stop()
ws.Close()
}
}
+// newReqID returns a process-unique request ID: the current wall-clock
+// time in nanoseconds, bumped past the previously issued ID if the
+// clock has not advanced, encoded in base 36.
+func (rtr *router) newReqID() string {
+ rtr.lastReqMtx.Lock()
+ defer rtr.lastReqMtx.Unlock()
+ id := time.Now().UnixNano()
+ if id <= rtr.lastReqID {
+ id = rtr.lastReqID + 1
+ }
+ // Record the issued ID. Without this assignment lastReqID stays 0
+ // forever, the guard above never fires, and two requests arriving
+ // in the same nanosecond would get identical IDs.
+ rtr.lastReqID = id
+ return strconv.FormatInt(id, 36)
+}
+
func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rtr.setupOnce.Do(rtr.setup)
- logj("Type", "request",
- "RemoteAddr", req.RemoteAddr,
- "X-Forwarded-For", req.Header.Get("X-Forwarded-For"))
+ logger := logger(req.Context()).
+ WithField("RequestID", rtr.newReqID())
+ ctx := contextWithLogger(req.Context(), logger)
+ req = req.WithContext(ctx)
+ logger.WithFields(log.Fields{
+ "RemoteAddr": req.RemoteAddr,
+ "X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
+ }).Info("accept request")
rtr.mux.ServeHTTP(resp, req)
}
-
-func reqLog(m map[string]interface{}) {
- j, err := json.Marshal(m)
- if err != nil {
- log.Fatal(err)
- }
- log.Print(string(j))
-}
// incoming events will be queued. If the event queue fills
// up, the connection will be dropped.
EventMessage(*event) ([]byte, error)
-
- debugLogf(string, ...interface{})
}
"database/sql"
"encoding/json"
"errors"
- "log"
"sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ log "github.com/Sirupsen/logrus"
)
var (
db *sql.DB
permChecker permChecker
subscriptions []v0subscribe
+ log *log.Entry
mtx sync.Mutex
setupOnce sync.Once
}
ws: ws,
db: db,
permChecker: NewPermChecker(ac),
+ log: logger(ws.Request().Context()),
}
err := ws.Request().ParseForm()
if err != nil {
- log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
+ sess.log.WithError(err).Error("ParseForm failed")
return nil, err
}
token := ws.Request().Form.Get("api_token")
sess.permChecker.SetToken(token)
- sess.debugLogf("token = %+q", token)
+ // NOTE(review): this logs the client's full api_token at debug
+ // level -- confirm that is acceptable before enabling debug logging
+ // in production, or log a redacted form instead.
+ sess.log.WithField("token", token).Debug("set token")
return sess, nil
}
-func (sess *v0session) debugLogf(s string, args ...interface{}) {
- args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
- debugLogf("%s "+s, args...)
-}
-
func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
- sess.debugLogf("received message: %+v", msg)
+ sess.log.WithField("data", msg).Debug("received message")
var sub v0subscribe
if err := json.Unmarshal(buf, &sub); err != nil {
- sess.debugLogf("ignored unrecognized request: %s", err)
+ sess.log.WithError(err).Info("ignored invalid request")
return nil
}
if sub.Method == "subscribe" {
- sub.prepare()
- sess.debugLogf("subscription: %v", sub)
+ sub.prepare(sess)
+ sess.log.WithField("sub", sub).Debug("sub prepared")
sess.mtx.Lock()
sess.subscriptions = append(sess.subscriptions, sub)
sess.mtx.Unlock()
sess.mtx.Lock()
defer sess.mtx.Unlock()
for _, sub := range sess.subscriptions {
- if sub.match(e) {
+ if sub.match(sess, e) {
return true
}
}
if sub.LastLogID == 0 {
return
}
- debugLogf("getOldEvents(%d)", sub.LastLogID)
+ sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
// Here we do a "select id" query and queue an event for every
// log since the given ID, then use (*event)Detail() to
// retrieve the whole row and decide whether to send it. This
sub.LastLogID,
time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
if err != nil {
- errorLogf("db.Query: %s", err)
+ sess.log.WithError(err).Error("db.Query failed")
return
}
for rows.Next() {
var id uint64
err := rows.Scan(&id)
if err != nil {
- errorLogf("Scan: %s", err)
+ sess.log.WithError(err).Error("row Scan failed")
continue
}
e := &event{
Received: time.Now(),
db: sess.db,
}
- if !sub.match(e) {
- debugLogf("skip old event %+v", e)
+ if !sub.match(sess, e) {
+ sess.log.WithField("event", e).Debug("skip old event")
continue
}
msg, err := sess.EventMessage(e)
if err != nil {
- debugLogf("event marshal: %s", err)
+ sess.log.WithError(err).Error("event marshal failed")
continue
}
- debugLogf("old event: %s", string(msg))
+ sess.log.WithField("data", msg).Debug("will queue old event")
msgs = append(msgs, msg)
}
if err := rows.Err(); err != nil {
- errorLogf("db.Query: %s", err)
+ sess.log.WithError(err).Error("db.Query failed")
}
return
}
type v0filter [3]interface{}
-func (sub *v0subscribe) match(e *event) bool {
+func (sub *v0subscribe) match(sess *v0session, e *event) bool {
+ log := sess.log.WithField("LogID", e.LogID)
detail := e.Detail()
if detail == nil {
- debugLogf("match(%d): failed on no detail", e.LogID)
+ log.Error("match failed, no detail")
return false
}
+ log = log.WithField("funcs", len(sub.funcs))
for i, f := range sub.funcs {
if !f(e) {
- debugLogf("match(%d): failed on func %d", e.LogID, i)
+ log.WithField("func", i).Debug("match failed")
return false
}
}
- debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs))
+ log.Debug("match passed")
return true
}
-func (sub *v0subscribe) prepare() {
+func (sub *v0subscribe) prepare(sess *v0session) {
for _, f := range sub.Filters {
if len(f) != 3 {
continue
}
}
sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
for _, s := range strs {
if s == e.Detail().EventType {
return true
}
t, err := time.Parse(time.RFC3339Nano, tstr)
if err != nil {
- debugLogf("time.Parse(%q): %s", tstr, err)
+ sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
continue
}
+ var fn func(*event) bool
switch op {
case ">=":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return !e.Detail().CreatedAt.Before(t)
- })
+ }
case "<=":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return !e.Detail().CreatedAt.After(t)
- })
+ }
case ">":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return e.Detail().CreatedAt.After(t)
- })
+ }
case "<":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return e.Detail().CreatedAt.Before(t)
- })
+ }
case "=":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return e.Detail().CreatedAt.Equal(t)
- })
+ }
+ default:
+ sess.log.WithField("operator", op).Info("bogus operator")
+ continue
}
+ sub.funcs = append(sub.funcs, fn)
}
}
}