Go Keep client correctly closes response body on client requests, should fix
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient.go
1 /* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
2 package keepclient
3
4 import (
5         "arvados.org/streamer"
6         "crypto/md5"
7         "crypto/tls"
8         "errors"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "os"
15         "regexp"
16         "sort"
17         "strings"
18         "sync"
19         "sync/atomic"
20         "unsafe"
21 )
22
23 // A Keep "block" is 64MB.
24 const BLOCKSIZE = 64 * 1024 * 1024
25
26 var BlockNotFound = errors.New("Block not found")
27 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
28 var OversizeBlockError = errors.New("Block too big")
29 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
30 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
31
32 const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
33 const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
34
35 // Information about Arvados and Keep servers.
36 type KeepClient struct {
37         ApiServer     string
38         ApiToken      string
39         ApiInsecure   bool
40         Want_replicas int
41         Client        *http.Client
42         Using_proxy   bool
43         External      bool
44         service_roots *[]string
45         lock          sync.Mutex
46 }
47
48 // Create a new KeepClient, initialized with standard Arvados environment
49 // variables ARVADOS_API_HOST, ARVADOS_API_TOKEN, and (optionally)
50 // ARVADOS_API_HOST_INSECURE.  This will contact the API server to discover
51 // Keep servers.
52 func MakeKeepClient() (kc KeepClient, err error) {
53         insecure := (os.Getenv("ARVADOS_API_HOST_INSECURE") == "true")
54         external := (os.Getenv("ARVADOS_EXTERNAL_CLIENT") == "true")
55
56         kc = KeepClient{
57                 ApiServer:     os.Getenv("ARVADOS_API_HOST"),
58                 ApiToken:      os.Getenv("ARVADOS_API_TOKEN"),
59                 ApiInsecure:   insecure,
60                 Want_replicas: 2,
61                 Client: &http.Client{Transport: &http.Transport{
62                         TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
63                 Using_proxy: false,
64                 External:    external}
65
66         if os.Getenv("ARVADOS_API_HOST") == "" {
67                 return kc, MissingArvadosApiHost
68         }
69         if os.Getenv("ARVADOS_API_TOKEN") == "" {
70                 return kc, MissingArvadosApiToken
71         }
72
73         err = (&kc).DiscoverKeepServers()
74
75         return kc, err
76 }
77
78 // Put a block given the block hash, a reader with the block data, and the
79 // expected length of that data.  The desired number of replicas is given in
80 // KeepClient.Want_replicas.  Returns the number of replicas that were written
81 // and if there was an error.  Note this will return InsufficientReplias
82 // whenever 0 <= replicas < this.Wants_replicas.
83 func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
84
85         // Buffer for reads from 'r'
86         var bufsize int
87         if expectedLength > 0 {
88                 if expectedLength > BLOCKSIZE {
89                         return "", 0, OversizeBlockError
90                 }
91                 bufsize = int(expectedLength)
92         } else {
93                 bufsize = BLOCKSIZE
94         }
95
96         t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
97         defer t.Close()
98
99         return this.putReplicas(hash, t, expectedLength)
100 }
101
102 // Put a block given the block hash and a byte buffer.  The desired number of
103 // replicas is given in KeepClient.Want_replicas.  Returns the number of
104 // replicas that were written and if there was an error.  Note this will return
105 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
106 func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
107         t := streamer.AsyncStreamFromSlice(buf)
108         defer t.Close()
109
110         return this.putReplicas(hash, t, int64(len(buf)))
111 }
112
113 // Put a block given a buffer.  The hash will be computed.  The desired number
114 // of replicas is given in KeepClient.Want_replicas.  Returns the number of
115 // replicas that were written and if there was an error.  Note this will return
116 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
117 func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
118         hash := fmt.Sprintf("%x", md5.Sum(buffer))
119         return this.PutHB(hash, buffer)
120 }
121
122 // Put a block, given a Reader.  This will read the entire reader into a buffer
123 // to compute the hash.  The desired number of replicas is given in
124 // KeepClient.Want_replicas.  Returns the number of replicas that were written
125 // and if there was an error.  Note this will return InsufficientReplias
126 // whenever 0 <= replicas < this.Wants_replicas.  Also nhote that if the block
127 // hash and data size are available, PutHR() is more efficient.
128 func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
129         if buffer, err := ioutil.ReadAll(r); err != nil {
130                 return "", 0, err
131         } else {
132                 return this.PutB(buffer)
133         }
134 }
135
136 // Get a block given a hash.  Return a reader, the expected data length, the
137 // URL the block was fetched from, and if there was an error.  If the block
138 // checksum does not match, the final Read() on the reader returned by this
139 // method will return a BadChecksum error instead of EOF.
140 func (this KeepClient) Get(hash string) (reader io.ReadCloser,
141         contentLength int64, url string, err error) {
142         return this.AuthorizedGet(hash, "", "")
143 }
144
145 // Get a block given a hash, with additional authorization provided by
146 // signature and timestamp.  Return a reader, the expected data length, the URL
147 // the block was fetched from, and if there was an error.  If the block
148 // checksum does not match, the final Read() on the reader returned by this
149 // method will return a BadChecksum error instead of EOF.
150 func (this KeepClient) AuthorizedGet(hash string,
151         signature string,
152         timestamp string) (reader io.ReadCloser,
153         contentLength int64, url string, err error) {
154
155         // Calculate the ordering for asking servers
156         sv := this.shuffledServiceRoots(hash)
157
158         for _, host := range sv {
159                 var req *http.Request
160                 var err error
161                 var url string
162                 if signature != "" {
163                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
164                                 signature, timestamp)
165                 } else {
166                         url = fmt.Sprintf("%s/%s", host, hash)
167                 }
168                 if req, err = http.NewRequest("GET", url, nil); err != nil {
169                         continue
170                 }
171
172                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
173
174                 var resp *http.Response
175                 if resp, err = this.Client.Do(req); err != nil {
176                         continue
177                 }
178
179                 if resp.StatusCode == http.StatusOK {
180                         return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
181                 }
182         }
183
184         return nil, 0, "", BlockNotFound
185 }
186
187 // Determine if a block with the given hash is available and readable, but does
188 // not return the block contents.
189 func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
190         return this.AuthorizedAsk(hash, "", "")
191 }
192
193 // Determine if a block with the given hash is available and readable with the
194 // given signature and timestamp, but does not return the block contents.
195 func (this KeepClient) AuthorizedAsk(hash string, signature string,
196         timestamp string) (contentLength int64, url string, err error) {
197         // Calculate the ordering for asking servers
198         sv := this.shuffledServiceRoots(hash)
199
200         for _, host := range sv {
201                 var req *http.Request
202                 var err error
203                 if signature != "" {
204                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
205                                 signature, timestamp)
206                 } else {
207                         url = fmt.Sprintf("%s/%s", host, hash)
208                 }
209
210                 if req, err = http.NewRequest("HEAD", url, nil); err != nil {
211                         continue
212                 }
213
214                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
215
216                 var resp *http.Response
217                 if resp, err = this.Client.Do(req); err != nil {
218                         continue
219                 }
220
221                 if resp.StatusCode == http.StatusOK {
222                         return resp.ContentLength, url, nil
223                 }
224         }
225
226         return 0, "", BlockNotFound
227
228 }
229
230 // Atomically read the service_roots field.
231 func (this *KeepClient) ServiceRoots() []string {
232         r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
233         return *r
234 }
235
236 // Atomically update the service_roots field.  Enables you to update
237 // service_roots without disrupting any GET or PUT operations that might
238 // already be in progress.
239 func (this *KeepClient) SetServiceRoots(svc []string) {
240         // Must be sorted for ShuffledServiceRoots() to produce consistent
241         // results.
242         roots := make([]string, len(svc))
243         copy(roots, svc)
244         sort.Strings(roots)
245         atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
246                 unsafe.Pointer(&roots))
247 }
248
249 type Locator struct {
250         Hash      string
251         Size      int
252         Signature string
253         Timestamp string
254 }
255
256 func MakeLocator2(hash string, hints string) (locator Locator) {
257         locator.Hash = hash
258         if hints != "" {
259                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
260                 for _, hint := range strings.Split(hints, "+") {
261                         if hint != "" {
262                                 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
263                                         fmt.Sscanf(hint, "%d", &locator.Size)
264                                 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
265                                         locator.Signature = m[1]
266                                         locator.Timestamp = m[2]
267                                 } else if match, _ := regexp.MatchString("^[:upper:]", hint); match {
268                                         // Any unknown hint that starts with an uppercase letter is
269                                         // presumed to be valid and ignored, to permit forward compatibility.
270                                 } else {
271                                         // Unknown format; not a valid locator.
272                                         return Locator{"", 0, "", ""}
273                                 }
274                         }
275                 }
276         }
277         return locator
278 }
279
280 func MakeLocator(path string) Locator {
281         pathpattern, err := regexp.Compile("^([0-9a-f]{32})([+].*)?$")
282         if err != nil {
283                 log.Print("Don't like regexp", err)
284         }
285
286         sm := pathpattern.FindStringSubmatch(path)
287         if sm == nil {
288                 log.Print("Failed match ", path)
289                 return Locator{"", 0, "", ""}
290         }
291
292         return MakeLocator2(sm[1], sm[2])
293 }