17170: Merge branch 'master'
[arvados.git] / lib / controller / rpc / conn.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package rpc
6
7 import (
8         "bufio"
9         "bytes"
10         "context"
11         "crypto/tls"
12         "encoding/json"
13         "errors"
14         "fmt"
15         "io"
16         "io/ioutil"
17         "net"
18         "net/http"
19         "net/url"
20         "strconv"
21         "strings"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/auth"
26         "git.arvados.org/arvados.git/sdk/go/httpserver"
27 )
28
29 type TokenProvider func(context.Context) ([]string, error)
30
31 func PassthroughTokenProvider(ctx context.Context) ([]string, error) {
32         incoming, ok := auth.FromContext(ctx)
33         if !ok {
34                 return nil, errors.New("no token provided")
35         }
36         return incoming.Tokens, nil
37 }
38
39 type Conn struct {
40         SendHeader    http.Header
41         clusterID     string
42         httpClient    http.Client
43         baseURL       url.URL
44         tokenProvider TokenProvider
45 }
46
47 func NewConn(clusterID string, url *url.URL, insecure bool, tp TokenProvider) *Conn {
48         transport := http.DefaultTransport
49         if insecure {
50                 // It's not safe to copy *http.DefaultTransport
51                 // because it has a mutex (which might be locked)
52                 // protecting a private map (which might not be nil).
53                 // So we build our own, using the Go 1.12 default
54                 // values, ignoring any changes the application has
55                 // made to http.DefaultTransport.
56                 transport = &http.Transport{
57                         DialContext: (&net.Dialer{
58                                 Timeout:   30 * time.Second,
59                                 KeepAlive: 30 * time.Second,
60                                 DualStack: true,
61                         }).DialContext,
62                         MaxIdleConns:          100,
63                         IdleConnTimeout:       90 * time.Second,
64                         TLSHandshakeTimeout:   10 * time.Second,
65                         ExpectContinueTimeout: 1 * time.Second,
66                         TLSClientConfig:       &tls.Config{InsecureSkipVerify: true},
67                 }
68         }
69         return &Conn{
70                 clusterID: clusterID,
71                 httpClient: http.Client{
72                         CheckRedirect: func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse },
73                         Transport:     transport,
74                 },
75                 baseURL:       *url,
76                 tokenProvider: tp,
77         }
78 }
79
80 func (conn *Conn) requestAndDecode(ctx context.Context, dst interface{}, ep arvados.APIEndpoint, body io.Reader, opts interface{}) error {
81         aClient := arvados.Client{
82                 Client:     &conn.httpClient,
83                 Scheme:     conn.baseURL.Scheme,
84                 APIHost:    conn.baseURL.Host,
85                 SendHeader: conn.SendHeader,
86         }
87         tokens, err := conn.tokenProvider(ctx)
88         if err != nil {
89                 return err
90         } else if len(tokens) > 0 {
91                 ctx = arvados.ContextWithAuthorization(ctx, "Bearer "+tokens[0])
92         } else {
93                 // Use a non-empty auth string to ensure we override
94                 // any default token set on aClient -- and to avoid
95                 // having the remote prompt us to send a token by
96                 // responding 401.
97                 ctx = arvados.ContextWithAuthorization(ctx, "Bearer -")
98         }
99
100         // Encode opts to JSON and decode from there to a
101         // map[string]interface{}, so we can munge the query params
102         // using the JSON key names specified by opts' struct tags.
103         j, err := json.Marshal(opts)
104         if err != nil {
105                 return fmt.Errorf("%T: requestAndDecode: Marshal opts: %s", conn, err)
106         }
107         var params map[string]interface{}
108         dec := json.NewDecoder(bytes.NewBuffer(j))
109         dec.UseNumber()
110         err = dec.Decode(&params)
111         if err != nil {
112                 return fmt.Errorf("%T: requestAndDecode: Decode opts: %s", conn, err)
113         }
114         if attrs, ok := params["attrs"]; ok && ep.AttrsKey != "" {
115                 params[ep.AttrsKey] = attrs
116                 delete(params, "attrs")
117         }
118         if limitStr, ok := params["limit"]; ok {
119                 if limit, err := strconv.ParseInt(string(limitStr.(json.Number)), 10, 64); err == nil && limit < 0 {
120                         // Negative limit means "not specified" here, but some
121                         // servers/versions do not accept that, so we need to
122                         // remove it entirely.
123                         delete(params, "limit")
124                 }
125         }
126         if len(tokens) > 1 {
127                 params["reader_tokens"] = tokens[1:]
128         }
129         path := ep.Path
130         if strings.Contains(ep.Path, "/{uuid}") {
131                 uuid, _ := params["uuid"].(string)
132                 path = strings.Replace(path, "/{uuid}", "/"+uuid, 1)
133                 delete(params, "uuid")
134         }
135         return aClient.RequestAndDecodeContext(ctx, dst, ep.Method, path, body, params)
136 }
137
138 func (conn *Conn) BaseURL() url.URL {
139         return conn.baseURL
140 }
141
142 func (conn *Conn) ConfigGet(ctx context.Context) (json.RawMessage, error) {
143         ep := arvados.EndpointConfigGet
144         var resp json.RawMessage
145         err := conn.requestAndDecode(ctx, &resp, ep, nil, nil)
146         return resp, err
147 }
148
149 func (conn *Conn) Login(ctx context.Context, options arvados.LoginOptions) (arvados.LoginResponse, error) {
150         ep := arvados.EndpointLogin
151         var resp arvados.LoginResponse
152         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
153         resp.RedirectLocation = conn.relativeToBaseURL(resp.RedirectLocation)
154         return resp, err
155 }
156
157 func (conn *Conn) Logout(ctx context.Context, options arvados.LogoutOptions) (arvados.LogoutResponse, error) {
158         ep := arvados.EndpointLogout
159         var resp arvados.LogoutResponse
160         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
161         resp.RedirectLocation = conn.relativeToBaseURL(resp.RedirectLocation)
162         return resp, err
163 }
164
165 // If the given location is a valid URL and its origin is the same as
166 // conn.baseURL, return it as a relative URL. Otherwise, return it
167 // unmodified.
168 func (conn *Conn) relativeToBaseURL(location string) string {
169         u, err := url.Parse(location)
170         if err == nil && u.Scheme == conn.baseURL.Scheme && strings.ToLower(u.Host) == strings.ToLower(conn.baseURL.Host) {
171                 u.Opaque = ""
172                 u.Scheme = ""
173                 u.User = nil
174                 u.Host = ""
175                 return u.String()
176         }
177         return location
178 }
179
180 func (conn *Conn) CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error) {
181         ep := arvados.EndpointCollectionCreate
182         var resp arvados.Collection
183         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
184         return resp, err
185 }
186
187 func (conn *Conn) CollectionUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Collection, error) {
188         ep := arvados.EndpointCollectionUpdate
189         var resp arvados.Collection
190         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
191         return resp, err
192 }
193
194 func (conn *Conn) CollectionGet(ctx context.Context, options arvados.GetOptions) (arvados.Collection, error) {
195         ep := arvados.EndpointCollectionGet
196         var resp arvados.Collection
197         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
198         return resp, err
199 }
200
201 func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) {
202         ep := arvados.EndpointCollectionList
203         var resp arvados.CollectionList
204         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
205         return resp, err
206 }
207
208 func (conn *Conn) CollectionProvenance(ctx context.Context, options arvados.GetOptions) (map[string]interface{}, error) {
209         ep := arvados.EndpointCollectionProvenance
210         var resp map[string]interface{}
211         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
212         return resp, err
213 }
214
215 func (conn *Conn) CollectionUsedBy(ctx context.Context, options arvados.GetOptions) (map[string]interface{}, error) {
216         ep := arvados.EndpointCollectionUsedBy
217         var resp map[string]interface{}
218         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
219         return resp, err
220 }
221
222 func (conn *Conn) CollectionDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Collection, error) {
223         ep := arvados.EndpointCollectionDelete
224         var resp arvados.Collection
225         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
226         return resp, err
227 }
228
229 func (conn *Conn) CollectionTrash(ctx context.Context, options arvados.DeleteOptions) (arvados.Collection, error) {
230         ep := arvados.EndpointCollectionTrash
231         var resp arvados.Collection
232         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
233         return resp, err
234 }
235
236 func (conn *Conn) CollectionUntrash(ctx context.Context, options arvados.UntrashOptions) (arvados.Collection, error) {
237         ep := arvados.EndpointCollectionUntrash
238         var resp arvados.Collection
239         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
240         return resp, err
241 }
242
243 func (conn *Conn) ContainerCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Container, error) {
244         ep := arvados.EndpointContainerCreate
245         var resp arvados.Container
246         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
247         return resp, err
248 }
249
250 func (conn *Conn) ContainerUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Container, error) {
251         ep := arvados.EndpointContainerUpdate
252         var resp arvados.Container
253         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
254         return resp, err
255 }
256
257 func (conn *Conn) ContainerGet(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
258         ep := arvados.EndpointContainerGet
259         var resp arvados.Container
260         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
261         return resp, err
262 }
263
264 func (conn *Conn) ContainerList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerList, error) {
265         ep := arvados.EndpointContainerList
266         var resp arvados.ContainerList
267         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
268         return resp, err
269 }
270
271 func (conn *Conn) ContainerDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Container, error) {
272         ep := arvados.EndpointContainerDelete
273         var resp arvados.Container
274         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
275         return resp, err
276 }
277
278 func (conn *Conn) ContainerLock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
279         ep := arvados.EndpointContainerLock
280         var resp arvados.Container
281         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
282         return resp, err
283 }
284
285 func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
286         ep := arvados.EndpointContainerUnlock
287         var resp arvados.Container
288         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
289         return resp, err
290 }
291
292 // ContainerSSH returns a connection to the out-of-band SSH server for
293 // a running container. If the returned error is nil, the caller is
294 // responsible for closing sshconn.Conn.
295 func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (sshconn arvados.ContainerSSHConnection, err error) {
296         addr := conn.baseURL.Host
297         if strings.Index(addr, ":") < 1 || (strings.Contains(addr, "::") && addr[0] != '[') {
298                 // hostname or ::1 or 1::1
299                 addr = net.JoinHostPort(addr, "https")
300         }
301         insecure := false
302         if tlsconf := conn.httpClient.Transport.(*http.Transport).TLSClientConfig; tlsconf != nil && tlsconf.InsecureSkipVerify {
303                 insecure = true
304         }
305         netconn, err := tls.Dial("tcp", addr, &tls.Config{InsecureSkipVerify: insecure})
306         if err != nil {
307                 err = fmt.Errorf("tls.Dial: %w", err)
308                 return
309         }
310         defer func() {
311                 if err != nil {
312                         netconn.Close()
313                 }
314         }()
315         bufr := bufio.NewReader(netconn)
316         bufw := bufio.NewWriter(netconn)
317
318         u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1))
319         if err != nil {
320                 err = fmt.Errorf("tls.Dial: %w", err)
321                 return
322         }
323         u.RawQuery = url.Values{
324                 "detach_keys":    {options.DetachKeys},
325                 "login_username": {options.LoginUsername},
326         }.Encode()
327         tokens, err := conn.tokenProvider(ctx)
328         if err != nil {
329                 return
330         } else if len(tokens) < 1 {
331                 err = httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
332                 return
333         }
334         bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n")
335         bufw.WriteString("Authorization: Bearer " + tokens[0] + "\r\n")
336         bufw.WriteString("Host: " + u.Host + "\r\n")
337         bufw.WriteString("Upgrade: ssh\r\n")
338         bufw.WriteString("\r\n")
339         bufw.Flush()
340         resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"})
341         if err != nil {
342                 err = fmt.Errorf("http.ReadResponse: %w", err)
343                 return
344         }
345         if resp.StatusCode != http.StatusSwitchingProtocols {
346                 defer resp.Body.Close()
347                 body, _ := ioutil.ReadAll(resp.Body)
348                 var message string
349                 var errDoc httpserver.ErrorResponse
350                 if err := json.Unmarshal(body, &errDoc); err == nil {
351                         message = strings.Join(errDoc.Errors, "; ")
352                 } else {
353                         message = fmt.Sprintf("%q", body)
354                 }
355                 err = fmt.Errorf("server did not provide a tunnel: %s (HTTP %d)", message, resp.StatusCode)
356                 return
357         }
358         if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" ||
359                 strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {
360                 err = fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection"))
361                 return
362         }
363         sshconn.Conn = netconn
364         sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
365         return
366 }
367
368 func (conn *Conn) ContainerRequestList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerRequestList, error) {
369         ep := arvados.EndpointContainerRequestList
370         var resp arvados.ContainerRequestList
371         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
372         return resp, err
373 }
374
375 func (conn *Conn) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
376         ep := arvados.EndpointSpecimenCreate
377         var resp arvados.Specimen
378         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
379         return resp, err
380 }
381
382 func (conn *Conn) SpecimenUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Specimen, error) {
383         ep := arvados.EndpointSpecimenUpdate
384         var resp arvados.Specimen
385         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
386         return resp, err
387 }
388
389 func (conn *Conn) SpecimenGet(ctx context.Context, options arvados.GetOptions) (arvados.Specimen, error) {
390         ep := arvados.EndpointSpecimenGet
391         var resp arvados.Specimen
392         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
393         return resp, err
394 }
395
396 func (conn *Conn) SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error) {
397         ep := arvados.EndpointSpecimenList
398         var resp arvados.SpecimenList
399         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
400         return resp, err
401 }
402
403 func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Specimen, error) {
404         ep := arvados.EndpointSpecimenDelete
405         var resp arvados.Specimen
406         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
407         return resp, err
408 }
409
410 func (conn *Conn) UserCreate(ctx context.Context, options arvados.CreateOptions) (arvados.User, error) {
411         ep := arvados.EndpointUserCreate
412         var resp arvados.User
413         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
414         return resp, err
415 }
416 func (conn *Conn) UserUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.User, error) {
417         ep := arvados.EndpointUserUpdate
418         var resp arvados.User
419         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
420         return resp, err
421 }
422 func (conn *Conn) UserUpdateUUID(ctx context.Context, options arvados.UpdateUUIDOptions) (arvados.User, error) {
423         ep := arvados.EndpointUserUpdateUUID
424         var resp arvados.User
425         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
426         return resp, err
427 }
428 func (conn *Conn) UserMerge(ctx context.Context, options arvados.UserMergeOptions) (arvados.User, error) {
429         ep := arvados.EndpointUserMerge
430         var resp arvados.User
431         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
432         return resp, err
433 }
434 func (conn *Conn) UserActivate(ctx context.Context, options arvados.UserActivateOptions) (arvados.User, error) {
435         ep := arvados.EndpointUserActivate
436         var resp arvados.User
437         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
438         return resp, err
439 }
440 func (conn *Conn) UserSetup(ctx context.Context, options arvados.UserSetupOptions) (map[string]interface{}, error) {
441         ep := arvados.EndpointUserSetup
442         var resp map[string]interface{}
443         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
444         return resp, err
445 }
446 func (conn *Conn) UserUnsetup(ctx context.Context, options arvados.GetOptions) (arvados.User, error) {
447         ep := arvados.EndpointUserUnsetup
448         var resp arvados.User
449         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
450         return resp, err
451 }
452 func (conn *Conn) UserGet(ctx context.Context, options arvados.GetOptions) (arvados.User, error) {
453         ep := arvados.EndpointUserGet
454         var resp arvados.User
455         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
456         return resp, err
457 }
458 func (conn *Conn) UserGetCurrent(ctx context.Context, options arvados.GetOptions) (arvados.User, error) {
459         ep := arvados.EndpointUserGetCurrent
460         var resp arvados.User
461         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
462         return resp, err
463 }
464 func (conn *Conn) UserGetSystem(ctx context.Context, options arvados.GetOptions) (arvados.User, error) {
465         ep := arvados.EndpointUserGetSystem
466         var resp arvados.User
467         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
468         return resp, err
469 }
470 func (conn *Conn) UserList(ctx context.Context, options arvados.ListOptions) (arvados.UserList, error) {
471         ep := arvados.EndpointUserList
472         var resp arvados.UserList
473         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
474         return resp, err
475 }
476 func (conn *Conn) UserDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.User, error) {
477         ep := arvados.EndpointUserDelete
478         var resp arvados.User
479         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
480         return resp, err
481 }
482
483 func (conn *Conn) APIClientAuthorizationCurrent(ctx context.Context, options arvados.GetOptions) (arvados.APIClientAuthorization, error) {
484         ep := arvados.EndpointAPIClientAuthorizationCurrent
485         var resp arvados.APIClientAuthorization
486         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
487         return resp, err
488 }
489
490 type UserSessionAuthInfo struct {
491         Email           string   `json:"email"`
492         AlternateEmails []string `json:"alternate_emails"`
493         FirstName       string   `json:"first_name"`
494         LastName        string   `json:"last_name"`
495         Username        string   `json:"username"`
496 }
497
498 type UserSessionCreateOptions struct {
499         AuthInfo UserSessionAuthInfo `json:"auth_info"`
500         ReturnTo string              `json:"return_to"`
501 }
502
503 func (conn *Conn) UserSessionCreate(ctx context.Context, options UserSessionCreateOptions) (arvados.LoginResponse, error) {
504         ep := arvados.APIEndpoint{Method: "POST", Path: "auth/controller/callback"}
505         var resp arvados.LoginResponse
506         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
507         return resp, err
508 }
509
510 func (conn *Conn) UserBatchUpdate(ctx context.Context, options arvados.UserBatchUpdateOptions) (arvados.UserList, error) {
511         ep := arvados.EndpointUserBatchUpdate
512         var resp arvados.UserList
513         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
514         return resp, err
515 }
516
517 func (conn *Conn) UserAuthenticate(ctx context.Context, options arvados.UserAuthenticateOptions) (arvados.APIClientAuthorization, error) {
518         ep := arvados.EndpointUserAuthenticate
519         var resp arvados.APIClientAuthorization
520         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
521         return resp, err
522 }