Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / lib / controller / localdb / conn.go
index 0fae35e7d36a0d2515e6dd97f70e3080aa0948a0..5b6964de00d105ec89938e3c2f4e556688fd4722 100644 (file)
@@ -8,25 +8,36 @@ import (
        "context"
        "encoding/json"
        "fmt"
+       "net"
+       "net/http"
        "os"
-       "strings"
+       "sync"
+       "time"
 
        "git.arvados.org/arvados.git/lib/controller/railsproxy"
        "git.arvados.org/arvados.git/lib/controller/rpc"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "github.com/fsnotify/fsnotify"
+       "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "github.com/hashicorp/yamux"
        "github.com/sirupsen/logrus"
 )
 
 type railsProxy = rpc.Conn
 
 type Conn struct {
-       cluster          *arvados.Cluster
-       *railsProxy      // handles API methods that aren't defined on Conn itself
-       vocabularyCache  *arvados.Vocabulary
-       reloadVocabulary bool
+       cluster                    *arvados.Cluster
+       *railsProxy                // handles API methods that aren't defined on Conn itself
+       vocabularyCache            *arvados.Vocabulary
+       vocabularyFileModTime      time.Time
+       lastVocabularyRefreshCheck time.Time
+       lastVocabularyError        error
        loginController
+       gwTunnels        map[string]*yamux.Session
+       gwTunnelsLock    sync.Mutex
+       activeUsers      map[string]bool
+       activeUsersLock  sync.Mutex
+       activeUsersReset time.Time
 }
 
 func NewConn(cluster *arvados.Cluster) *Conn {
@@ -60,46 +71,42 @@ func (conn *Conn) checkProperties(ctx context.Context, properties interface{}) e
        if err != nil {
                return err
        }
-       return voc.Check(props)
-}
-
-func watchVocabulary(logger logrus.FieldLogger, vocPath string, fn func()) {
-       watcher, err := fsnotify.NewWatcher()
+       err = voc.Check(props)
        if err != nil {
-               logger.WithError(err).Error("vocabulary fsnotify setup failed")
-               return
+               return httpErrorf(http.StatusBadRequest, voc.Check(props).Error())
        }
-       defer watcher.Close()
+       return nil
+}
 
-       err = watcher.Add(vocPath)
+func (conn *Conn) maybeRefreshVocabularyCache(logger logrus.FieldLogger) error {
+       if conn.lastVocabularyRefreshCheck.Add(time.Second).After(time.Now()) {
+               // Throttle the access to disk to at most once per second.
+               return nil
+       }
+       conn.lastVocabularyRefreshCheck = time.Now()
+       fi, err := os.Stat(conn.cluster.API.VocabularyPath)
        if err != nil {
-               logger.WithError(err).Error("vocabulary file watcher failed")
-               return
+               err = fmt.Errorf("couldn't stat vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
+               conn.lastVocabularyError = err
+               return err
        }
-
-       for {
-               select {
-               case err, ok := <-watcher.Errors:
-                       if !ok {
-                               return
-                       }
-                       logger.WithError(err).Warn("vocabulary file watcher error")
-               case _, ok := <-watcher.Events:
-                       if !ok {
-                               return
-                       }
-                       for len(watcher.Events) > 0 {
-                               <-watcher.Events
-                       }
-                       fn()
+       if fi.ModTime().After(conn.vocabularyFileModTime) {
+               err = conn.loadVocabularyFile()
+               if err != nil {
+                       conn.lastVocabularyError = err
+                       return err
                }
+               conn.vocabularyFileModTime = fi.ModTime()
+               conn.lastVocabularyError = nil
+               logger.Info("vocabulary file reloaded successfully")
        }
+       return nil
 }
 
 func (conn *Conn) loadVocabularyFile() error {
        vf, err := os.ReadFile(conn.cluster.API.VocabularyPath)
        if err != nil {
-               return fmt.Errorf("couldn't read vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
+               return fmt.Errorf("while reading the vocabulary file: %v", err)
        }
        mk := make([]string, 0, len(conn.cluster.Collections.ManagedProperties))
        for k := range conn.cluster.Collections.ManagedProperties {
@@ -109,18 +116,24 @@ func (conn *Conn) loadVocabularyFile() error {
        if err != nil {
                return fmt.Errorf("while loading vocabulary file %q: %s", conn.cluster.API.VocabularyPath, err)
        }
-       err = voc.Validate()
-       if err != nil {
-               return fmt.Errorf("while validating vocabulary file %q: %s", conn.cluster.API.VocabularyPath, err)
-       }
        conn.vocabularyCache = voc
        return nil
 }
 
+// LastVocabularyError returns the last error encountered while loading the
+// vocabulary file.
+// Implements health.Func
+func (conn *Conn) LastVocabularyError() error {
+       conn.maybeRefreshVocabularyCache(ctxlog.FromContext(context.Background()))
+       return conn.lastVocabularyError
+}
+
 // VocabularyGet refreshes the vocabulary cache if necessary and returns it.
 func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error) {
        if conn.cluster.API.VocabularyPath == "" {
-               return arvados.Vocabulary{}, nil
+               return arvados.Vocabulary{
+                       Tags: map[string]arvados.VocabularyTag{},
+               }, nil
        }
        logger := ctxlog.FromContext(ctx)
        if conn.vocabularyCache == nil {
@@ -130,19 +143,10 @@ func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error)
                        logger.WithError(err).Error("error loading vocabulary file")
                        return arvados.Vocabulary{}, err
                }
-               go watchVocabulary(logger, conn.cluster.API.VocabularyPath, func() {
-                       logger.Info("vocabulary file changed, it'll be reloaded next time it's needed")
-                       conn.reloadVocabulary = true
-               })
-       } else if conn.reloadVocabulary {
-               // Requested reload of vocabulary file.
-               conn.reloadVocabulary = false
-               err := conn.loadVocabularyFile()
-               if err != nil {
-                       logger.WithError(err).Error("error reloading vocabulary file - ignoring")
-               } else {
-                       logger.Info("vocabulary file reloaded successfully")
-               }
+       }
+       err := conn.maybeRefreshVocabularyCache(logger)
+       if err != nil {
+               logger.WithError(err).Error("error reloading vocabulary file - ignoring")
        }
        return *conn.vocabularyCache, nil
 }
@@ -162,50 +166,26 @@ func (conn *Conn) UserAuthenticate(ctx context.Context, opts arvados.UserAuthent
        return conn.loginController.UserAuthenticate(ctx, opts)
 }
 
-func (conn *Conn) GroupContents(ctx context.Context, options arvados.GroupContentsOptions) (arvados.ObjectList, error) {
-       // The requested UUID can be a user (virtual home project), which we just pass on to
-       // the API server.
-       if strings.Index(options.UUID, "-j7d0g-") != 5 {
-               return conn.railsProxy.GroupContents(ctx, options)
-       }
-
-       var resp arvados.ObjectList
-
-       // Get the group object
-       respGroup, err := conn.GroupGet(ctx, arvados.GetOptions{UUID: options.UUID})
-       if err != nil {
-               return resp, err
-       }
-
-       // If the group has groupClass 'filter', apply the filters before getting the contents.
-       if respGroup.GroupClass == "filter" {
-               if filters, ok := respGroup.Properties["filters"].([]interface{}); ok {
-                       for _, f := range filters {
-                               // f is supposed to be a []string
-                               tmp, ok2 := f.([]interface{})
-                               if !ok2 || len(tmp) < 3 {
-                                       return resp, fmt.Errorf("filter unparsable: %T, %+v, original field: %T, %+v\n", tmp, tmp, f, f)
-                               }
-                               var filter arvados.Filter
-                               if attr, ok2 := tmp[0].(string); ok2 {
-                                       filter.Attr = attr
-                               } else {
-                                       return resp, fmt.Errorf("filter unparsable: attribute must be string: %T, %+v, filter: %T, %+v\n", tmp[0], tmp[0], f, f)
-                               }
-                               if operator, ok2 := tmp[1].(string); ok2 {
-                                       filter.Operator = operator
-                               } else {
-                                       return resp, fmt.Errorf("filter unparsable: operator must be string: %T, %+v, filter: %T, %+v\n", tmp[1], tmp[1], f, f)
-                               }
-                               filter.Operand = tmp[2]
-                               options.Filters = append(options.Filters, filter)
-                       }
-               } else {
-                       return resp, fmt.Errorf("filter unparsable: not an array\n")
+var privateNetworks = func() (nets []*net.IPNet) {
+       for _, s := range []string{
+               "127.0.0.0/8",
+               "10.0.0.0/8",
+               "172.16.0.0/12",
+               "192.168.0.0/16",
+               "169.254.0.0/16",
+               "::1/128",
+               "fe80::/10",
+               "fc00::/7",
+       } {
+               _, n, err := net.ParseCIDR(s)
+               if err != nil {
+                       panic(fmt.Sprintf("privateNetworks: %q: %s", s, err))
                }
-               // Use the generic /groups/contents endpoint for filter groups
-               options.UUID = ""
+               nets = append(nets, n)
        }
+       return
+}()
 
-       return conn.railsProxy.GroupContents(ctx, options)
+func httpErrorf(code int, format string, args ...interface{}) error {
+       return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)
 }