func (h *Handler) setup() {
mux := http.NewServeMux()
- mux.Handle("/_health/", &health.Handler{
- Token: h.Cluster.ManagementToken,
- Prefix: "/_health/",
- Routes: health.Routes{"ping": func() error { _, err := h.db(context.TODO()); return err }},
- })
+ var vocHealthFunc health.Func
oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
- rtr := router.New(federation.New(h.Cluster), router.Config{
+ rtr := router.New(federation.New(h.Cluster, &vocHealthFunc), router.Config{
MaxRequestSize: h.Cluster.API.MaxRequestSize,
WrapCalls: api.ComposeWrappers(ctrlctx.WrapCallsInTransactions(h.db), oidcAuthorizer.WrapCalls),
})
+
+ mux.Handle("/_health/", &health.Handler{
+ Token: h.Cluster.ManagementToken,
+ Prefix: "/_health/",
+ Routes: health.Routes{
+ "ping": func() error { _, err := h.db(context.TODO()); return err },
+ "vocabulary": vocHealthFunc,
+ },
+ })
mux.Handle("/arvados/v1/config", rtr)
mux.Handle("/arvados/v1/vocabulary", rtr)
mux.Handle("/"+arvados.EndpointUserAuthenticate.Path, rtr) // must come before .../users/
"net/http"
"os"
"strings"
+ "time"
"git.arvados.org/arvados.git/lib/controller/railsproxy"
"git.arvados.org/arvados.git/lib/controller/rpc"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
- "github.com/fsnotify/fsnotify"
- "github.com/sirupsen/logrus"
)
type railsProxy = rpc.Conn
type Conn struct {
- cluster *arvados.Cluster
- *railsProxy // handles API methods that aren't defined on Conn itself
- vocabularyCache *arvados.Vocabulary
- reloadVocabulary bool
+ cluster *arvados.Cluster
+ *railsProxy // handles API methods that aren't defined on Conn itself
+ vocabularyCache *arvados.Vocabulary
+ vocabularyFileModTime time.Time
+ lastVocabularyRefreshCheck time.Time
+ lastVocabularyError error
loginController
}
return nil
}
-func watchVocabulary(logger logrus.FieldLogger, vocPath string, fn func()) {
- watcher, err := fsnotify.NewWatcher()
- if err != nil {
- logger.WithError(err).Error("vocabulary fsnotify setup failed")
- return
+func (conn *Conn) maybeRefreshVocabularyCache() error {
+ if conn.lastVocabularyRefreshCheck.Add(time.Second).After(time.Now()) {
+ // Throttle the access to disk to at most once per second.
+ return nil
}
- defer watcher.Close()
-
- err = watcher.Add(vocPath)
+ conn.lastVocabularyRefreshCheck = time.Now()
+ fi, err := os.Stat(conn.cluster.API.VocabularyPath)
if err != nil {
- logger.WithError(err).Error("vocabulary file watcher failed")
- return
+ err = fmt.Errorf("couldn't stat vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
+ conn.lastVocabularyError = err
+ return err
}
-
- for {
- select {
- case err, ok := <-watcher.Errors:
- if !ok {
- return
- }
- logger.WithError(err).Warn("vocabulary file watcher error")
- case _, ok := <-watcher.Events:
- if !ok {
- return
- }
- for len(watcher.Events) > 0 {
- <-watcher.Events
- }
- fn()
+ if fi.ModTime().After(conn.vocabularyFileModTime) {
+ err = conn.loadVocabularyFile()
+ if err != nil {
+ conn.lastVocabularyError = err
+ return err
}
+ conn.vocabularyFileModTime = fi.ModTime()
+ conn.lastVocabularyError = nil
}
+ return nil
}
func (conn *Conn) loadVocabularyFile() error {
vf, err := os.ReadFile(conn.cluster.API.VocabularyPath)
if err != nil {
- return fmt.Errorf("couldn't read vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
+ return fmt.Errorf("couldn't reading the vocabulary file: %v", err)
}
mk := make([]string, 0, len(conn.cluster.Collections.ManagedProperties))
for k := range conn.cluster.Collections.ManagedProperties {
return nil
}
+// LastVocabularyError returns the last error encountered while loading the
+// vocabulary file.
+func (conn *Conn) LastVocabularyError() error {
+ conn.maybeRefreshVocabularyCache()
+ return conn.lastVocabularyError
+}
+
// VocabularyGet refreshes the vocabulary cache if necessary and returns it.
func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error) {
if conn.cluster.API.VocabularyPath == "" {
err := conn.loadVocabularyFile()
if err != nil {
logger.WithError(err).Error("error loading vocabulary file")
- return arvados.Vocabulary{
- Tags: map[string]arvados.VocabularyTag{},
- }, err
- }
- go watchVocabulary(logger, conn.cluster.API.VocabularyPath, func() {
- logger.Info("vocabulary file changed, it'll be reloaded next time it's needed")
- conn.reloadVocabulary = true
- })
- } else if conn.reloadVocabulary {
- // Requested reload of vocabulary file.
- conn.reloadVocabulary = false
- err := conn.loadVocabularyFile()
- if err != nil {
- logger.WithError(err).Error("error reloading vocabulary file - ignoring")
- } else {
- logger.Info("vocabulary file reloaded successfully")
+ return arvados.Vocabulary{}, err
}
}
+ err := conn.maybeRefreshVocabularyCache()
+ if err != nil {
+ logger.WithError(err).Error("error reloading vocabulary file - ignoring")
+ } else {
+ logger.Info("vocabulary file reloaded successfully")
+ }
return *conn.vocabularyCache, nil
}