17944: Vocabulary loading, monitoring and checking on several object types.
[arvados.git] / lib / controller / localdb / conn.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package localdb
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "os"
12         "strings"
13
14         "git.arvados.org/arvados.git/lib/controller/railsproxy"
15         "git.arvados.org/arvados.git/lib/controller/rpc"
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "git.arvados.org/arvados.git/sdk/go/ctxlog"
18         "github.com/fsnotify/fsnotify"
19         "github.com/sirupsen/logrus"
20 )
21
22 type railsProxy = rpc.Conn
23
24 type Conn struct {
25         cluster          *arvados.Cluster
26         *railsProxy      // handles API methods that aren't defined on Conn itself
27         vocabularyCache  *arvados.Vocabulary
28         reloadVocabulary bool
29         loginController
30 }
31
32 func NewConn(cluster *arvados.Cluster) *Conn {
33         railsProxy := railsproxy.NewConn(cluster)
34         railsProxy.RedactHostInErrors = true
35         conn := Conn{
36                 cluster:    cluster,
37                 railsProxy: railsProxy,
38         }
39         conn.loginController = chooseLoginController(cluster, &conn)
40         return &conn
41 }
42
43 func (conn *Conn) checkProperties(ctx context.Context, properties interface{}) error {
44         if properties == nil {
45                 return nil
46         }
47         var props map[string]interface{}
48         switch properties := properties.(type) {
49         case string:
50                 err := json.Unmarshal([]byte(properties), &props)
51                 if err != nil {
52                         return err
53                 }
54         case map[string]interface{}:
55                 props = properties
56         default:
57                 return fmt.Errorf("unexpected properties type %T", properties)
58         }
59         voc, err := conn.VocabularyGet(ctx)
60         if err != nil {
61                 return err
62         }
63         return voc.Check(props)
64 }
65
66 func watchVocabulary(logger logrus.FieldLogger, vocPath string, fn func()) {
67         watcher, err := fsnotify.NewWatcher()
68         if err != nil {
69                 logger.WithError(err).Error("vocabulary fsnotify setup failed")
70                 return
71         }
72         defer watcher.Close()
73
74         err = watcher.Add(vocPath)
75         if err != nil {
76                 logger.WithError(err).Error("vocabulary file watcher failed")
77                 return
78         }
79
80         for {
81                 select {
82                 case err, ok := <-watcher.Errors:
83                         if !ok {
84                                 return
85                         }
86                         logger.WithError(err).Warn("vocabulary file watcher error")
87                 case _, ok := <-watcher.Events:
88                         if !ok {
89                                 return
90                         }
91                         for len(watcher.Events) > 0 {
92                                 <-watcher.Events
93                         }
94                         fn()
95                 }
96         }
97 }
98
99 func (conn *Conn) loadVocabularyFile() error {
100         vf, err := os.ReadFile(conn.cluster.API.VocabularyPath)
101         if err != nil {
102                 return fmt.Errorf("couldn't read vocabulary file %q: %v", conn.cluster.API.VocabularyPath, err)
103         }
104         mk := make([]string, 0, len(conn.cluster.Collections.ManagedProperties))
105         for k := range conn.cluster.Collections.ManagedProperties {
106                 mk = append(mk, k)
107         }
108         voc, err := arvados.NewVocabulary(vf, mk)
109         if err != nil {
110                 return fmt.Errorf("while loading vocabulary file %q: %s", conn.cluster.API.VocabularyPath, err)
111         }
112         err = voc.Validate()
113         if err != nil {
114                 return fmt.Errorf("while validating vocabulary file %q: %s", conn.cluster.API.VocabularyPath, err)
115         }
116         conn.vocabularyCache = voc
117         return nil
118 }
119
120 // VocabularyGet refreshes the vocabulary cache if necessary and returns it.
121 func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error) {
122         if conn.cluster.API.VocabularyPath == "" {
123                 return arvados.Vocabulary{}, nil
124         }
125         logger := ctxlog.FromContext(ctx)
126         if conn.vocabularyCache == nil {
127                 // Initial load of vocabulary file.
128                 err := conn.loadVocabularyFile()
129                 if err != nil {
130                         logger.WithError(err).Error("error loading vocabulary file")
131                         return arvados.Vocabulary{}, err
132                 }
133                 go watchVocabulary(logger, conn.cluster.API.VocabularyPath, func() {
134                         logger.Info("vocabulary file changed, it'll be reloaded next time it's needed")
135                         conn.reloadVocabulary = true
136                 })
137         } else if conn.reloadVocabulary {
138                 // Requested reload of vocabulary file.
139                 conn.reloadVocabulary = false
140                 err := conn.loadVocabularyFile()
141                 if err != nil {
142                         logger.WithError(err).Error("error reloading vocabulary file - ignoring")
143                 } else {
144                         logger.Info("vocabulary file reloaded successfully")
145                 }
146         }
147         return *conn.vocabularyCache, nil
148 }
149
150 // Logout handles the logout of conn giving to the appropriate loginController
151 func (conn *Conn) Logout(ctx context.Context, opts arvados.LogoutOptions) (arvados.LogoutResponse, error) {
152         return conn.loginController.Logout(ctx, opts)
153 }
154
155 // Login handles the login of conn giving to the appropriate loginController
156 func (conn *Conn) Login(ctx context.Context, opts arvados.LoginOptions) (arvados.LoginResponse, error) {
157         return conn.loginController.Login(ctx, opts)
158 }
159
160 // UserAuthenticate handles the User Authentication of conn giving to the appropriate loginController
161 func (conn *Conn) UserAuthenticate(ctx context.Context, opts arvados.UserAuthenticateOptions) (arvados.APIClientAuthorization, error) {
162         return conn.loginController.UserAuthenticate(ctx, opts)
163 }
164
165 func (conn *Conn) GroupContents(ctx context.Context, options arvados.GroupContentsOptions) (arvados.ObjectList, error) {
166         // The requested UUID can be a user (virtual home project), which we just pass on to
167         // the API server.
168         if strings.Index(options.UUID, "-j7d0g-") != 5 {
169                 return conn.railsProxy.GroupContents(ctx, options)
170         }
171
172         var resp arvados.ObjectList
173
174         // Get the group object
175         respGroup, err := conn.GroupGet(ctx, arvados.GetOptions{UUID: options.UUID})
176         if err != nil {
177                 return resp, err
178         }
179
180         // If the group has groupClass 'filter', apply the filters before getting the contents.
181         if respGroup.GroupClass == "filter" {
182                 if filters, ok := respGroup.Properties["filters"].([]interface{}); ok {
183                         for _, f := range filters {
184                                 // f is supposed to be a []string
185                                 tmp, ok2 := f.([]interface{})
186                                 if !ok2 || len(tmp) < 3 {
187                                         return resp, fmt.Errorf("filter unparsable: %T, %+v, original field: %T, %+v\n", tmp, tmp, f, f)
188                                 }
189                                 var filter arvados.Filter
190                                 if attr, ok2 := tmp[0].(string); ok2 {
191                                         filter.Attr = attr
192                                 } else {
193                                         return resp, fmt.Errorf("filter unparsable: attribute must be string: %T, %+v, filter: %T, %+v\n", tmp[0], tmp[0], f, f)
194                                 }
195                                 if operator, ok2 := tmp[1].(string); ok2 {
196                                         filter.Operator = operator
197                                 } else {
198                                         return resp, fmt.Errorf("filter unparsable: operator must be string: %T, %+v, filter: %T, %+v\n", tmp[1], tmp[1], f, f)
199                                 }
200                                 filter.Operand = tmp[2]
201                                 options.Filters = append(options.Filters, filter)
202                         }
203                 } else {
204                         return resp, fmt.Errorf("filter unparsable: not an array\n")
205                 }
206                 // Use the generic /groups/contents endpoint for filter groups
207                 options.UUID = ""
208         }
209
210         return conn.railsProxy.GroupContents(ctx, options)
211 }