9d1aa53621447b2f196be82c369c537ef0e2da23
[arvados.git] / lib / controller / localdb / conn.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package localdb
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "net/http"
12         "os"
13         "strings"
14
15         "git.arvados.org/arvados.git/lib/controller/railsproxy"
16         "git.arvados.org/arvados.git/lib/controller/rpc"
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "git.arvados.org/arvados.git/sdk/go/ctxlog"
19         "git.arvados.org/arvados.git/sdk/go/httpserver"
20         "github.com/fsnotify/fsnotify"
21         "github.com/sirupsen/logrus"
22 )
23
24 type railsProxy = rpc.Conn
25
26 type Conn struct {
27         cluster          *arvados.Cluster
28         *railsProxy      // handles API methods that aren't defined on Conn itself
29         vocabularyCache  *arvados.Vocabulary
30         reloadVocabulary bool
31         loginController
32 }
33
34 func NewConn(cluster *arvados.Cluster) *Conn {
35         railsProxy := railsproxy.NewConn(cluster)
36         railsProxy.RedactHostInErrors = true
37         conn := Conn{
38                 cluster:    cluster,
39                 railsProxy: railsProxy,
40         }
41         conn.loginController = chooseLoginController(cluster, &conn)
42         return &conn
43 }
44
45 func (conn *Conn) checkProperties(ctx context.Context, properties interface{}) error {
46         if properties == nil {
47                 return nil
48         }
49         var props map[string]interface{}
50         switch properties := properties.(type) {
51         case string:
52                 err := json.Unmarshal([]byte(properties), &props)
53                 if err != nil {
54                         return err
55                 }
56         case map[string]interface{}:
57                 props = properties
58         default:
59                 return fmt.Errorf("unexpected properties type %T", properties)
60         }
61         voc, err := conn.VocabularyGet(ctx)
62         if err != nil {
63                 return err
64         }
65         err = voc.Check(props)
66         if err != nil {
67                 return httpErrorf(http.StatusBadRequest, voc.Check(props).Error())
68         }
69         return nil
70 }
71
72 func watchVocabulary(logger logrus.FieldLogger, vocPath string, fn func()) {
73         watcher, err := fsnotify.NewWatcher()
74         if err != nil {
75                 logger.WithError(err).Error("vocabulary fsnotify setup failed")
76                 return
77         }
78         defer watcher.Close()
79
80         err = watcher.Add(vocPath)
81         if err != nil {
82                 logger.WithError(err).Error("vocabulary file watcher failed")
83                 return
84         }
85
86         for {
87                 select {
88                 case err, ok := <-watcher.Errors:
89                         if !ok {
90                                 return
91                         }
92                         logger.WithError(err).Warn("vocabulary file watcher error")
93                 case _, ok := <-watcher.Events:
94                         if !ok {
95                                 return
96                         }
97                         for len(watcher.Events) > 0 {
98                                 <-watcher.Events
99                         }
100                         fn()
101                 }
102         }
103 }
104
105 func (conn *Conn) loadVocabularyFile() error {
106         vf, err := os.ReadFile(conn.cluster.API.VocabularyPath)
107         if err != nil {
108                 return fmt.Errorf("couldn't read vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
109         }
110         mk := make([]string, 0, len(conn.cluster.Collections.ManagedProperties))
111         for k := range conn.cluster.Collections.ManagedProperties {
112                 mk = append(mk, k)
113         }
114         voc, err := arvados.NewVocabulary(vf, mk)
115         if err != nil {
116                 return fmt.Errorf("while loading vocabulary file %q: %s", conn.cluster.API.VocabularyPath, err)
117         }
118         err = voc.Validate()
119         if err != nil {
120                 return fmt.Errorf("while validating vocabulary file %q: %s", conn.cluster.API.VocabularyPath, err)
121         }
122         conn.vocabularyCache = voc
123         return nil
124 }
125
126 // VocabularyGet refreshes the vocabulary cache if necessary and returns it.
127 func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error) {
128         if conn.cluster.API.VocabularyPath == "" {
129                 return arvados.Vocabulary{
130                         Tags: map[string]arvados.VocabularyTag{},
131                 }, nil
132         }
133         logger := ctxlog.FromContext(ctx)
134         if conn.vocabularyCache == nil {
135                 // Initial load of vocabulary file.
136                 err := conn.loadVocabularyFile()
137                 if err != nil {
138                         logger.WithError(err).Error("error loading vocabulary file")
139                         return arvados.Vocabulary{
140                                 Tags: map[string]arvados.VocabularyTag{},
141                         }, err
142                 }
143                 go watchVocabulary(logger, conn.cluster.API.VocabularyPath, func() {
144                         logger.Info("vocabulary file changed, it'll be reloaded next time it's needed")
145                         conn.reloadVocabulary = true
146                 })
147         } else if conn.reloadVocabulary {
148                 // Requested reload of vocabulary file.
149                 conn.reloadVocabulary = false
150                 err := conn.loadVocabularyFile()
151                 if err != nil {
152                         logger.WithError(err).Error("error reloading vocabulary file - ignoring")
153                 } else {
154                         logger.Info("vocabulary file reloaded successfully")
155                 }
156         }
157         return *conn.vocabularyCache, nil
158 }
159
160 // Logout handles the logout of conn giving to the appropriate loginController
161 func (conn *Conn) Logout(ctx context.Context, opts arvados.LogoutOptions) (arvados.LogoutResponse, error) {
162         return conn.loginController.Logout(ctx, opts)
163 }
164
165 // Login handles the login of conn giving to the appropriate loginController
166 func (conn *Conn) Login(ctx context.Context, opts arvados.LoginOptions) (arvados.LoginResponse, error) {
167         return conn.loginController.Login(ctx, opts)
168 }
169
170 // UserAuthenticate handles the User Authentication of conn giving to the appropriate loginController
171 func (conn *Conn) UserAuthenticate(ctx context.Context, opts arvados.UserAuthenticateOptions) (arvados.APIClientAuthorization, error) {
172         return conn.loginController.UserAuthenticate(ctx, opts)
173 }
174
175 func (conn *Conn) GroupContents(ctx context.Context, options arvados.GroupContentsOptions) (arvados.ObjectList, error) {
176         // The requested UUID can be a user (virtual home project), which we just pass on to
177         // the API server.
178         if strings.Index(options.UUID, "-j7d0g-") != 5 {
179                 return conn.railsProxy.GroupContents(ctx, options)
180         }
181
182         var resp arvados.ObjectList
183
184         // Get the group object
185         respGroup, err := conn.GroupGet(ctx, arvados.GetOptions{UUID: options.UUID})
186         if err != nil {
187                 return resp, err
188         }
189
190         // If the group has groupClass 'filter', apply the filters before getting the contents.
191         if respGroup.GroupClass == "filter" {
192                 if filters, ok := respGroup.Properties["filters"].([]interface{}); ok {
193                         for _, f := range filters {
194                                 // f is supposed to be a []string
195                                 tmp, ok2 := f.([]interface{})
196                                 if !ok2 || len(tmp) < 3 {
197                                         return resp, fmt.Errorf("filter unparsable: %T, %+v, original field: %T, %+v\n", tmp, tmp, f, f)
198                                 }
199                                 var filter arvados.Filter
200                                 if attr, ok2 := tmp[0].(string); ok2 {
201                                         filter.Attr = attr
202                                 } else {
203                                         return resp, fmt.Errorf("filter unparsable: attribute must be string: %T, %+v, filter: %T, %+v\n", tmp[0], tmp[0], f, f)
204                                 }
205                                 if operator, ok2 := tmp[1].(string); ok2 {
206                                         filter.Operator = operator
207                                 } else {
208                                         return resp, fmt.Errorf("filter unparsable: operator must be string: %T, %+v, filter: %T, %+v\n", tmp[1], tmp[1], f, f)
209                                 }
210                                 filter.Operand = tmp[2]
211                                 options.Filters = append(options.Filters, filter)
212                         }
213                 } else {
214                         return resp, fmt.Errorf("filter unparsable: not an array\n")
215                 }
216                 // Use the generic /groups/contents endpoint for filter groups
217                 options.UUID = ""
218         }
219
220         return conn.railsProxy.GroupContents(ctx, options)
221 }
222
223 func httpErrorf(code int, format string, args ...interface{}) error {
224         return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)
225 }