Merge branch 'github-pr-223'
[arvados.git] / lib / controller / localdb / conn.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package localdb
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "net"
12         "net/http"
13         "os"
14         "sync"
15         "time"
16
17         "git.arvados.org/arvados.git/lib/controller/railsproxy"
18         "git.arvados.org/arvados.git/lib/controller/rpc"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/ctxlog"
21         "git.arvados.org/arvados.git/sdk/go/httpserver"
22         "github.com/hashicorp/yamux"
23         "github.com/jmoiron/sqlx"
24         "github.com/sirupsen/logrus"
25 )
26
27 type railsProxy = rpc.Conn
28
29 type Conn struct {
30         cluster                    *arvados.Cluster
31         *railsProxy                // handles API methods that aren't defined on Conn itself
32         getdb                      func(context.Context) (*sqlx.DB, error)
33         vocabularyCache            *arvados.Vocabulary
34         vocabularyFileModTime      time.Time
35         lastVocabularyRefreshCheck time.Time
36         lastVocabularyError        error
37         loginController
38         gwTunnels        map[string]*yamux.Session
39         gwTunnelsLock    sync.Mutex
40         activeUsers      map[string]bool
41         activeUsersLock  sync.Mutex
42         activeUsersReset time.Time
43
44         wantContainerPriorityUpdate chan struct{}
45 }
46
47 func NewConn(bgCtx context.Context, cluster *arvados.Cluster, getdb func(context.Context) (*sqlx.DB, error)) *Conn {
48         railsProxy := railsproxy.NewConn(cluster)
49         railsProxy.RedactHostInErrors = true
50         conn := Conn{
51                 cluster:                     cluster,
52                 railsProxy:                  railsProxy,
53                 getdb:                       getdb,
54                 wantContainerPriorityUpdate: make(chan struct{}, 1),
55         }
56         conn.loginController = chooseLoginController(cluster, &conn)
57         go conn.runContainerPriorityUpdateThread(bgCtx)
58         return &conn
59 }
60
61 func (conn *Conn) checkProperties(ctx context.Context, properties interface{}) error {
62         if properties == nil {
63                 return nil
64         }
65         var props map[string]interface{}
66         switch properties := properties.(type) {
67         case string:
68                 err := json.Unmarshal([]byte(properties), &props)
69                 if err != nil {
70                         return err
71                 }
72         case map[string]interface{}:
73                 props = properties
74         default:
75                 return fmt.Errorf("unexpected properties type %T", properties)
76         }
77         voc, err := conn.VocabularyGet(ctx)
78         if err != nil {
79                 return err
80         }
81         err = voc.Check(props)
82         if err != nil {
83                 return httpErrorf(http.StatusBadRequest, voc.Check(props).Error())
84         }
85         return nil
86 }
87
88 func (conn *Conn) maybeRefreshVocabularyCache(logger logrus.FieldLogger) error {
89         if conn.lastVocabularyRefreshCheck.Add(time.Second).After(time.Now()) {
90                 // Throttle the access to disk to at most once per second.
91                 return nil
92         }
93         conn.lastVocabularyRefreshCheck = time.Now()
94         fi, err := os.Stat(conn.cluster.API.VocabularyPath)
95         if err != nil {
96                 err = fmt.Errorf("couldn't stat vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
97                 conn.lastVocabularyError = err
98                 return err
99         }
100         if fi.ModTime().After(conn.vocabularyFileModTime) {
101                 err = conn.loadVocabularyFile()
102                 if err != nil {
103                         conn.lastVocabularyError = err
104                         return err
105                 }
106                 conn.vocabularyFileModTime = fi.ModTime()
107                 conn.lastVocabularyError = nil
108                 logger.Info("vocabulary file reloaded successfully")
109         }
110         return nil
111 }
112
113 func (conn *Conn) loadVocabularyFile() error {
114         vf, err := os.ReadFile(conn.cluster.API.VocabularyPath)
115         if err != nil {
116                 return fmt.Errorf("while reading the vocabulary file: %v", err)
117         }
118         mk := make([]string, 0, len(conn.cluster.Collections.ManagedProperties))
119         for k := range conn.cluster.Collections.ManagedProperties {
120                 mk = append(mk, k)
121         }
122         voc, err := arvados.NewVocabulary(vf, mk)
123         if err != nil {
124                 return fmt.Errorf("while loading vocabulary file %q: %s", conn.cluster.API.VocabularyPath, err)
125         }
126         conn.vocabularyCache = voc
127         return nil
128 }
129
130 // LastVocabularyError returns the last error encountered while loading the
131 // vocabulary file.
132 // Implements health.Func
133 func (conn *Conn) LastVocabularyError() error {
134         conn.maybeRefreshVocabularyCache(ctxlog.FromContext(context.Background()))
135         return conn.lastVocabularyError
136 }
137
138 // VocabularyGet refreshes the vocabulary cache if necessary and returns it.
139 func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error) {
140         if conn.cluster.API.VocabularyPath == "" {
141                 return arvados.Vocabulary{
142                         Tags: map[string]arvados.VocabularyTag{},
143                 }, nil
144         }
145         logger := ctxlog.FromContext(ctx)
146         if conn.vocabularyCache == nil {
147                 // Initial load of vocabulary file.
148                 err := conn.loadVocabularyFile()
149                 if err != nil {
150                         logger.WithError(err).Error("error loading vocabulary file")
151                         return arvados.Vocabulary{}, err
152                 }
153         }
154         err := conn.maybeRefreshVocabularyCache(logger)
155         if err != nil {
156                 logger.WithError(err).Error("error reloading vocabulary file - ignoring")
157         }
158         return *conn.vocabularyCache, nil
159 }
160
161 // Logout handles the logout of conn giving to the appropriate loginController
162 func (conn *Conn) Logout(ctx context.Context, opts arvados.LogoutOptions) (arvados.LogoutResponse, error) {
163         return conn.loginController.Logout(ctx, opts)
164 }
165
166 // Login handles the login of conn giving to the appropriate loginController
167 func (conn *Conn) Login(ctx context.Context, opts arvados.LoginOptions) (arvados.LoginResponse, error) {
168         return conn.loginController.Login(ctx, opts)
169 }
170
171 // UserAuthenticate handles the User Authentication of conn giving to the appropriate loginController
172 func (conn *Conn) UserAuthenticate(ctx context.Context, opts arvados.UserAuthenticateOptions) (arvados.APIClientAuthorization, error) {
173         return conn.loginController.UserAuthenticate(ctx, opts)
174 }
175
176 var privateNetworks = func() (nets []*net.IPNet) {
177         for _, s := range []string{
178                 "127.0.0.0/8",
179                 "10.0.0.0/8",
180                 "172.16.0.0/12",
181                 "192.168.0.0/16",
182                 "169.254.0.0/16",
183                 "::1/128",
184                 "fe80::/10",
185                 "fc00::/7",
186         } {
187                 _, n, err := net.ParseCIDR(s)
188                 if err != nil {
189                         panic(fmt.Sprintf("privateNetworks: %q: %s", s, err))
190                 }
191                 nets = append(nets, n)
192         }
193         return
194 }()
195
196 func httpErrorf(code int, format string, args ...interface{}) error {
197         return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)
198 }