17170: Don't reuse transport that might have http2 enabled.
[arvados.git] / lib / controller / rpc / conn.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package rpc
6
7 import (
8         "bufio"
9         "bytes"
10         "context"
11         "crypto/tls"
12         "encoding/json"
13         "errors"
14         "fmt"
15         "io"
16         "io/ioutil"
17         "net"
18         "net/http"
19         "net/url"
20         "strconv"
21         "strings"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/auth"
26         "git.arvados.org/arvados.git/sdk/go/httpserver"
27 )
28
29 type TokenProvider func(context.Context) ([]string, error)
30
31 func PassthroughTokenProvider(ctx context.Context) ([]string, error) {
32         incoming, ok := auth.FromContext(ctx)
33         if !ok {
34                 return nil, errors.New("no token provided")
35         }
36         return incoming.Tokens, nil
37 }
38
39 type Conn struct {
40         SendHeader    http.Header
41         clusterID     string
42         httpClient    http.Client
43         baseURL       url.URL
44         tokenProvider TokenProvider
45 }
46
47 func NewConn(clusterID string, url *url.URL, insecure bool, tp TokenProvider) *Conn {
48         transport := http.DefaultTransport
49         if insecure {
50                 // It's not safe to copy *http.DefaultTransport
51                 // because it has a mutex (which might be locked)
52                 // protecting a private map (which might not be nil).
53                 // So we build our own, using the Go 1.12 default
54                 // values, ignoring any changes the application has
55                 // made to http.DefaultTransport.
56                 transport = &http.Transport{
57                         DialContext: (&net.Dialer{
58                                 Timeout:   30 * time.Second,
59                                 KeepAlive: 30 * time.Second,
60                                 DualStack: true,
61                         }).DialContext,
62                         MaxIdleConns:          100,
63                         IdleConnTimeout:       90 * time.Second,
64                         TLSHandshakeTimeout:   10 * time.Second,
65                         ExpectContinueTimeout: 1 * time.Second,
66                         TLSClientConfig:       &tls.Config{InsecureSkipVerify: true},
67                 }
68         }
69         return &Conn{
70                 clusterID: clusterID,
71                 httpClient: http.Client{
72                         CheckRedirect: func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse },
73                         Transport:     transport,
74                 },
75                 baseURL:       *url,
76                 tokenProvider: tp,
77         }
78 }
79
80 func (conn *Conn) requestAndDecode(ctx context.Context, dst interface{}, ep arvados.APIEndpoint, body io.Reader, opts interface{}) error {
81         aClient := arvados.Client{
82                 Client:     &conn.httpClient,
83                 Scheme:     conn.baseURL.Scheme,
84                 APIHost:    conn.baseURL.Host,
85                 SendHeader: conn.SendHeader,
86         }
87         tokens, err := conn.tokenProvider(ctx)
88         if err != nil {
89                 return err
90         } else if len(tokens) > 0 {
91                 ctx = arvados.ContextWithAuthorization(ctx, "Bearer "+tokens[0])
92         } else {
93                 // Use a non-empty auth string to ensure we override
94                 // any default token set on aClient -- and to avoid
95                 // having the remote prompt us to send a token by
96                 // responding 401.
97                 ctx = arvados.ContextWithAuthorization(ctx, "Bearer -")
98         }
99
100         // Encode opts to JSON and decode from there to a
101         // map[string]interface{}, so we can munge the query params
102         // using the JSON key names specified by opts' struct tags.
103         j, err := json.Marshal(opts)
104         if err != nil {
105                 return fmt.Errorf("%T: requestAndDecode: Marshal opts: %s", conn, err)
106         }
107         var params map[string]interface{}
108         dec := json.NewDecoder(bytes.NewBuffer(j))
109         dec.UseNumber()
110         err = dec.Decode(&params)
111         if err != nil {
112                 return fmt.Errorf("%T: requestAndDecode: Decode opts: %s", conn, err)
113         }
114         if attrs, ok := params["attrs"]; ok && ep.AttrsKey != "" {
115                 params[ep.AttrsKey] = attrs
116                 delete(params, "attrs")
117         }
118         if limitStr, ok := params["limit"]; ok {
119                 if limit, err := strconv.ParseInt(string(limitStr.(json.Number)), 10, 64); err == nil && limit < 0 {
120                         // Negative limit means "not specified" here, but some
121                         // servers/versions do not accept that, so we need to
122                         // remove it entirely.
123                         delete(params, "limit")
124                 }
125         }
126         if len(tokens) > 1 {
127                 params["reader_tokens"] = tokens[1:]
128         }
129         path := ep.Path
130         if strings.Contains(ep.Path, "/{uuid}") {
131                 uuid, _ := params["uuid"].(string)
132                 path = strings.Replace(path, "/{uuid}", "/"+uuid, 1)
133                 delete(params, "uuid")
134         }
135         return aClient.RequestAndDecodeContext(ctx, dst, ep.Method, path, body, params)
136 }
137
138 func (conn *Conn) BaseURL() url.URL {
139         return conn.baseURL
140 }
141
142 func (conn *Conn) ConfigGet(ctx context.Context) (json.RawMessage, error) {
143         ep := arvados.EndpointConfigGet
144         var resp json.RawMessage
145         err := conn.requestAndDecode(ctx, &resp, ep, nil, nil)
146         return resp, err
147 }
148
149 func (conn *Conn) Login(ctx context.Context, options arvados.LoginOptions) (arvados.LoginResponse, error) {
150         ep := arvados.EndpointLogin
151         var resp arvados.LoginResponse
152         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
153         resp.RedirectLocation = conn.relativeToBaseURL(resp.RedirectLocation)
154         return resp, err
155 }
156
157 func (conn *Conn) Logout(ctx context.Context, options arvados.LogoutOptions) (arvados.LogoutResponse, error) {
158         ep := arvados.EndpointLogout
159         var resp arvados.LogoutResponse
160         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
161         resp.RedirectLocation = conn.relativeToBaseURL(resp.RedirectLocation)
162         return resp, err
163 }
164
165 // If the given location is a valid URL and its origin is the same as
166 // conn.baseURL, return it as a relative URL. Otherwise, return it
167 // unmodified.
168 func (conn *Conn) relativeToBaseURL(location string) string {
169         u, err := url.Parse(location)
170         if err == nil && u.Scheme == conn.baseURL.Scheme && strings.ToLower(u.Host) == strings.ToLower(conn.baseURL.Host) {
171                 u.Opaque = ""
172                 u.Scheme = ""
173                 u.User = nil
174                 u.Host = ""
175                 return u.String()
176         }
177         return location
178 }
179
180 func (conn *Conn) CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error) {
181         ep := arvados.EndpointCollectionCreate
182         var resp arvados.Collection
183         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
184         return resp, err
185 }
186
187 func (conn *Conn) CollectionUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Collection, error) {
188         ep := arvados.EndpointCollectionUpdate
189         var resp arvados.Collection
190         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
191         return resp, err
192 }
193
194 func (conn *Conn) CollectionGet(ctx context.Context, options arvados.GetOptions) (arvados.Collection, error) {
195         ep := arvados.EndpointCollectionGet
196         var resp arvados.Collection
197         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
198         return resp, err
199 }
200
201 func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) {
202         ep := arvados.EndpointCollectionList
203         var resp arvados.CollectionList
204         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
205         return resp, err
206 }
207
208 func (conn *Conn) CollectionProvenance(ctx context.Context, options arvados.GetOptions) (map[string]interface{}, error) {
209         ep := arvados.EndpointCollectionProvenance
210         var resp map[string]interface{}
211         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
212         return resp, err
213 }
214
215 func (conn *Conn) CollectionUsedBy(ctx context.Context, options arvados.GetOptions) (map[string]interface{}, error) {
216         ep := arvados.EndpointCollectionUsedBy
217         var resp map[string]interface{}
218         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
219         return resp, err
220 }
221
222 func (conn *Conn) CollectionDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Collection, error) {
223         ep := arvados.EndpointCollectionDelete
224         var resp arvados.Collection
225         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
226         return resp, err
227 }
228
229 func (conn *Conn) CollectionTrash(ctx context.Context, options arvados.DeleteOptions) (arvados.Collection, error) {
230         ep := arvados.EndpointCollectionTrash
231         var resp arvados.Collection
232         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
233         return resp, err
234 }
235
236 func (conn *Conn) CollectionUntrash(ctx context.Context, options arvados.UntrashOptions) (arvados.Collection, error) {
237         ep := arvados.EndpointCollectionUntrash
238         var resp arvados.Collection
239         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
240         return resp, err
241 }
242
243 func (conn *Conn) ContainerCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Container, error) {
244         ep := arvados.EndpointContainerCreate
245         var resp arvados.Container
246         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
247         return resp, err
248 }
249
250 func (conn *Conn) ContainerUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Container, error) {
251         ep := arvados.EndpointContainerUpdate
252         var resp arvados.Container
253         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
254         return resp, err
255 }
256
257 func (conn *Conn) ContainerGet(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
258         ep := arvados.EndpointContainerGet
259         var resp arvados.Container
260         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
261         return resp, err
262 }
263
264 func (conn *Conn) ContainerList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerList, error) {
265         ep := arvados.EndpointContainerList
266         var resp arvados.ContainerList
267         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
268         return resp, err
269 }
270
271 func (conn *Conn) ContainerDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Container, error) {
272         ep := arvados.EndpointContainerDelete
273         var resp arvados.Container
274         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
275         return resp, err
276 }
277
278 func (conn *Conn) ContainerLock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
279         ep := arvados.EndpointContainerLock
280         var resp arvados.Container
281         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
282         return resp, err
283 }
284
285 func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
286         ep := arvados.EndpointContainerUnlock
287         var resp arvados.Container
288         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
289         return resp, err
290 }
291
292 // ContainerSSH returns a connection to the out-of-band SSH server for
293 // a running container. If the returned error is nil, the caller is
294 // responsible for closing sshconn.Conn.
295 func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (sshconn arvados.ContainerSSHConnection, err error) {
296         addr := conn.baseURL.Host
297         if strings.Index(addr, ":") < 1 || (strings.Contains(addr, "::") && addr[0] != '[') {
298                 // hostname or ::1 or 1::1
299                 addr = net.JoinHostPort(addr, "https")
300         }
301         netconn, err := tls.Dial("tcp", addr, &tls.Config{InsecureSkipVerify: conn.httpClient.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify})
302         if err != nil {
303                 err = fmt.Errorf("tls.Dial: %w", err)
304                 return
305         }
306         defer func() {
307                 if err != nil {
308                         netconn.Close()
309                 }
310         }()
311         bufr := bufio.NewReader(netconn)
312         bufw := bufio.NewWriter(netconn)
313
314         u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1))
315         if err != nil {
316                 return
317         }
318         u.RawQuery = url.Values{
319                 "detach_keys":    {options.DetachKeys},
320                 "login_username": {options.LoginUsername},
321         }.Encode()
322         tokens, err := conn.tokenProvider(ctx)
323         if err != nil {
324                 return
325         } else if len(tokens) < 1 {
326                 err = httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
327                 return
328         }
329         bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n")
330         bufw.WriteString("Authorization: Bearer " + tokens[0] + "\r\n")
331         bufw.WriteString("Host: " + u.Host + "\r\n")
332         bufw.WriteString("Upgrade: ssh\r\n")
333         bufw.WriteString("\r\n")
334         bufw.Flush()
335         resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"})
336         if err != nil {
337                 return
338         }
339         if resp.StatusCode != http.StatusSwitchingProtocols {
340                 defer resp.Body.Close()
341                 body, _ := ioutil.ReadAll(resp.Body)
342                 err = fmt.Errorf("server did not provide a tunnel: %d %q", resp.StatusCode, body)
343                 return
344         }
345         if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" ||
346                 strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {
347                 err = fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection"))
348                 return
349         }
350         sshconn.Conn = netconn
351         sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
352         return
353 }
354
355 func (conn *Conn) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
356         ep := arvados.EndpointSpecimenCreate
357         var resp arvados.Specimen
358         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
359         return resp, err
360 }
361
362 func (conn *Conn) SpecimenUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Specimen, error) {
363         ep := arvados.EndpointSpecimenUpdate
364         var resp arvados.Specimen
365         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
366         return resp, err
367 }
368
369 func (conn *Conn) SpecimenGet(ctx context.Context, options arvados.GetOptions) (arvados.Specimen, error) {
370         ep := arvados.EndpointSpecimenGet
371         var resp arvados.Specimen
372         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
373         return resp, err
374 }
375
376 func (conn *Conn) SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error) {
377         ep := arvados.EndpointSpecimenList
378         var resp arvados.SpecimenList
379         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
380         return resp, err
381 }
382
383 func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Specimen, error) {
384         ep := arvados.EndpointSpecimenDelete
385         var resp arvados.Specimen
386         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
387         return resp, err
388 }
389
390 func (conn *Conn) UserCreate(ctx context.Context, options arvados.CreateOptions) (arvados.User, error) {
391         ep := arvados.EndpointUserCreate
392         var resp arvados.User
393         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
394         return resp, err
395 }
396 func (conn *Conn) UserUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.User, error) {
397         ep := arvados.EndpointUserUpdate
398         var resp arvados.User
399         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
400         return resp, err
401 }
402 func (conn *Conn) UserUpdateUUID(ctx context.Context, options arvados.UpdateUUIDOptions) (arvados.User, error) {
403         ep := arvados.EndpointUserUpdateUUID
404         var resp arvados.User
405         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
406         return resp, err
407 }
408 func (conn *Conn) UserMerge(ctx context.Context, options arvados.UserMergeOptions) (arvados.User, error) {
409         ep := arvados.EndpointUserMerge
410         var resp arvados.User
411         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
412         return resp, err
413 }
414 func (conn *Conn) UserActivate(ctx context.Context, options arvados.UserActivateOptions) (arvados.User, error) {
415         ep := arvados.EndpointUserActivate
416         var resp arvados.User
417         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
418         return resp, err
419 }
420 func (conn *Conn) UserSetup(ctx context.Context, options arvados.UserSetupOptions) (map[string]interface{}, error) {
421         ep := arvados.EndpointUserSetup
422         var resp map[string]interface{}
423         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
424         return resp, err
425 }
426 func (conn *Conn) UserUnsetup(ctx context.Context, options arvados.GetOptions) (arvados.User, error) {
427         ep := arvados.EndpointUserUnsetup
428         var resp arvados.User
429         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
430         return resp, err
431 }
432 func (conn *Conn) UserGet(ctx context.Context, options arvados.GetOptions) (arvados.User, error) {
433         ep := arvados.EndpointUserGet
434         var resp arvados.User
435         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
436         return resp, err
437 }
438 func (conn *Conn) UserGetCurrent(ctx context.Context, options arvados.GetOptions) (arvados.User, error) {
439         ep := arvados.EndpointUserGetCurrent
440         var resp arvados.User
441         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
442         return resp, err
443 }
444 func (conn *Conn) UserGetSystem(ctx context.Context, options arvados.GetOptions) (arvados.User, error) {
445         ep := arvados.EndpointUserGetSystem
446         var resp arvados.User
447         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
448         return resp, err
449 }
450 func (conn *Conn) UserList(ctx context.Context, options arvados.ListOptions) (arvados.UserList, error) {
451         ep := arvados.EndpointUserList
452         var resp arvados.UserList
453         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
454         return resp, err
455 }
456 func (conn *Conn) UserDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.User, error) {
457         ep := arvados.EndpointUserDelete
458         var resp arvados.User
459         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
460         return resp, err
461 }
462
463 func (conn *Conn) APIClientAuthorizationCurrent(ctx context.Context, options arvados.GetOptions) (arvados.APIClientAuthorization, error) {
464         ep := arvados.EndpointAPIClientAuthorizationCurrent
465         var resp arvados.APIClientAuthorization
466         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
467         return resp, err
468 }
469
470 type UserSessionAuthInfo struct {
471         Email           string   `json:"email"`
472         AlternateEmails []string `json:"alternate_emails"`
473         FirstName       string   `json:"first_name"`
474         LastName        string   `json:"last_name"`
475         Username        string   `json:"username"`
476 }
477
478 type UserSessionCreateOptions struct {
479         AuthInfo UserSessionAuthInfo `json:"auth_info"`
480         ReturnTo string              `json:"return_to"`
481 }
482
483 func (conn *Conn) UserSessionCreate(ctx context.Context, options UserSessionCreateOptions) (arvados.LoginResponse, error) {
484         ep := arvados.APIEndpoint{Method: "POST", Path: "auth/controller/callback"}
485         var resp arvados.LoginResponse
486         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
487         return resp, err
488 }
489
490 func (conn *Conn) UserBatchUpdate(ctx context.Context, options arvados.UserBatchUpdateOptions) (arvados.UserList, error) {
491         ep := arvados.EndpointUserBatchUpdate
492         var resp arvados.UserList
493         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
494         return resp, err
495 }
496
497 func (conn *Conn) UserAuthenticate(ctx context.Context, options arvados.UserAuthenticateOptions) (arvados.APIClientAuthorization, error) {
498         ep := arvados.EndpointUserAuthenticate
499         var resp arvados.APIClientAuthorization
500         err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
501         return resp, err
502 }