Merge branch '8345-revert-llfuse-to-0.41.1'
[arvados.git] / sdk / go / keepclient / keepclient.go
1 /* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
2 package keepclient
3
4 import (
5         "bytes"
6         "crypto/md5"
7         "errors"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/streamer"
11         "io"
12         "io/ioutil"
13         "net/http"
14         "regexp"
15         "strconv"
16         "strings"
17         "sync"
18 )
19
20 // A Keep "block" is 64MB.
21 const BLOCKSIZE = 64 * 1024 * 1024
22
23 // Error interface with an error and boolean indicating whether the error is temporary
24 type Error interface {
25         error
26         Temporary() bool
27 }
28
29 // multipleResponseError is of type Error
30 type multipleResponseError struct {
31         error
32         isTemp bool
33 }
34
35 func (e *multipleResponseError) Temporary() bool {
36         return e.isTemp
37 }
38
39 // BlockNotFound is a multipleResponseError where isTemp is false
40 var BlockNotFound = &ErrNotFound{multipleResponseError{
41         error:  errors.New("Block not found"),
42         isTemp: false,
43 }}
44
45 // ErrNotFound is a multipleResponseError where isTemp can be true or false
46 type ErrNotFound struct {
47         multipleResponseError
48 }
49
50 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
51 var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")
52 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
53 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
54 var InvalidLocatorError = errors.New("Invalid locator")
55
56 // ErrNoSuchKeepServer is returned when GetIndex is invoked with a UUID with no matching keep server
57 var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is found")
58
59 // ErrIncompleteIndex is returned when the Index response does not end with a new empty line
60 var ErrIncompleteIndex = errors.New("Got incomplete index")
61
62 const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
63 const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
64
65 // Information about Arvados and Keep servers.
66 type KeepClient struct {
67         Arvados            *arvadosclient.ArvadosClient
68         Want_replicas      int
69         localRoots         *map[string]string
70         writableLocalRoots *map[string]string
71         gatewayRoots       *map[string]string
72         lock               sync.RWMutex
73         Client             *http.Client
74         Retries            int
75
76         // set to 1 if all writable services are of disk type, otherwise 0
77         replicasPerService int
78
79         // Any non-disk typed services found in the list of keepservers?
80         foundNonDiskSvc bool
81 }
82
83 // MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers.
84 func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
85         kc := New(arv)
86         return kc, kc.DiscoverKeepServers()
87 }
88
89 // New func creates a new KeepClient struct.
90 // This func does not discover keep servers. It is the caller's responsibility.
91 func New(arv *arvadosclient.ArvadosClient) *KeepClient {
92         defaultReplicationLevel := 2
93         value, err := arv.Discovery("defaultCollectionReplication")
94         if err == nil {
95                 v, ok := value.(float64)
96                 if ok && v > 0 {
97                         defaultReplicationLevel = int(v)
98                 }
99         }
100
101         kc := &KeepClient{
102                 Arvados:       arv,
103                 Want_replicas: defaultReplicationLevel,
104                 Client: &http.Client{Transport: &http.Transport{
105                         TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure)}},
106                 Retries: 2,
107         }
108         return kc
109 }
110
111 // Put a block given the block hash, a reader, and the number of bytes
112 // to read from the reader (which must be between 0 and BLOCKSIZE).
113 //
114 // Returns the locator for the written block, the number of replicas
115 // written, and an error.
116 //
117 // Returns an InsufficientReplicas error if 0 <= replicas <
118 // kc.Wants_replicas.
119 func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) {
120         // Buffer for reads from 'r'
121         var bufsize int
122         if dataBytes > 0 {
123                 if dataBytes > BLOCKSIZE {
124                         return "", 0, OversizeBlockError
125                 }
126                 bufsize = int(dataBytes)
127         } else {
128                 bufsize = BLOCKSIZE
129         }
130
131         t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
132         defer t.Close()
133
134         return kc.putReplicas(hash, t, dataBytes)
135 }
136
137 // PutHB writes a block to Keep. The hash of the bytes is given in
138 // hash, and the data is given in buf.
139 //
140 // Return values are the same as for PutHR.
141 func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
142         t := streamer.AsyncStreamFromSlice(buf)
143         defer t.Close()
144         return kc.putReplicas(hash, t, int64(len(buf)))
145 }
146
147 // PutB writes a block to Keep. It computes the hash itself.
148 //
149 // Return values are the same as for PutHR.
150 func (kc *KeepClient) PutB(buffer []byte) (string, int, error) {
151         hash := fmt.Sprintf("%x", md5.Sum(buffer))
152         return kc.PutHB(hash, buffer)
153 }
154
155 // PutR writes a block to Keep. It first reads all data from r into a buffer
156 // in order to compute the hash.
157 //
158 // Return values are the same as for PutHR.
159 //
160 // If the block hash and data size are known, PutHR is more efficient.
161 func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
162         if buffer, err := ioutil.ReadAll(r); err != nil {
163                 return "", 0, err
164         } else {
165                 return kc.PutB(buffer)
166         }
167 }
168
169 func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) {
170         if strings.HasPrefix(locator, "d41d8cd98f00b204e9800998ecf8427e+0") {
171                 return ioutil.NopCloser(bytes.NewReader(nil)), 0, "", nil
172         }
173
174         var errs []string
175
176         tries_remaining := 1 + kc.Retries
177
178         serversToTry := kc.getSortedRoots(locator)
179
180         numServers := len(serversToTry)
181         count404 := 0
182
183         var retryList []string
184
185         for tries_remaining > 0 {
186                 tries_remaining -= 1
187                 retryList = nil
188
189                 for _, host := range serversToTry {
190                         url := host + "/" + locator
191
192                         req, err := http.NewRequest(method, url, nil)
193                         if err != nil {
194                                 errs = append(errs, fmt.Sprintf("%s: %v", url, err))
195                                 continue
196                         }
197                         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
198                         resp, err := kc.Client.Do(req)
199                         if err != nil {
200                                 // Probably a network error, may be transient,
201                                 // can try again.
202                                 errs = append(errs, fmt.Sprintf("%s: %v", url, err))
203                                 retryList = append(retryList, host)
204                         } else if resp.StatusCode != http.StatusOK {
205                                 var respbody []byte
206                                 respbody, _ = ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
207                                 resp.Body.Close()
208                                 errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
209                                         url, resp.StatusCode, bytes.TrimSpace(respbody)))
210
211                                 if resp.StatusCode == 408 ||
212                                         resp.StatusCode == 429 ||
213                                         resp.StatusCode >= 500 {
214                                         // Timeout, too many requests, or other
215                                         // server side failure, transient
216                                         // error, can try again.
217                                         retryList = append(retryList, host)
218                                 } else if resp.StatusCode == 404 {
219                                         count404++
220                                 }
221                         } else {
222                                 // Success.
223                                 if method == "GET" {
224                                         return HashCheckingReader{
225                                                 Reader: resp.Body,
226                                                 Hash:   md5.New(),
227                                                 Check:  locator[0:32],
228                                         }, resp.ContentLength, url, nil
229                                 } else {
230                                         resp.Body.Close()
231                                         return nil, resp.ContentLength, url, nil
232                                 }
233                         }
234
235                 }
236                 serversToTry = retryList
237         }
238         DebugPrintf("DEBUG: %s %s failed: %v", method, locator, errs)
239
240         var err error
241         if count404 == numServers {
242                 err = BlockNotFound
243         } else {
244                 err = &ErrNotFound{multipleResponseError{
245                         error:  fmt.Errorf("%s %s failed: %v", method, locator, errs),
246                         isTemp: len(serversToTry) > 0,
247                 }}
248         }
249         return nil, 0, "", err
250 }
251
252 // Get() retrieves a block, given a locator. Returns a reader, the
253 // expected data length, the URL the block is being fetched from, and
254 // an error.
255 //
256 // If the block checksum does not match, the final Read() on the
257 // reader returned by this method will return a BadChecksum error
258 // instead of EOF.
259 func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
260         return kc.getOrHead("GET", locator)
261 }
262
263 // Ask() verifies that a block with the given hash is available and
264 // readable, according to at least one Keep service. Unlike Get, it
265 // does not retrieve the data or verify that the data content matches
266 // the hash specified by the locator.
267 //
268 // Returns the data size (content length) reported by the Keep service
269 // and the URI reporting the data size.
270 func (kc *KeepClient) Ask(locator string) (int64, string, error) {
271         _, size, url, err := kc.getOrHead("HEAD", locator)
272         return size, url, err
273 }
274
275 // GetIndex retrieves a list of blocks stored on the given server whose hashes
276 // begin with the given prefix. The returned reader will return an error (other
277 // than EOF) if the complete index cannot be retrieved.
278 //
279 // This is meant to be used only by system components and admin tools.
280 // It will return an error unless the client is using a "data manager token"
281 // recognized by the Keep services.
282 func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error) {
283         url := kc.LocalRoots()[keepServiceUUID]
284         if url == "" {
285                 return nil, ErrNoSuchKeepServer
286         }
287
288         url += "/index"
289         if prefix != "" {
290                 url += "/" + prefix
291         }
292
293         req, err := http.NewRequest("GET", url, nil)
294         if err != nil {
295                 return nil, err
296         }
297
298         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
299         resp, err := kc.Client.Do(req)
300         if err != nil {
301                 return nil, err
302         }
303
304         defer resp.Body.Close()
305
306         if resp.StatusCode != http.StatusOK {
307                 return nil, fmt.Errorf("Got http status code: %d", resp.StatusCode)
308         }
309
310         var respBody []byte
311         respBody, err = ioutil.ReadAll(resp.Body)
312         if err != nil {
313                 return nil, err
314         }
315
316         // Got index; verify that it is complete
317         // The response should be "\n" if no locators matched the prefix
318         // Else, it should be a list of locators followed by a blank line
319         if !bytes.Equal(respBody, []byte("\n")) && !bytes.HasSuffix(respBody, []byte("\n\n")) {
320                 return nil, ErrIncompleteIndex
321         }
322
323         // Got complete index; strip the trailing newline and send
324         return bytes.NewReader(respBody[0 : len(respBody)-1]), nil
325 }
326
327 // LocalRoots() returns the map of local (i.e., disk and proxy) Keep
328 // services: uuid -> baseURI.
329 func (kc *KeepClient) LocalRoots() map[string]string {
330         kc.lock.RLock()
331         defer kc.lock.RUnlock()
332         return *kc.localRoots
333 }
334
335 // GatewayRoots() returns the map of Keep remote gateway services:
336 // uuid -> baseURI.
337 func (kc *KeepClient) GatewayRoots() map[string]string {
338         kc.lock.RLock()
339         defer kc.lock.RUnlock()
340         return *kc.gatewayRoots
341 }
342
343 // WritableLocalRoots() returns the map of writable local Keep services:
344 // uuid -> baseURI.
345 func (kc *KeepClient) WritableLocalRoots() map[string]string {
346         kc.lock.RLock()
347         defer kc.lock.RUnlock()
348         return *kc.writableLocalRoots
349 }
350
351 // SetServiceRoots updates the localRoots and gatewayRoots maps,
352 // without risk of disrupting operations that are already in progress.
353 //
354 // The KeepClient makes its own copy of the supplied maps, so the
355 // caller can reuse/modify them after SetServiceRoots returns, but
356 // they should not be modified by any other goroutine while
357 // SetServiceRoots is running.
358 func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals, newGateways map[string]string) {
359         locals := make(map[string]string)
360         for uuid, root := range newLocals {
361                 locals[uuid] = root
362         }
363
364         writables := make(map[string]string)
365         for uuid, root := range newWritableLocals {
366                 writables[uuid] = root
367         }
368
369         gateways := make(map[string]string)
370         for uuid, root := range newGateways {
371                 gateways[uuid] = root
372         }
373
374         kc.lock.Lock()
375         defer kc.lock.Unlock()
376         kc.localRoots = &locals
377         kc.writableLocalRoots = &writables
378         kc.gatewayRoots = &gateways
379 }
380
381 // getSortedRoots returns a list of base URIs of Keep services, in the
382 // order they should be attempted in order to retrieve content for the
383 // given locator.
384 func (kc *KeepClient) getSortedRoots(locator string) []string {
385         var found []string
386         for _, hint := range strings.Split(locator, "+") {
387                 if len(hint) < 7 || hint[0:2] != "K@" {
388                         // Not a service hint.
389                         continue
390                 }
391                 if len(hint) == 7 {
392                         // +K@abcde means fetch from proxy at
393                         // keep.abcde.arvadosapi.com
394                         found = append(found, "https://keep."+hint[2:]+".arvadosapi.com")
395                 } else if len(hint) == 29 {
396                         // +K@abcde-abcde-abcdeabcdeabcde means fetch
397                         // from gateway with given uuid
398                         if gwURI, ok := kc.GatewayRoots()[hint[2:]]; ok {
399                                 found = append(found, gwURI)
400                         }
401                         // else this hint is no use to us; carry on.
402                 }
403         }
404         // After trying all usable service hints, fall back to local roots.
405         found = append(found, NewRootSorter(kc.LocalRoots(), locator[0:32]).GetSortedRoots()...)
406         return found
407 }
408
409 type Locator struct {
410         Hash  string
411         Size  int      // -1 if data size is not known
412         Hints []string // Including the size hint, if any
413 }
414
415 func (loc *Locator) String() string {
416         s := loc.Hash
417         if len(loc.Hints) > 0 {
418                 s = s + "+" + strings.Join(loc.Hints, "+")
419         }
420         return s
421 }
422
423 var locatorMatcher = regexp.MustCompile("^([0-9a-f]{32})([+](.*))?$")
424
425 func MakeLocator(path string) (*Locator, error) {
426         sm := locatorMatcher.FindStringSubmatch(path)
427         if sm == nil {
428                 return nil, InvalidLocatorError
429         }
430         loc := Locator{Hash: sm[1], Size: -1}
431         if sm[2] != "" {
432                 loc.Hints = strings.Split(sm[3], "+")
433         } else {
434                 loc.Hints = []string{}
435         }
436         if len(loc.Hints) > 0 {
437                 if size, err := strconv.Atoi(loc.Hints[0]); err == nil {
438                         loc.Size = size
439                 }
440         }
441         return &loc, nil
442 }