1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/lib/controller/railsproxy"
18 "git.arvados.org/arvados.git/lib/controller/rpc"
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "git.arvados.org/arvados.git/sdk/go/ctxlog"
21 "git.arvados.org/arvados.git/sdk/go/httpserver"
22 "github.com/hashicorp/yamux"
23 "github.com/jmoiron/sqlx"
24 "github.com/sirupsen/logrus"
27 type railsProxy = rpc.Conn
30 cluster *arvados.Cluster
31 *railsProxy // handles API methods that aren't defined on Conn itself
32 getdb func(context.Context) (*sqlx.DB, error)
33 vocabularyCache *arvados.Vocabulary
34 vocabularyFileModTime time.Time
35 lastVocabularyRefreshCheck time.Time
36 lastVocabularyError error
38 gwTunnels map[string]*yamux.Session
39 gwTunnelsLock sync.Mutex
40 activeUsers map[string]bool
41 activeUsersLock sync.Mutex
42 activeUsersReset time.Time
44 wantContainerPriorityUpdate chan struct{}
47 func NewConn(bgCtx context.Context, cluster *arvados.Cluster, getdb func(context.Context) (*sqlx.DB, error)) *Conn {
48 railsProxy := railsproxy.NewConn(cluster)
49 railsProxy.RedactHostInErrors = true
52 railsProxy: railsProxy,
54 wantContainerPriorityUpdate: make(chan struct{}, 1),
56 conn.loginController = chooseLoginController(cluster, &conn)
57 go conn.runContainerPriorityUpdateThread(bgCtx)
61 func (conn *Conn) checkProperties(ctx context.Context, properties interface{}) error {
62 if properties == nil {
65 var props map[string]interface{}
66 switch properties := properties.(type) {
68 err := json.Unmarshal([]byte(properties), &props)
72 case map[string]interface{}:
75 return fmt.Errorf("unexpected properties type %T", properties)
77 voc, err := conn.VocabularyGet(ctx)
81 err = voc.Check(props)
83 return httpErrorf(http.StatusBadRequest, voc.Check(props).Error())
88 func (conn *Conn) maybeRefreshVocabularyCache(logger logrus.FieldLogger) error {
89 if conn.lastVocabularyRefreshCheck.Add(time.Second).After(time.Now()) {
90 // Throttle the access to disk to at most once per second.
93 conn.lastVocabularyRefreshCheck = time.Now()
94 fi, err := os.Stat(conn.cluster.API.VocabularyPath)
96 err = fmt.Errorf("couldn't stat vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
97 conn.lastVocabularyError = err
100 if fi.ModTime().After(conn.vocabularyFileModTime) {
101 err = conn.loadVocabularyFile()
103 conn.lastVocabularyError = err
106 conn.vocabularyFileModTime = fi.ModTime()
107 conn.lastVocabularyError = nil
108 logger.Info("vocabulary file reloaded successfully")
113 func (conn *Conn) loadVocabularyFile() error {
114 vf, err := os.ReadFile(conn.cluster.API.VocabularyPath)
116 return fmt.Errorf("while reading the vocabulary file: %v", err)
118 mk := make([]string, 0, len(conn.cluster.Collections.ManagedProperties))
119 for k := range conn.cluster.Collections.ManagedProperties {
122 voc, err := arvados.NewVocabulary(vf, mk)
124 return fmt.Errorf("while loading vocabulary file %q: %s", conn.cluster.API.VocabularyPath, err)
126 conn.vocabularyCache = voc
130 // LastVocabularyError returns the last error encountered while loading the
132 // Implements health.Func
133 func (conn *Conn) LastVocabularyError() error {
134 conn.maybeRefreshVocabularyCache(ctxlog.FromContext(context.Background()))
135 return conn.lastVocabularyError
138 // VocabularyGet refreshes the vocabulary cache if necessary and returns it.
139 func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error) {
140 if conn.cluster.API.VocabularyPath == "" {
141 return arvados.Vocabulary{
142 Tags: map[string]arvados.VocabularyTag{},
145 logger := ctxlog.FromContext(ctx)
146 if conn.vocabularyCache == nil {
147 // Initial load of vocabulary file.
148 err := conn.loadVocabularyFile()
150 logger.WithError(err).Error("error loading vocabulary file")
151 return arvados.Vocabulary{}, err
154 err := conn.maybeRefreshVocabularyCache(logger)
156 logger.WithError(err).Error("error reloading vocabulary file - ignoring")
158 return *conn.vocabularyCache, nil
161 // Logout handles the logout of conn giving to the appropriate loginController
162 func (conn *Conn) Logout(ctx context.Context, opts arvados.LogoutOptions) (arvados.LogoutResponse, error) {
163 return conn.loginController.Logout(ctx, opts)
166 // Login handles the login of conn giving to the appropriate loginController
167 func (conn *Conn) Login(ctx context.Context, opts arvados.LoginOptions) (arvados.LoginResponse, error) {
168 return conn.loginController.Login(ctx, opts)
171 // UserAuthenticate handles the User Authentication of conn giving to the appropriate loginController
172 func (conn *Conn) UserAuthenticate(ctx context.Context, opts arvados.UserAuthenticateOptions) (arvados.APIClientAuthorization, error) {
173 return conn.loginController.UserAuthenticate(ctx, opts)
176 var privateNetworks = func() (nets []*net.IPNet) {
177 for _, s := range []string{
187 _, n, err := net.ParseCIDR(s)
189 panic(fmt.Sprintf("privateNetworks: %q: %s", s, err))
191 nets = append(nets, n)
196 func httpErrorf(code int, format string, args ...interface{}) error {
197 return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)