package keepclient
import (
+ "bytes"
"crypto/md5"
"crypto/tls"
"errors"
var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
var InvalidLocatorError = errors.New("Invalid locator")
+// ErrNoSuchKeepServer is returned when GetIndex is invoked with a UUID with no matching keep server
+var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is found")
+
+// ErrIncompleteIndex is returned when the Index response does not end with a new empty line
+var ErrIncompleteIndex = errors.New("Got incomplete index")
+
const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
// Information about Arvados and Keep servers.
type KeepClient struct {
- Arvados *arvadosclient.ArvadosClient
- Want_replicas int
- Using_proxy bool
- localRoots *map[string]string
- gatewayRoots *map[string]string
- lock sync.RWMutex
- Client *http.Client
+ Arvados *arvadosclient.ArvadosClient
+ Want_replicas int
+ Using_proxy bool
+ localRoots *map[string]string
+ writableLocalRoots *map[string]string
+ gatewayRoots *map[string]string
+ lock sync.RWMutex
+ Client *http.Client
+
+ // set to 1 if all writable services are of disk type, otherwise 0
+ replicasPerService int
}
// Create a new KeepClient. This will contact the API server to discover Keep
url := host + "/" + locator
req, err := http.NewRequest("GET", url, nil)
if err != nil {
+ errs = append(errs, fmt.Sprintf("%s: %v", url, err))
continue
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
resp, err := kc.Client.Do(req)
- if err != nil || resp.StatusCode != http.StatusOK {
- if resp != nil {
- var respbody []byte
- if resp.Body != nil {
- respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
- }
- errs = append(errs, fmt.Sprintf("%s: %d %s",
- url, resp.StatusCode, strings.TrimSpace(string(respbody))))
- } else {
- errs = append(errs, fmt.Sprintf("%s: %v", url, err))
- }
+ if err != nil {
+ errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+ continue
+ } else if resp.StatusCode != http.StatusOK {
+ respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ resp.Body.Close()
+ errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
+ url, resp.StatusCode, bytes.TrimSpace(respbody)))
continue
}
return HashCheckingReader{
return 0, "", BlockNotFound
}
+// GetIndex retrieves a list of blocks stored on the given server whose hashes
+// begin with the given prefix. The returned reader will return an error (other
+// than EOF) if the complete index cannot be retrieved.
+//
+// This is meant to be used only by system components and admin tools.
+// It will return an error unless the client is using a "data manager token"
+// recognized by the Keep services.
+func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error) {
+ url := kc.LocalRoots()[keepServiceUUID]
+ if url == "" {
+ return nil, ErrNoSuchKeepServer
+ }
+
+ url += "/index"
+ if prefix != "" {
+ url += "/" + prefix
+ }
+
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+ resp, err := kc.Client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("Got http status code: %d", resp.StatusCode)
+ }
+
+ var respBody []byte
+ respBody, err = ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+
+ // Got index; verify that it is complete
+ // The response should be "\n" if no locators matched the prefix
+ // Else, it should be a list of locators followed by a blank line
+ if !bytes.Equal(respBody, []byte("\n")) && !bytes.HasSuffix(respBody, []byte("\n\n")) {
+ return nil, ErrIncompleteIndex
+ }
+
+ // Got complete index; strip the trailing newline and send
+ return bytes.NewReader(respBody[0 : len(respBody)-1]), nil
+}
+
// LocalRoots() returns the map of local (i.e., disk and proxy) Keep
// services: uuid -> baseURI.
func (kc *KeepClient) LocalRoots() map[string]string {
return *kc.gatewayRoots
}
+// WritableLocalRoots() returns the map of writable local Keep services:
+// uuid -> baseURI.
+func (kc *KeepClient) WritableLocalRoots() map[string]string {
+ kc.lock.RLock()
+ defer kc.lock.RUnlock()
+ return *kc.writableLocalRoots
+}
+
// SetServiceRoots updates the localRoots and gatewayRoots maps,
// without risk of disrupting operations that are already in progress.
//
// caller can reuse/modify them after SetServiceRoots returns, but
// they should not be modified by any other goroutine while
// SetServiceRoots is running.
-func (kc *KeepClient) SetServiceRoots(newLocals, newGateways map[string]string) {
+func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals map[string]string, newGateways map[string]string) {
locals := make(map[string]string)
for uuid, root := range newLocals {
locals[uuid] = root
}
+
+ writables := make(map[string]string)
+ for uuid, root := range newWritableLocals {
+ writables[uuid] = root
+ }
+
gateways := make(map[string]string)
for uuid, root := range newGateways {
gateways[uuid] = root
}
+
kc.lock.Lock()
defer kc.lock.Unlock()
kc.localRoots = &locals
+ kc.writableLocalRoots = &writables
kc.gatewayRoots = &gateways
}