Merge branch '2960-keepstore-streaming'
[arvados.git] / sdk / go / keepclient / gateway_shim.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "context"
9         "fmt"
10         "io"
11         "net/http"
12         "strings"
13         "time"
14
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16 )
17
18 // keepViaHTTP implements arvados.KeepGateway by using a KeepClient to
19 // do upstream requests to keepstore and keepproxy.
20 //
21 // This enables KeepClient to use KeepGateway wrappers (like
22 // arvados.DiskCache) to wrap its own HTTP client back-end methods
23 // (getOrHead, httpBlockWrite).
24 //
25 // See (*KeepClient)upstreamGateway() for the relevant glue.
26 type keepViaHTTP struct {
27         *KeepClient
28 }
29
30 func (kvh *keepViaHTTP) ReadAt(locator string, dst []byte, offset int) (int, error) {
31         rdr, _, _, _, err := kvh.getOrHead("GET", locator, nil)
32         if err != nil {
33                 return 0, err
34         }
35         defer rdr.Close()
36         _, err = io.CopyN(io.Discard, rdr, int64(offset))
37         if err != nil {
38                 return 0, err
39         }
40         n, err := rdr.Read(dst)
41         return int(n), err
42 }
43
44 func (kvh *keepViaHTTP) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
45         rdr, _, _, _, err := kvh.getOrHead("GET", opts.Locator, nil)
46         if err != nil {
47                 return 0, err
48         }
49         n, err := io.Copy(opts.WriteTo, rdr)
50         errClose := rdr.Close()
51         if err == nil {
52                 err = errClose
53         }
54         return int(n), err
55 }
56
57 func (kvh *keepViaHTTP) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
58         return kvh.httpBlockWrite(ctx, req)
59 }
60
61 func (kvh *keepViaHTTP) LocalLocator(locator string) (string, error) {
62         if !strings.Contains(locator, "+R") {
63                 // Either it has +A, or it's unsigned and we assume
64                 // it's a local locator on a site with signatures
65                 // disabled.
66                 return locator, nil
67         }
68         sighdr := fmt.Sprintf("local, time=%s", time.Now().UTC().Format(time.RFC3339))
69         _, _, url, hdr, err := kvh.KeepClient.getOrHead("HEAD", locator, http.Header{"X-Keep-Signature": []string{sighdr}})
70         if err != nil {
71                 return "", err
72         }
73         loc := hdr.Get("X-Keep-Locator")
74         if loc == "" {
75                 return "", fmt.Errorf("missing X-Keep-Locator header in HEAD response from %s", url)
76         }
77         return loc, nil
78 }