17944: Adds /_health/vocabulary health endpoint. Improves cache refreshing.
authorLucas Di Pentima <lucas.dipentima@curii.com>
Fri, 5 Nov 2021 19:07:37 +0000 (16:07 -0300)
committerLucas Di Pentima <lucas.dipentima@curii.com>
Fri, 5 Nov 2021 19:07:37 +0000 (16:07 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas.dipentima@curii.com>

.gitignore
lib/controller/federation/conn.go
lib/controller/handler.go
lib/controller/localdb/conn.go

index beb84b3c2034f23e7c3072ac510f4a43722a0c75..231424accd37d1549e1edf3d066aa93a135dfa31 100644 (file)
@@ -32,5 +32,6 @@ services/api/config/arvados-clients.yml
 .Rproj.user
 _version.py
 *.bak
 .Rproj.user
 _version.py
 *.bak
+*.log
 arvados-snakeoil-ca.pem
 .vagrant
 arvados-snakeoil-ca.pem
 .vagrant
index d477303527c7b71e827607a24af1ba838ee23464..7efbda8d127d523927f66a30ef6c5b4923811368 100644 (file)
@@ -22,6 +22,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/auth"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/auth"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/health"
 )
 
 type Conn struct {
 )
 
 type Conn struct {
@@ -30,7 +31,7 @@ type Conn struct {
        remotes map[string]backend
 }
 
        remotes map[string]backend
 }
 
-func New(cluster *arvados.Cluster) *Conn {
+func New(cluster *arvados.Cluster, vocHealthFunc *health.Func) *Conn {
        local := localdb.NewConn(cluster)
        remotes := map[string]backend{}
        for id, remote := range cluster.RemoteClusters {
        local := localdb.NewConn(cluster)
        remotes := map[string]backend{}
        for id, remote := range cluster.RemoteClusters {
@@ -44,6 +45,8 @@ func New(cluster *arvados.Cluster) *Conn {
                remotes[id] = conn
        }
 
                remotes[id] = conn
        }
 
+       *vocHealthFunc = local.LastVocabularyError
+
        return &Conn{
                cluster: cluster,
                local:   local,
        return &Conn{
                cluster: cluster,
                local:   local,
index 358b0ed0c2f4214e1e09012436372c1f37861391..22d2e8329a056cff1bce3a497798f60f9d4bc88f 100644 (file)
@@ -100,17 +100,22 @@ func neverRedirect(*http.Request, []*http.Request) error { return http.ErrUseLas
 
 func (h *Handler) setup() {
        mux := http.NewServeMux()
 
 func (h *Handler) setup() {
        mux := http.NewServeMux()
-       mux.Handle("/_health/", &health.Handler{
-               Token:  h.Cluster.ManagementToken,
-               Prefix: "/_health/",
-               Routes: health.Routes{"ping": func() error { _, err := h.db(context.TODO()); return err }},
-       })
+       var vocHealthFunc health.Func
 
        oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
 
        oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
-       rtr := router.New(federation.New(h.Cluster), router.Config{
+       rtr := router.New(federation.New(h.Cluster, &vocHealthFunc), router.Config{
                MaxRequestSize: h.Cluster.API.MaxRequestSize,
                WrapCalls:      api.ComposeWrappers(ctrlctx.WrapCallsInTransactions(h.db), oidcAuthorizer.WrapCalls),
        })
                MaxRequestSize: h.Cluster.API.MaxRequestSize,
                WrapCalls:      api.ComposeWrappers(ctrlctx.WrapCallsInTransactions(h.db), oidcAuthorizer.WrapCalls),
        })
+
+       mux.Handle("/_health/", &health.Handler{
+               Token:  h.Cluster.ManagementToken,
+               Prefix: "/_health/",
+               Routes: health.Routes{
+                       "ping":       func() error { _, err := h.db(context.TODO()); return err },
+                       "vocabulary": vocHealthFunc,
+               },
+       })
        mux.Handle("/arvados/v1/config", rtr)
        mux.Handle("/arvados/v1/vocabulary", rtr)
        mux.Handle("/"+arvados.EndpointUserAuthenticate.Path, rtr) // must come before .../users/
        mux.Handle("/arvados/v1/config", rtr)
        mux.Handle("/arvados/v1/vocabulary", rtr)
        mux.Handle("/"+arvados.EndpointUserAuthenticate.Path, rtr) // must come before .../users/
index 9d1aa53621447b2f196be82c369c537ef0e2da23..f515673154b60da2d1522371de0f98c6fb8c0e7d 100644 (file)
@@ -11,23 +11,24 @@ import (
        "net/http"
        "os"
        "strings"
        "net/http"
        "os"
        "strings"
+       "time"
 
        "git.arvados.org/arvados.git/lib/controller/railsproxy"
        "git.arvados.org/arvados.git/lib/controller/rpc"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
 
        "git.arvados.org/arvados.git/lib/controller/railsproxy"
        "git.arvados.org/arvados.git/lib/controller/rpc"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
-       "github.com/fsnotify/fsnotify"
-       "github.com/sirupsen/logrus"
 )
 
 type railsProxy = rpc.Conn
 
 type Conn struct {
 )
 
 type railsProxy = rpc.Conn
 
 type Conn struct {
-       cluster          *arvados.Cluster
-       *railsProxy      // handles API methods that aren't defined on Conn itself
-       vocabularyCache  *arvados.Vocabulary
-       reloadVocabulary bool
+       cluster                    *arvados.Cluster
+       *railsProxy                // handles API methods that aren't defined on Conn itself
+       vocabularyCache            *arvados.Vocabulary
+       vocabularyFileModTime      time.Time
+       lastVocabularyRefreshCheck time.Time
+       lastVocabularyError        error
        loginController
 }
 
        loginController
 }
 
@@ -69,43 +70,34 @@ func (conn *Conn) checkProperties(ctx context.Context, properties interface{}) e
        return nil
 }
 
        return nil
 }
 
-func watchVocabulary(logger logrus.FieldLogger, vocPath string, fn func()) {
-       watcher, err := fsnotify.NewWatcher()
-       if err != nil {
-               logger.WithError(err).Error("vocabulary fsnotify setup failed")
-               return
+func (conn *Conn) maybeRefreshVocabularyCache() error {
+       if conn.lastVocabularyRefreshCheck.Add(time.Second).After(time.Now()) {
+               // Throttle the access to disk to at most once per second.
+               return nil
        }
        }
-       defer watcher.Close()
-
-       err = watcher.Add(vocPath)
+       conn.lastVocabularyRefreshCheck = time.Now()
+       fi, err := os.Stat(conn.cluster.API.VocabularyPath)
        if err != nil {
        if err != nil {
-               logger.WithError(err).Error("vocabulary file watcher failed")
-               return
+               err = fmt.Errorf("couldn't stat vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
+               conn.lastVocabularyError = err
+               return err
        }
        }
-
-       for {
-               select {
-               case err, ok := <-watcher.Errors:
-                       if !ok {
-                               return
-                       }
-                       logger.WithError(err).Warn("vocabulary file watcher error")
-               case _, ok := <-watcher.Events:
-                       if !ok {
-                               return
-                       }
-                       for len(watcher.Events) > 0 {
-                               <-watcher.Events
-                       }
-                       fn()
+       if fi.ModTime().After(conn.vocabularyFileModTime) {
+               err = conn.loadVocabularyFile()
+               if err != nil {
+                       conn.lastVocabularyError = err
+                       return err
                }
                }
+               conn.vocabularyFileModTime = fi.ModTime()
+               conn.lastVocabularyError = nil
        }
        }
+       return nil
 }
 
 func (conn *Conn) loadVocabularyFile() error {
        vf, err := os.ReadFile(conn.cluster.API.VocabularyPath)
        if err != nil {
 }
 
 func (conn *Conn) loadVocabularyFile() error {
        vf, err := os.ReadFile(conn.cluster.API.VocabularyPath)
        if err != nil {
-               return fmt.Errorf("couldn't read vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
+               return fmt.Errorf("couldn't reading the vocabulary file: %v", err)
        }
        mk := make([]string, 0, len(conn.cluster.Collections.ManagedProperties))
        for k := range conn.cluster.Collections.ManagedProperties {
        }
        mk := make([]string, 0, len(conn.cluster.Collections.ManagedProperties))
        for k := range conn.cluster.Collections.ManagedProperties {
@@ -123,6 +115,13 @@ func (conn *Conn) loadVocabularyFile() error {
        return nil
 }
 
        return nil
 }
 
+// LastVocabularyError returns the last error encountered while loading the
+// vocabulary file.
+func (conn *Conn) LastVocabularyError() error {
+       conn.maybeRefreshVocabularyCache()
+       return conn.lastVocabularyError
+}
+
 // VocabularyGet refreshes the vocabulary cache if necessary and returns it.
 func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error) {
        if conn.cluster.API.VocabularyPath == "" {
 // VocabularyGet refreshes the vocabulary cache if necessary and returns it.
 func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error) {
        if conn.cluster.API.VocabularyPath == "" {
@@ -136,24 +135,15 @@ func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error)
                err := conn.loadVocabularyFile()
                if err != nil {
                        logger.WithError(err).Error("error loading vocabulary file")
                err := conn.loadVocabularyFile()
                if err != nil {
                        logger.WithError(err).Error("error loading vocabulary file")
-                       return arvados.Vocabulary{
-                               Tags: map[string]arvados.VocabularyTag{},
-                       }, err
-               }
-               go watchVocabulary(logger, conn.cluster.API.VocabularyPath, func() {
-                       logger.Info("vocabulary file changed, it'll be reloaded next time it's needed")
-                       conn.reloadVocabulary = true
-               })
-       } else if conn.reloadVocabulary {
-               // Requested reload of vocabulary file.
-               conn.reloadVocabulary = false
-               err := conn.loadVocabularyFile()
-               if err != nil {
-                       logger.WithError(err).Error("error reloading vocabulary file - ignoring")
-               } else {
-                       logger.Info("vocabulary file reloaded successfully")
+                       return arvados.Vocabulary{}, err
                }
        }
                }
        }
+       err := conn.maybeRefreshVocabularyCache()
+       if err != nil {
+               logger.WithError(err).Error("error reloading vocabulary file - ignoring")
+       } else {
+               logger.Info("vocabulary file reloaded successfully")
+       }
        return *conn.vocabularyCache, nil
 }
 
        return *conn.vocabularyCache, nil
 }