17417: Merge branch 'main' into 17417-add-arm64
[arvados.git] / services / keepproxy / keepproxy.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "errors"
10         "flag"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "net"
15         "net/http"
16         "os"
17         "os/signal"
18         "regexp"
19         "strings"
20         "syscall"
21         "time"
22
23         "git.arvados.org/arvados.git/lib/cmd"
24         "git.arvados.org/arvados.git/lib/config"
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
27         "git.arvados.org/arvados.git/sdk/go/ctxlog"
28         "git.arvados.org/arvados.git/sdk/go/health"
29         "git.arvados.org/arvados.git/sdk/go/httpserver"
30         "git.arvados.org/arvados.git/sdk/go/keepclient"
31         "github.com/coreos/go-systemd/daemon"
32         "github.com/ghodss/yaml"
33         "github.com/gorilla/mux"
34         lru "github.com/hashicorp/golang-lru"
35         "github.com/sirupsen/logrus"
36 )
37
// version is the program version reported by the -version flag and in
// the startup log line.
var version = "dev"

// Package-level server state assigned by run().
var (
	// listener is the TCP listener the HTTP server accepts on.
	listener net.Listener
	// router is the handler returned by MakeRESTRouter.
	router http.Handler
)

// rfc3339NanoFixed is a time layout like RFC 3339 with nanoseconds,
// except that the fractional part always has nine digits.
const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
46
47 func configure(args []string) (*arvados.Cluster, logrus.FieldLogger, error) {
48         prog := args[0]
49         flags := flag.NewFlagSet(prog, flag.ContinueOnError)
50
51         dumpConfig := flags.Bool("dump-config", false, "write current configuration to stdout and exit")
52         getVersion := flags.Bool("version", false, "Print version information and exit.")
53
54         initLogger := logrus.New()
55         initLogger.Formatter = &logrus.JSONFormatter{
56                 TimestampFormat: rfc3339NanoFixed,
57         }
58         var logger logrus.FieldLogger = initLogger
59
60         loader := config.NewLoader(os.Stdin, logger)
61         loader.SetupFlags(flags)
62         args = loader.MungeLegacyConfigArgs(logger, args[1:], "-legacy-keepproxy-config")
63
64         if ok, code := cmd.ParseFlags(flags, prog, args, "", os.Stderr); !ok {
65                 os.Exit(code)
66         } else if *getVersion {
67                 fmt.Printf("keepproxy %s\n", version)
68                 return nil, logger, nil
69         }
70
71         cfg, err := loader.Load()
72         if err != nil {
73                 return nil, logger, err
74         }
75         cluster, err := cfg.GetCluster("")
76         if err != nil {
77                 return nil, logger, err
78         }
79
80         logger = ctxlog.New(os.Stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel).WithFields(logrus.Fields{
81                 "ClusterID": cluster.ClusterID,
82                 "PID":       os.Getpid(),
83         })
84
85         if *dumpConfig {
86                 out, err := yaml.Marshal(cfg)
87                 if err != nil {
88                         return nil, logger, err
89                 }
90                 if _, err := os.Stdout.Write(out); err != nil {
91                         return nil, logger, err
92                 }
93                 return nil, logger, nil
94         }
95
96         return cluster, logger, nil
97 }
98
99 func main() {
100         cluster, logger, err := configure(os.Args)
101         if err != nil {
102                 logger.Fatal(err)
103         }
104         if cluster == nil {
105                 return
106         }
107
108         logger.Printf("keepproxy %s started", version)
109
110         if err := run(logger, cluster); err != nil {
111                 logger.Fatal(err)
112         }
113
114         logger.Println("shutting down")
115 }
116
117 func run(logger logrus.FieldLogger, cluster *arvados.Cluster) error {
118         client, err := arvados.NewClientFromConfig(cluster)
119         if err != nil {
120                 return err
121         }
122         client.AuthToken = cluster.SystemRootToken
123
124         arv, err := arvadosclient.New(client)
125         if err != nil {
126                 return fmt.Errorf("Error setting up arvados client %v", err)
127         }
128
129         // If a config file is available, use the keepstores defined there
130         // instead of the legacy autodiscover mechanism via the API server
131         for k := range cluster.Services.Keepstore.InternalURLs {
132                 arv.KeepServiceURIs = append(arv.KeepServiceURIs, strings.TrimRight(k.String(), "/"))
133         }
134
135         if cluster.SystemLogs.LogLevel == "debug" {
136                 keepclient.DebugPrintf = logger.Printf
137         }
138         kc, err := keepclient.MakeKeepClient(arv)
139         if err != nil {
140                 return fmt.Errorf("Error setting up keep client %v", err)
141         }
142         keepclient.RefreshServiceDiscoveryOnSIGHUP()
143
144         if cluster.Collections.DefaultReplication > 0 {
145                 kc.Want_replicas = cluster.Collections.DefaultReplication
146         }
147
148         var listen arvados.URL
149         for listen = range cluster.Services.Keepproxy.InternalURLs {
150                 break
151         }
152
153         var lErr error
154         listener, lErr = net.Listen("tcp", listen.Host)
155         if lErr != nil {
156                 return fmt.Errorf("listen(%s): %v", listen.Host, lErr)
157         }
158
159         if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
160                 logger.Printf("Error notifying init daemon: %v", err)
161         }
162         logger.Println("listening at", listener.Addr())
163
164         // Shut down the server gracefully (by closing the listener)
165         // if SIGTERM is received.
166         term := make(chan os.Signal, 1)
167         go func(sig <-chan os.Signal) {
168                 s := <-sig
169                 logger.Println("caught signal:", s)
170                 listener.Close()
171         }(term)
172         signal.Notify(term, syscall.SIGTERM)
173         signal.Notify(term, syscall.SIGINT)
174
175         // Start serving requests.
176         router, err = MakeRESTRouter(kc, time.Duration(keepclient.DefaultProxyRequestTimeout), cluster, logger)
177         if err != nil {
178                 return err
179         }
180         server := http.Server{
181                 Handler: httpserver.AddRequestIDs(httpserver.LogRequests(router)),
182                 BaseContext: func(net.Listener) context.Context {
183                         return ctxlog.Context(context.Background(), logger)
184                 },
185         }
186         return server.Serve(listener)
187 }
188
189 type TokenCacheEntry struct {
190         expire int64
191         user   *arvados.User
192 }
193
194 type APITokenCache struct {
195         tokens     *lru.TwoQueueCache
196         expireTime int64
197 }
198
199 // RememberToken caches the token and set an expire time.  If the
200 // token is already in the cache, it is not updated.
201 func (cache *APITokenCache) RememberToken(token string, user *arvados.User) {
202         now := time.Now().Unix()
203         _, ok := cache.tokens.Get(token)
204         if !ok {
205                 cache.tokens.Add(token, TokenCacheEntry{
206                         expire: now + cache.expireTime,
207                         user:   user,
208                 })
209         }
210 }
211
212 // RecallToken checks if the cached token is known and still believed to be
213 // valid.
214 func (cache *APITokenCache) RecallToken(token string) (bool, *arvados.User) {
215         val, ok := cache.tokens.Get(token)
216         if !ok {
217                 return false, nil
218         }
219
220         cacheEntry := val.(TokenCacheEntry)
221         now := time.Now().Unix()
222         if now < cacheEntry.expire {
223                 // Token is known and still valid
224                 return true, cacheEntry.user
225         } else {
226                 // Token is expired
227                 cache.tokens.Remove(token)
228                 return false, nil
229         }
230 }
231
// GetRemoteAddress describes where a request came from. Without an
// X-Forwarded-For header (or with an empty one) that is just
// req.RemoteAddr; otherwise it is the header value followed by a
// comma and req.RemoteAddr.
func GetRemoteAddress(req *http.Request) string {
	forwarded := req.Header.Get("X-Forwarded-For")
	if forwarded == "" {
		return req.RemoteAddr
	}
	return forwarded + "," + req.RemoteAddr
}
242
243 func (h *proxyHandler) CheckAuthorizationHeader(req *http.Request) (pass bool, tok string, user *arvados.User) {
244         parts := strings.SplitN(req.Header.Get("Authorization"), " ", 2)
245         if len(parts) < 2 || !(parts[0] == "OAuth2" || parts[0] == "Bearer") || len(parts[1]) == 0 {
246                 return false, "", nil
247         }
248         tok = parts[1]
249
250         // Tokens are validated differently depending on what kind of
251         // operation is being performed. For example, tokens in
252         // collection-sharing links permit GET requests, but not
253         // PUT requests.
254         var op string
255         if req.Method == "GET" || req.Method == "HEAD" {
256                 op = "read"
257         } else {
258                 op = "write"
259         }
260
261         if ok, user := h.APITokenCache.RecallToken(op + ":" + tok); ok {
262                 // Valid in the cache, short circuit
263                 return true, tok, user
264         }
265
266         var err error
267         arv := *h.KeepClient.Arvados
268         arv.ApiToken = tok
269         arv.RequestID = req.Header.Get("X-Request-Id")
270         user = &arvados.User{}
271         userCurrentError := arv.Call("GET", "users", "", "current", nil, user)
272         err = userCurrentError
273         if err != nil && op == "read" {
274                 apiError, ok := err.(arvadosclient.APIServerError)
275                 if ok && apiError.HttpStatusCode == http.StatusForbidden {
276                         // If it was a scoped "sharing" token it will
277                         // return 403 instead of 401 for the current
278                         // user check.  If it is a download operation
279                         // and they have permission to read the
280                         // keep_services table, we can allow it.
281                         err = arv.Call("HEAD", "keep_services", "", "accessible", nil, nil)
282                 }
283         }
284         if err != nil {
285                 ctxlog.FromContext(req.Context()).Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
286                 return false, "", nil
287         }
288
289         if userCurrentError == nil && user.IsAdmin {
290                 // checking userCurrentError is probably redundant,
291                 // IsAdmin would be false anyway. But can't hurt.
292                 if op == "read" && !h.cluster.Collections.KeepproxyPermission.Admin.Download {
293                         return false, "", nil
294                 }
295                 if op == "write" && !h.cluster.Collections.KeepproxyPermission.Admin.Upload {
296                         return false, "", nil
297                 }
298         } else {
299                 if op == "read" && !h.cluster.Collections.KeepproxyPermission.User.Download {
300                         return false, "", nil
301                 }
302                 if op == "write" && !h.cluster.Collections.KeepproxyPermission.User.Upload {
303                         return false, "", nil
304                 }
305         }
306
307         // Success!  Update cache
308         h.APITokenCache.RememberToken(op+":"+tok, user)
309
310         return true, tok, user
311 }
312
313 // We need to make a private copy of the default http transport early
314 // in initialization, then make copies of our private copy later. It
315 // won't be safe to copy http.DefaultTransport itself later, because
316 // its private mutexes might have already been used. (Without this,
317 // the test suite sometimes panics "concurrent map writes" in
318 // net/http.(*Transport).removeIdleConnLocked().)
319 var defaultTransport = *(http.DefaultTransport.(*http.Transport))
320
321 type proxyHandler struct {
322         http.Handler
323         *keepclient.KeepClient
324         *APITokenCache
325         timeout   time.Duration
326         transport *http.Transport
327         logger    logrus.FieldLogger
328         cluster   *arvados.Cluster
329 }
330
331 // MakeRESTRouter returns an http.Handler that passes GET and PUT
332 // requests to the appropriate handlers.
333 func MakeRESTRouter(kc *keepclient.KeepClient, timeout time.Duration, cluster *arvados.Cluster, logger logrus.FieldLogger) (http.Handler, error) {
334         rest := mux.NewRouter()
335
336         transport := defaultTransport
337         transport.DialContext = (&net.Dialer{
338                 Timeout:   keepclient.DefaultConnectTimeout,
339                 KeepAlive: keepclient.DefaultKeepAlive,
340                 DualStack: true,
341         }).DialContext
342         transport.TLSClientConfig = arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure)
343         transport.TLSHandshakeTimeout = keepclient.DefaultTLSHandshakeTimeout
344
345         cacheQ, err := lru.New2Q(500)
346         if err != nil {
347                 return nil, fmt.Errorf("Error from lru.New2Q: %v", err)
348         }
349
350         h := &proxyHandler{
351                 Handler:    rest,
352                 KeepClient: kc,
353                 timeout:    timeout,
354                 transport:  &transport,
355                 APITokenCache: &APITokenCache{
356                         tokens:     cacheQ,
357                         expireTime: 300,
358                 },
359                 logger:  logger,
360                 cluster: cluster,
361         }
362
363         rest.HandleFunc(`/{locator:[0-9a-f]{32}\+.*}`, h.Get).Methods("GET", "HEAD")
364         rest.HandleFunc(`/{locator:[0-9a-f]{32}}`, h.Get).Methods("GET", "HEAD")
365
366         // List all blocks
367         rest.HandleFunc(`/index`, h.Index).Methods("GET")
368
369         // List blocks whose hash has the given prefix
370         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, h.Index).Methods("GET")
371
372         rest.HandleFunc(`/{locator:[0-9a-f]{32}\+.*}`, h.Put).Methods("PUT")
373         rest.HandleFunc(`/{locator:[0-9a-f]{32}}`, h.Put).Methods("PUT")
374         rest.HandleFunc(`/`, h.Put).Methods("POST")
375         rest.HandleFunc(`/{any}`, h.Options).Methods("OPTIONS")
376         rest.HandleFunc(`/`, h.Options).Methods("OPTIONS")
377
378         rest.Handle("/_health/{check}", &health.Handler{
379                 Token:  cluster.ManagementToken,
380                 Prefix: "/_health/",
381         }).Methods("GET")
382
383         rest.NotFoundHandler = InvalidPathHandler{}
384         return h, nil
385 }
386
387 var errLoopDetected = errors.New("loop detected")
388
389 func (h *proxyHandler) checkLoop(resp http.ResponseWriter, req *http.Request) error {
390         if via := req.Header.Get("Via"); strings.Index(via, " "+viaAlias) >= 0 {
391                 h.logger.Printf("proxy loop detected (request has Via: %q): perhaps keepproxy is misidentified by gateway config as an external client, or its keep_services record does not have service_type=proxy?", via)
392                 http.Error(resp, errLoopDetected.Error(), http.StatusInternalServerError)
393                 return errLoopDetected
394         }
395         return nil
396 }
397
// SetCorsHeaders sets the Access-Control-* response headers that
// allow cross-origin requests from any origin.
func SetCorsHeaders(resp http.ResponseWriter) {
	cors := [...][2]string{
		{"Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, OPTIONS"},
		{"Access-Control-Allow-Origin", "*"},
		{"Access-Control-Allow-Headers", "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas"},
		{"Access-Control-Max-Age", "86486400"},
	}
	hdr := resp.Header()
	for _, kv := range cors {
		hdr.Set(kv[0], kv[1])
	}
}
404
405 type InvalidPathHandler struct{}
406
407 func (InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
408         ctxlog.FromContext(req.Context()).Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
409         http.Error(resp, "Bad request", http.StatusBadRequest)
410 }
411
412 func (h *proxyHandler) Options(resp http.ResponseWriter, req *http.Request) {
413         ctxlog.FromContext(req.Context()).Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
414         SetCorsHeaders(resp)
415 }
416
// Errors reported to clients by the request handlers.
var (
	errBadAuthorizationHeader = errors.New("Missing or invalid Authorization header, or method not allowed")
	errContentLengthMismatch  = errors.New("Actual length != expected content length")
	errMethodNotSupported     = errors.New("Method not supported")
)

// removeHint matches a "+K@xxxxx" hint (five lowercase alphanumeric
// characters) in a locator, together with the "+" that follows it or
// the end of the string. Replacing a match with "$1" drops the hint
// while keeping the separator for any hint that follows.
//
// MustCompile is used so that an invalid pattern fails at startup
// instead of leaving a nil regexp behind.
var removeHint = regexp.MustCompile(`\+K@[a-z0-9]{5}(\+|$)`)
422
423 func (h *proxyHandler) Get(resp http.ResponseWriter, req *http.Request) {
424         if err := h.checkLoop(resp, req); err != nil {
425                 return
426         }
427         SetCorsHeaders(resp)
428         resp.Header().Set("Via", req.Proto+" "+viaAlias)
429
430         locator := mux.Vars(req)["locator"]
431         var err error
432         var status int
433         var expectLength, responseLength int64
434         var proxiedURI = "-"
435
436         defer func() {
437                 h.logger.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, responseLength, proxiedURI, err)
438                 if status != http.StatusOK {
439                         http.Error(resp, err.Error(), status)
440                 }
441         }()
442
443         kc := h.makeKeepClient(req)
444
445         var pass bool
446         var tok string
447         var user *arvados.User
448         if pass, tok, user = h.CheckAuthorizationHeader(req); !pass {
449                 status, err = http.StatusForbidden, errBadAuthorizationHeader
450                 return
451         }
452
453         // Copy ArvadosClient struct and use the client's API token
454         arvclient := *kc.Arvados
455         arvclient.ApiToken = tok
456         kc.Arvados = &arvclient
457
458         var reader io.ReadCloser
459
460         locator = removeHint.ReplaceAllString(locator, "$1")
461
462         if locator != "" {
463                 parts := strings.SplitN(locator, "+", 3)
464                 if len(parts) >= 2 {
465                         logger := h.logger
466                         if user != nil {
467                                 logger = logger.WithField("user_uuid", user.UUID).
468                                         WithField("user_full_name", user.FullName)
469                         }
470                         logger.WithField("locator", fmt.Sprintf("%s+%s", parts[0], parts[1])).Infof("Block download")
471                 }
472         }
473
474         switch req.Method {
475         case "HEAD":
476                 expectLength, proxiedURI, err = kc.Ask(locator)
477         case "GET":
478                 reader, expectLength, proxiedURI, err = kc.Get(locator)
479                 if reader != nil {
480                         defer reader.Close()
481                 }
482         default:
483                 status, err = http.StatusNotImplemented, errMethodNotSupported
484                 return
485         }
486
487         if expectLength == -1 {
488                 h.logger.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
489         }
490
491         switch respErr := err.(type) {
492         case nil:
493                 status = http.StatusOK
494                 resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
495                 switch req.Method {
496                 case "HEAD":
497                         responseLength = 0
498                 case "GET":
499                         responseLength, err = io.Copy(resp, reader)
500                         if err == nil && expectLength > -1 && responseLength != expectLength {
501                                 err = errContentLengthMismatch
502                         }
503                 }
504         case keepclient.Error:
505                 if respErr == keepclient.BlockNotFound {
506                         status = http.StatusNotFound
507                 } else if respErr.Temporary() {
508                         status = http.StatusBadGateway
509                 } else {
510                         status = 422
511                 }
512         default:
513                 status = http.StatusInternalServerError
514         }
515 }
516
// Errors reported by Put for problems with the request's declared
// length.
var (
	// errLengthRequired carries the standard reason phrase for
	// HTTP 411.
	errLengthRequired = errors.New(http.StatusText(http.StatusLengthRequired))
	errLengthMismatch = errors.New("Locator size hint does not match Content-Length header")
)
519
520 func (h *proxyHandler) Put(resp http.ResponseWriter, req *http.Request) {
521         if err := h.checkLoop(resp, req); err != nil {
522                 return
523         }
524         SetCorsHeaders(resp)
525         resp.Header().Set("Via", "HTTP/1.1 "+viaAlias)
526
527         kc := h.makeKeepClient(req)
528
529         var err error
530         var expectLength int64
531         var status = http.StatusInternalServerError
532         var wroteReplicas int
533         var locatorOut string = "-"
534
535         defer func() {
536                 h.logger.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, kc.Want_replicas, wroteReplicas, locatorOut, err)
537                 if status != http.StatusOK {
538                         http.Error(resp, err.Error(), status)
539                 }
540         }()
541
542         locatorIn := mux.Vars(req)["locator"]
543
544         // Check if the client specified storage classes
545         if req.Header.Get("X-Keep-Storage-Classes") != "" {
546                 var scl []string
547                 for _, sc := range strings.Split(req.Header.Get("X-Keep-Storage-Classes"), ",") {
548                         scl = append(scl, strings.Trim(sc, " "))
549                 }
550                 kc.SetStorageClasses(scl)
551         }
552
553         _, err = fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
554         if err != nil || expectLength < 0 {
555                 err = errLengthRequired
556                 status = http.StatusLengthRequired
557                 return
558         }
559
560         if locatorIn != "" {
561                 var loc *keepclient.Locator
562                 if loc, err = keepclient.MakeLocator(locatorIn); err != nil {
563                         status = http.StatusBadRequest
564                         return
565                 } else if loc.Size > 0 && int64(loc.Size) != expectLength {
566                         err = errLengthMismatch
567                         status = http.StatusBadRequest
568                         return
569                 }
570         }
571
572         var pass bool
573         var tok string
574         var user *arvados.User
575         if pass, tok, user = h.CheckAuthorizationHeader(req); !pass {
576                 err = errBadAuthorizationHeader
577                 status = http.StatusForbidden
578                 return
579         }
580
581         // Copy ArvadosClient struct and use the client's API token
582         arvclient := *kc.Arvados
583         arvclient.ApiToken = tok
584         kc.Arvados = &arvclient
585
586         // Check if the client specified the number of replicas
587         if desiredReplicas := req.Header.Get(keepclient.XKeepDesiredReplicas); desiredReplicas != "" {
588                 var r int
589                 _, err := fmt.Sscanf(desiredReplicas, "%d", &r)
590                 if err == nil {
591                         kc.Want_replicas = r
592                 }
593         }
594
595         // Now try to put the block through
596         if locatorIn == "" {
597                 bytes, err2 := ioutil.ReadAll(req.Body)
598                 if err2 != nil {
599                         err = fmt.Errorf("Error reading request body: %s", err2)
600                         status = http.StatusInternalServerError
601                         return
602                 }
603                 locatorOut, wroteReplicas, err = kc.PutB(bytes)
604         } else {
605                 locatorOut, wroteReplicas, err = kc.PutHR(locatorIn, req.Body, expectLength)
606         }
607
608         if locatorOut != "" {
609                 parts := strings.SplitN(locatorOut, "+", 3)
610                 if len(parts) >= 2 {
611                         logger := h.logger
612                         if user != nil {
613                                 logger = logger.WithField("user_uuid", user.UUID).
614                                         WithField("user_full_name", user.FullName)
615                         }
616                         logger.WithField("locator", fmt.Sprintf("%s+%s", parts[0], parts[1])).Infof("Block upload")
617                 }
618         }
619
620         // Tell the client how many successful PUTs we accomplished
621         resp.Header().Set(keepclient.XKeepReplicasStored, fmt.Sprintf("%d", wroteReplicas))
622
623         switch err.(type) {
624         case nil:
625                 status = http.StatusOK
626                 if len(kc.StorageClasses) > 0 {
627                         // A successful PUT request with storage classes means that all
628                         // storage classes were fulfilled, so the client will get a
629                         // confirmation via the X-Storage-Classes-Confirmed header.
630                         hdr := ""
631                         isFirst := true
632                         for _, sc := range kc.StorageClasses {
633                                 if isFirst {
634                                         hdr = fmt.Sprintf("%s=%d", sc, wroteReplicas)
635                                         isFirst = false
636                                 } else {
637                                         hdr += fmt.Sprintf(", %s=%d", sc, wroteReplicas)
638                                 }
639                         }
640                         resp.Header().Set(keepclient.XKeepStorageClassesConfirmed, hdr)
641                 }
642                 _, err = io.WriteString(resp, locatorOut)
643         case keepclient.OversizeBlockError:
644                 // Too much data
645                 status = http.StatusRequestEntityTooLarge
646         case keepclient.InsufficientReplicasError:
647                 status = http.StatusServiceUnavailable
648         default:
649                 status = http.StatusBadGateway
650         }
651 }
652
653 // ServeHTTP implementation for IndexHandler
654 // Supports only GET requests for /index/{prefix:[0-9a-f]{0,32}}
655 // For each keep server found in LocalRoots:
656 //   Invokes GetIndex using keepclient
657 //   Expects "complete" response (terminating with blank new line)
658 //   Aborts on any errors
659 // Concatenates responses from all those keep servers and returns
660 func (h *proxyHandler) Index(resp http.ResponseWriter, req *http.Request) {
661         SetCorsHeaders(resp)
662
663         prefix := mux.Vars(req)["prefix"]
664         var err error
665         var status int
666
667         defer func() {
668                 if status != http.StatusOK {
669                         http.Error(resp, err.Error(), status)
670                 }
671         }()
672
673         kc := h.makeKeepClient(req)
674         ok, token, _ := h.CheckAuthorizationHeader(req)
675         if !ok {
676                 status, err = http.StatusForbidden, errBadAuthorizationHeader
677                 return
678         }
679
680         // Copy ArvadosClient struct and use the client's API token
681         arvclient := *kc.Arvados
682         arvclient.ApiToken = token
683         kc.Arvados = &arvclient
684
685         // Only GET method is supported
686         if req.Method != "GET" {
687                 status, err = http.StatusNotImplemented, errMethodNotSupported
688                 return
689         }
690
691         // Get index from all LocalRoots and write to resp
692         var reader io.Reader
693         for uuid := range kc.LocalRoots() {
694                 reader, err = kc.GetIndex(uuid, prefix)
695                 if err != nil {
696                         status = http.StatusBadGateway
697                         return
698                 }
699
700                 _, err = io.Copy(resp, reader)
701                 if err != nil {
702                         status = http.StatusBadGateway
703                         return
704                 }
705         }
706
707         // Got index from all the keep servers and wrote to resp
708         status = http.StatusOK
709         resp.Write([]byte("\n"))
710 }
711
712 func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient {
713         kc := *h.KeepClient
714         kc.RequestID = req.Header.Get("X-Request-Id")
715         kc.HTTPClient = &proxyClient{
716                 client: &http.Client{
717                         Timeout:   h.timeout,
718                         Transport: h.transport,
719                 },
720                 proto: req.Proto,
721         }
722         return &kc
723 }