"context"
"encoding/json"
"fmt"
+ "net"
"net/http"
"os"
- "strings"
+ "sync"
+ "time"
"git.arvados.org/arvados.git/lib/controller/railsproxy"
"git.arvados.org/arvados.git/lib/controller/rpc"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
- "github.com/fsnotify/fsnotify"
+ "github.com/hashicorp/yamux"
"github.com/sirupsen/logrus"
)
// railsProxy is an alias for rpc.Conn, so that Conn can embed it under an
// unexported field name.
type railsProxy = rpc.Conn
// Conn holds the cluster config, the embedded railsProxy and
// loginController, and the cached state for the vocabulary file.
type Conn struct {
- cluster *arvados.Cluster
- *railsProxy // handles API methods that aren't defined on Conn itself
- vocabularyCache *arvados.Vocabulary
- reloadVocabulary bool
+ cluster *arvados.Cluster
+ *railsProxy // handles API methods that aren't defined on Conn itself
+ vocabularyCache *arvados.Vocabulary // vocabulary assigned by loadVocabularyFile after a load with no error
+ vocabularyFileModTime time.Time // mtime of the vocabulary file recorded at the last successful reload
+ lastVocabularyRefreshCheck time.Time // when maybeRefreshVocabularyCache last went to disk
+ lastVocabularyError error // most recent stat/load failure; cleared only after a successful reload
loginController
+ gwTunnels map[string]*yamux.Session // NOTE(review): usage not shown here; presumably guarded by gwTunnelsLock — confirm
+ gwTunnelsLock sync.Mutex
+ activeUsers map[string]bool // NOTE(review): usage not shown here; presumably guarded by activeUsersLock — confirm
+ activeUsersLock sync.Mutex
+ activeUsersReset time.Time
}
// NewConn is the constructor for Conn.
// NOTE(review): as shown here it returns nil unconditionally and ignores
// cluster — confirm whether the real constructor body was elided from this view.
func NewConn(cluster *arvados.Cluster) *Conn {
return nil
}
-func watchVocabulary(logger logrus.FieldLogger, vocPath string, fn func()) {
- watcher, err := fsnotify.NewWatcher()
- if err != nil {
- logger.WithError(err).Error("vocabulary fsnotify setup failed")
- return
+func (conn *Conn) maybeRefreshVocabularyCache(logger logrus.FieldLogger) error { // reloads the vocabulary cache when the file's mtime has advanced; returns the stat/load error or nil. NOTE(review): conn fields are read and written here without a lock — confirm callers serialize access
+ if conn.lastVocabularyRefreshCheck.Add(time.Second).After(time.Now()) {
+ // Throttle the access to disk to at most once per second; a throttled call returns nil and leaves lastVocabularyError as it was.
+ return nil
}
- defer watcher.Close()
-
- err = watcher.Add(vocPath)
+ conn.lastVocabularyRefreshCheck = time.Now() // recorded before the stat, so failed checks are throttled too
+ fi, err := os.Stat(conn.cluster.API.VocabularyPath)
if err != nil {
- logger.WithError(err).Error("vocabulary file watcher failed")
- return
+ err = fmt.Errorf("couldn't stat vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
+ conn.lastVocabularyError = err
+ return err
}
-
- for {
- select {
- case err, ok := <-watcher.Errors:
- if !ok {
- return
- }
- logger.WithError(err).Warn("vocabulary file watcher error")
- case _, ok := <-watcher.Events:
- if !ok {
- return
- }
- for len(watcher.Events) > 0 {
- <-watcher.Events
- }
- fn()
+ if fi.ModTime().After(conn.vocabularyFileModTime) { // reload only when the mtime is strictly newer than the one recorded at the last successful reload
+ err = conn.loadVocabularyFile()
+ if err != nil {
+ conn.lastVocabularyError = err
+ return err
}
+ conn.vocabularyFileModTime = fi.ModTime() // updated only on success, so a failed load is retried at the next unthrottled check
+ conn.lastVocabularyError = nil
+ logger.Info("vocabulary file reloaded successfully")
}
+ return nil
}
// loadVocabularyFile reads the file at cluster.API.VocabularyPath and, when
// no step fails, stores the resulting vocabulary in conn.vocabularyCache.
// Each visible error path returns before the cache is assigned.
func (conn *Conn) loadVocabularyFile() error {
vf, err := os.ReadFile(conn.cluster.API.VocabularyPath)
if err != nil {
- return fmt.Errorf("couldn't read vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
+ return fmt.Errorf("while reading the vocabulary file: %v", err) // %v flattens err into the message text; it is not wrapped for errors.Is/As
}
mk := make([]string, 0, len(conn.cluster.Collections.ManagedProperties))
for k := range conn.cluster.Collections.ManagedProperties {
if err != nil {
return fmt.Errorf("while loading vocabulary file %q: %s", conn.cluster.API.VocabularyPath, err)
}
- err = voc.Validate()
- if err != nil {
- return fmt.Errorf("while validating vocabulary file %q: %s", conn.cluster.API.VocabularyPath, err)
- }
conn.vocabularyCache = voc
return nil
}
+// LastVocabularyError runs a (throttled) vocabulary refresh check and returns
+// conn.lastVocabularyError: the most recent stat/load failure, or nil.
+// Implements health.Func
+func (conn *Conn) LastVocabularyError() error {
+ conn.maybeRefreshVocabularyCache(ctxlog.FromContext(context.Background())) // return value dropped; any failure is also stored in conn.lastVocabularyError
+ return conn.lastVocabularyError
+}
+
// VocabularyGet refreshes the vocabulary cache if necessary and returns it.
// With an empty cluster.API.VocabularyPath it returns a Vocabulary whose Tags
// map is empty but non-nil. A failed refresh is logged and ignored, and the
// vocabulary already in the cache is returned.
func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error) {
if conn.cluster.API.VocabularyPath == "" {
- return arvados.Vocabulary{}, nil
+ return arvados.Vocabulary{
+ Tags: map[string]arvados.VocabularyTag{},
+ }, nil
}
logger := ctxlog.FromContext(ctx)
if conn.vocabularyCache == nil {
logger.WithError(err).Error("error loading vocabulary file")
return arvados.Vocabulary{}, err
}
- go watchVocabulary(logger, conn.cluster.API.VocabularyPath, func() {
- logger.Info("vocabulary file changed, it'll be reloaded next time it's needed")
- conn.reloadVocabulary = true
- })
- } else if conn.reloadVocabulary {
- // Requested reload of vocabulary file.
- conn.reloadVocabulary = false
- err := conn.loadVocabularyFile()
- if err != nil {
- logger.WithError(err).Error("error reloading vocabulary file - ignoring")
- } else {
- logger.Info("vocabulary file reloaded successfully")
- }
+ }
+ err := conn.maybeRefreshVocabularyCache(logger) // throttled mtime check; replaces the removed fsnotify watcher and reload flag
+ if err != nil {
+ logger.WithError(err).Error("error reloading vocabulary file - ignoring")
}
return *conn.vocabularyCache, nil // returns a shallow copy of the cached struct
}
return conn.loginController.UserAuthenticate(ctx, opts)
}
-func (conn *Conn) GroupContents(ctx context.Context, options arvados.GroupContentsOptions) (arvados.ObjectList, error) {
- // The requested UUID can be a user (virtual home project), which we just pass on to
- // the API server.
- if strings.Index(options.UUID, "-j7d0g-") != 5 {
- return conn.railsProxy.GroupContents(ctx, options)
- }
-
- var resp arvados.ObjectList
-
- // Get the group object
- respGroup, err := conn.GroupGet(ctx, arvados.GetOptions{UUID: options.UUID})
- if err != nil {
- return resp, err
- }
-
- // If the group has groupClass 'filter', apply the filters before getting the contents.
- if respGroup.GroupClass == "filter" {
- if filters, ok := respGroup.Properties["filters"].([]interface{}); ok {
- for _, f := range filters {
- // f is supposed to be a []string
- tmp, ok2 := f.([]interface{})
- if !ok2 || len(tmp) < 3 {
- return resp, fmt.Errorf("filter unparsable: %T, %+v, original field: %T, %+v\n", tmp, tmp, f, f)
- }
- var filter arvados.Filter
- if attr, ok2 := tmp[0].(string); ok2 {
- filter.Attr = attr
- } else {
- return resp, fmt.Errorf("filter unparsable: attribute must be string: %T, %+v, filter: %T, %+v\n", tmp[0], tmp[0], f, f)
- }
- if operator, ok2 := tmp[1].(string); ok2 {
- filter.Operator = operator
- } else {
- return resp, fmt.Errorf("filter unparsable: operator must be string: %T, %+v, filter: %T, %+v\n", tmp[1], tmp[1], f, f)
- }
- filter.Operand = tmp[2]
- options.Filters = append(options.Filters, filter)
- }
- } else {
- return resp, fmt.Errorf("filter unparsable: not an array\n")
+var privateNetworks = func() (nets []*net.IPNet) { // parsed once, when this package-level variable is initialized, from the CIDR literals below
+ for _, s := range []string{
+ "127.0.0.0/8", // IPv4 loopback
+ "10.0.0.0/8", // RFC 1918 private range
+ "172.16.0.0/12", // RFC 1918 private range
+ "192.168.0.0/16", // RFC 1918 private range
+ "169.254.0.0/16", // IPv4 link-local
+ "::1/128", // IPv6 loopback
+ "fe80::/10", // IPv6 link-local
+ "fc00::/7", // IPv6 unique local addresses
+ } {
+ _, n, err := net.ParseCIDR(s)
+ if err != nil {
+ panic(fmt.Sprintf("privateNetworks: %q: %s", s, err)) // the inputs are the literals above, so a parse failure is a programming error
}
- // Use the generic /groups/contents endpoint for filter groups
- options.UUID = ""
+ nets = append(nets, n)
}
-
- return conn.railsProxy.GroupContents(ctx, options)
-}
+ return // naked return of the named result nets
+}()
func httpErrorf(code int, format string, args ...interface{}) error {
return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)