2798: Adds client side support for Keep proxy X-Keep-Desired-Replicas and
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient.go
1 /* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
2 package keepclient
3
4 import (
5         "arvados.org/streamer"
6         "crypto/md5"
7         "crypto/tls"
8         "errors"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         "os"
14 )
15
16 // A Keep "block" is 64MB.
17 const BLOCKSIZE = 64 * 1024 * 1024
18
19 var BlockNotFound = errors.New("Block not found")
20 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
21 var OversizeBlockError = errors.New("Block too big")
22
23 // Information about Arvados and Keep servers.
24 type KeepClient struct {
25         ApiServer     string
26         ApiToken      string
27         ApiInsecure   bool
28         Service_roots []string
29         Want_replicas int
30         Client        *http.Client
31         Using_proxy   bool
32 }
33
34 // Create a new KeepClient, initialized with standard Arvados environment
35 // variables ARVADOS_API_HOST, ARVADOS_API_TOKEN, and (optionally)
36 // ARVADOS_API_HOST_INSECURE.  This will contact the API server to discover
37 // Keep servers.
38 func MakeKeepClient() (kc KeepClient, err error) {
39         insecure := (os.Getenv("ARVADOS_API_HOST_INSECURE") == "true")
40
41         kc = KeepClient{
42                 ApiServer:     os.Getenv("ARVADOS_API_HOST"),
43                 ApiToken:      os.Getenv("ARVADOS_API_TOKEN"),
44                 ApiInsecure:   insecure,
45                 Want_replicas: 2,
46                 Client: &http.Client{Transport: &http.Transport{
47                         TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
48                 Using_proxy: false}
49
50         err = (&kc).discoverKeepServers()
51
52         return kc, err
53 }
54
55 // Put a block given the block hash, a reader with the block data, and the
56 // expected length of that data.  The desired number of replicas is given in
57 // KeepClient.Want_replicas.  Returns the number of replicas that were written
58 // and if there was an error.  Note this will return InsufficientReplias
59 // whenever 0 <= replicas < this.Wants_replicas.
60 func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) {
61
62         // Buffer for reads from 'r'
63         var bufsize int
64         if expectedLength > 0 {
65                 if expectedLength > BLOCKSIZE {
66                         return 0, OversizeBlockError
67                 }
68                 bufsize = int(expectedLength)
69         } else {
70                 bufsize = BLOCKSIZE
71         }
72
73         t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
74         defer t.Close()
75
76         return this.putReplicas(hash, t, expectedLength)
77 }
78
79 // Put a block given the block hash and a byte buffer.  The desired number of
80 // replicas is given in KeepClient.Want_replicas.  Returns the number of
81 // replicas that were written and if there was an error.  Note this will return
82 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
83 func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
84         t := streamer.AsyncStreamFromSlice(buf)
85         defer t.Close()
86
87         return this.putReplicas(hash, t, int64(len(buf)))
88 }
89
90 // Put a block given a buffer.  The hash will be computed.  The desired number
91 // of replicas is given in KeepClient.Want_replicas.  Returns the number of
92 // replicas that were written and if there was an error.  Note this will return
93 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
94 func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) {
95         hash = fmt.Sprintf("%x", md5.Sum(buffer))
96         replicas, err = this.PutHB(hash, buffer)
97         return hash, replicas, err
98 }
99
100 // Put a block, given a Reader.  This will read the entire reader into a buffer
101 // to computed the hash.  The desired number of replicas is given in
102 // KeepClient.Want_replicas.  Returns the number of replicas that were written
103 // and if there was an error.  Note this will return InsufficientReplias
104 // whenever 0 <= replicas < this.Wants_replicas.  Also nhote that if the block
105 // hash and data size are available, PutHR() is more efficient.
106 func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error) {
107         if buffer, err := ioutil.ReadAll(r); err != nil {
108                 return "", 0, err
109         } else {
110                 return this.PutB(buffer)
111         }
112 }
113
114 // Get a block given a hash.  Return a reader, the expected data length, the
115 // URL the block was fetched from, and if there was an error.  If the block
116 // checksum does not match, the final Read() on the reader returned by this
117 // method will return a BadChecksum error instead of EOF.
118 func (this KeepClient) Get(hash string) (reader io.ReadCloser,
119         contentLength int64, url string, err error) {
120         return this.AuthorizedGet(hash, "", "")
121 }
122
123 // Get a block given a hash, with additional authorization provided by
124 // signature and timestamp.  Return a reader, the expected data length, the URL
125 // the block was fetched from, and if there was an error.  If the block
126 // checksum does not match, the final Read() on the reader returned by this
127 // method will return a BadChecksum error instead of EOF.
128 func (this KeepClient) AuthorizedGet(hash string,
129         signature string,
130         timestamp string) (reader io.ReadCloser,
131         contentLength int64, url string, err error) {
132
133         // Calculate the ordering for asking servers
134         sv := this.shuffledServiceRoots(hash)
135
136         for _, host := range sv {
137                 var req *http.Request
138                 var err error
139                 var url string
140                 if signature != "" {
141                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
142                                 signature, timestamp)
143                 } else {
144                         url = fmt.Sprintf("%s/%s", host, hash)
145                 }
146                 if req, err = http.NewRequest("GET", url, nil); err != nil {
147                         continue
148                 }
149
150                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
151
152                 var resp *http.Response
153                 if resp, err = this.Client.Do(req); err != nil {
154                         continue
155                 }
156
157                 if resp.StatusCode == http.StatusOK {
158                         return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
159                 }
160         }
161
162         return nil, 0, "", BlockNotFound
163 }
164
165 // Determine if a block with the given hash is available and readable, but does
166 // not return the block contents.
167 func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
168         return this.AuthorizedAsk(hash, "", "")
169 }
170
171 // Determine if a block with the given hash is available and readable with the
172 // given signature and timestamp, but does not return the block contents.
173 func (this KeepClient) AuthorizedAsk(hash string, signature string,
174         timestamp string) (contentLength int64, url string, err error) {
175         // Calculate the ordering for asking servers
176         sv := this.shuffledServiceRoots(hash)
177
178         for _, host := range sv {
179                 var req *http.Request
180                 var err error
181                 if signature != "" {
182                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
183                                 signature, timestamp)
184                 } else {
185                         url = fmt.Sprintf("%s/%s", host, hash)
186                 }
187
188                 if req, err = http.NewRequest("HEAD", url, nil); err != nil {
189                         continue
190                 }
191
192                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
193
194                 var resp *http.Response
195                 if resp, err = this.Client.Do(req); err != nil {
196                         continue
197                 }
198
199                 if resp.StatusCode == http.StatusOK {
200                         return resp.ContentLength, url, nil
201                 }
202         }
203
204         return 0, "", BlockNotFound
205
206 }