Merge branch '19692-quieter-runtime-status' refs #19692
[arvados.git] / lib / controller / localdb / conn.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package localdb
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "net"
12         "net/http"
13         "os"
14         "sync"
15         "time"
16
17         "git.arvados.org/arvados.git/lib/controller/railsproxy"
18         "git.arvados.org/arvados.git/lib/controller/rpc"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/ctxlog"
21         "git.arvados.org/arvados.git/sdk/go/httpserver"
22         "github.com/hashicorp/yamux"
23         "github.com/sirupsen/logrus"
24 )
25
26 type railsProxy = rpc.Conn
27
28 type Conn struct {
29         cluster                    *arvados.Cluster
30         *railsProxy                // handles API methods that aren't defined on Conn itself
31         vocabularyCache            *arvados.Vocabulary
32         vocabularyFileModTime      time.Time
33         lastVocabularyRefreshCheck time.Time
34         lastVocabularyError        error
35         loginController
36         gwTunnels        map[string]*yamux.Session
37         gwTunnelsLock    sync.Mutex
38         activeUsers      map[string]bool
39         activeUsersLock  sync.Mutex
40         activeUsersReset time.Time
41 }
42
43 func NewConn(cluster *arvados.Cluster) *Conn {
44         railsProxy := railsproxy.NewConn(cluster)
45         railsProxy.RedactHostInErrors = true
46         conn := Conn{
47                 cluster:    cluster,
48                 railsProxy: railsProxy,
49         }
50         conn.loginController = chooseLoginController(cluster, &conn)
51         return &conn
52 }
53
54 func (conn *Conn) checkProperties(ctx context.Context, properties interface{}) error {
55         if properties == nil {
56                 return nil
57         }
58         var props map[string]interface{}
59         switch properties := properties.(type) {
60         case string:
61                 err := json.Unmarshal([]byte(properties), &props)
62                 if err != nil {
63                         return err
64                 }
65         case map[string]interface{}:
66                 props = properties
67         default:
68                 return fmt.Errorf("unexpected properties type %T", properties)
69         }
70         voc, err := conn.VocabularyGet(ctx)
71         if err != nil {
72                 return err
73         }
74         err = voc.Check(props)
75         if err != nil {
76                 return httpErrorf(http.StatusBadRequest, voc.Check(props).Error())
77         }
78         return nil
79 }
80
81 func (conn *Conn) maybeRefreshVocabularyCache(logger logrus.FieldLogger) error {
82         if conn.lastVocabularyRefreshCheck.Add(time.Second).After(time.Now()) {
83                 // Throttle the access to disk to at most once per second.
84                 return nil
85         }
86         conn.lastVocabularyRefreshCheck = time.Now()
87         fi, err := os.Stat(conn.cluster.API.VocabularyPath)
88         if err != nil {
89                 err = fmt.Errorf("couldn't stat vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
90                 conn.lastVocabularyError = err
91                 return err
92         }
93         if fi.ModTime().After(conn.vocabularyFileModTime) {
94                 err = conn.loadVocabularyFile()
95                 if err != nil {
96                         conn.lastVocabularyError = err
97                         return err
98                 }
99                 conn.vocabularyFileModTime = fi.ModTime()
100                 conn.lastVocabularyError = nil
101                 logger.Info("vocabulary file reloaded successfully")
102         }
103         return nil
104 }
105
106 func (conn *Conn) loadVocabularyFile() error {
107         vf, err := os.ReadFile(conn.cluster.API.VocabularyPath)
108         if err != nil {
109                 return fmt.Errorf("while reading the vocabulary file: %v", err)
110         }
111         mk := make([]string, 0, len(conn.cluster.Collections.ManagedProperties))
112         for k := range conn.cluster.Collections.ManagedProperties {
113                 mk = append(mk, k)
114         }
115         voc, err := arvados.NewVocabulary(vf, mk)
116         if err != nil {
117                 return fmt.Errorf("while loading vocabulary file %q: %s", conn.cluster.API.VocabularyPath, err)
118         }
119         conn.vocabularyCache = voc
120         return nil
121 }
122
123 // LastVocabularyError returns the last error encountered while loading the
124 // vocabulary file.
125 // Implements health.Func
126 func (conn *Conn) LastVocabularyError() error {
127         conn.maybeRefreshVocabularyCache(ctxlog.FromContext(context.Background()))
128         return conn.lastVocabularyError
129 }
130
131 // VocabularyGet refreshes the vocabulary cache if necessary and returns it.
132 func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error) {
133         if conn.cluster.API.VocabularyPath == "" {
134                 return arvados.Vocabulary{
135                         Tags: map[string]arvados.VocabularyTag{},
136                 }, nil
137         }
138         logger := ctxlog.FromContext(ctx)
139         if conn.vocabularyCache == nil {
140                 // Initial load of vocabulary file.
141                 err := conn.loadVocabularyFile()
142                 if err != nil {
143                         logger.WithError(err).Error("error loading vocabulary file")
144                         return arvados.Vocabulary{}, err
145                 }
146         }
147         err := conn.maybeRefreshVocabularyCache(logger)
148         if err != nil {
149                 logger.WithError(err).Error("error reloading vocabulary file - ignoring")
150         }
151         return *conn.vocabularyCache, nil
152 }
153
154 // Logout handles the logout of conn giving to the appropriate loginController
155 func (conn *Conn) Logout(ctx context.Context, opts arvados.LogoutOptions) (arvados.LogoutResponse, error) {
156         return conn.loginController.Logout(ctx, opts)
157 }
158
159 // Login handles the login of conn giving to the appropriate loginController
160 func (conn *Conn) Login(ctx context.Context, opts arvados.LoginOptions) (arvados.LoginResponse, error) {
161         return conn.loginController.Login(ctx, opts)
162 }
163
164 // UserAuthenticate handles the User Authentication of conn giving to the appropriate loginController
165 func (conn *Conn) UserAuthenticate(ctx context.Context, opts arvados.UserAuthenticateOptions) (arvados.APIClientAuthorization, error) {
166         return conn.loginController.UserAuthenticate(ctx, opts)
167 }
168
169 var privateNetworks = func() (nets []*net.IPNet) {
170         for _, s := range []string{
171                 "127.0.0.0/8",
172                 "10.0.0.0/8",
173                 "172.16.0.0/12",
174                 "192.168.0.0/16",
175                 "169.254.0.0/16",
176                 "::1/128",
177                 "fe80::/10",
178                 "fc00::/7",
179         } {
180                 _, n, err := net.ParseCIDR(s)
181                 if err != nil {
182                         panic(fmt.Sprintf("privateNetworks: %q: %s", s, err))
183                 }
184                 nets = append(nets, n)
185         }
186         return
187 }()
188
189 func httpErrorf(code int, format string, args ...interface{}) error {
190         return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)
191 }