X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a04ea95e79c60ed2a54eaec5b5c2e235fe39ef9a..6e76e3322d66f609dabcd34c98cba34bd739e089:/sdk/go/keepclient/keepclient.go diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index b81e070b5f..0e6fadcc35 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -2,7 +2,9 @@ package keepclient import ( + "bytes" "crypto/md5" + "crypto/tls" "errors" "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" @@ -12,63 +14,117 @@ import ( "log" "net/http" "regexp" + "strconv" "strings" "sync" - "sync/atomic" - "time" - "unsafe" ) // A Keep "block" is 64MB. const BLOCKSIZE = 64 * 1024 * 1024 -var BlockNotFound = errors.New("Block not found") +// Error interface with an error and boolean indicating whether the error is temporary +type Error interface { + error + Temporary() bool +} + +// multipleResponseError is of type Error +type multipleResponseError struct { + error + isTemp bool +} + +func (e *multipleResponseError) Temporary() bool { + return e.isTemp +} + +// BlockNotFound is a multipleResponseError where isTemp is false +var BlockNotFound = &ErrNotFound{multipleResponseError{ + error: errors.New("Block not found"), + isTemp: false, +}} + +// ErrNotFound is a multipleResponseError where isTemp can be true or false +type ErrNotFound struct { + multipleResponseError +} + var InsufficientReplicasError = errors.New("Could not write sufficient replicas") -var OversizeBlockError = errors.New("Block too big") +var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")") var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST") var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN") +var InvalidLocatorError = errors.New("Invalid locator") + +// ErrNoSuchKeepServer is returned when GetIndex is invoked with a UUID with no matching keep server +var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is found") + +// ErrIncompleteIndex is returned when the Index response does not end with a new empty line +var ErrIncompleteIndex = errors.New("Got incomplete index") const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas" const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored" // Information about Arvados and Keep servers. type KeepClient struct { - Arvados *arvadosclient.ArvadosClient - Want_replicas int - Using_proxy bool - service_roots *map[string]string - lock sync.Mutex - Client *http.Client + Arvados *arvadosclient.ArvadosClient + Want_replicas int + Using_proxy bool + localRoots *map[string]string + writableLocalRoots *map[string]string + gatewayRoots *map[string]string + lock sync.RWMutex + Client *http.Client + Retries int + + // set to 1 if all writable services are of disk type, otherwise 0 + replicasPerService int +} + +// MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers. +func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) { + kc := New(arv) + return kc, kc.DiscoverKeepServers() } -// Create a new KeepClient. This will contact the API server to discover Keep -// servers. -func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error) { - kc = KeepClient{ +// New func creates a new KeepClient struct. +// This func does not discover keep servers. It is the caller's responsibility. +func New(arv *arvadosclient.ArvadosClient) *KeepClient { + defaultReplicationLevel := 2 + value, err := arv.Discovery("defaultCollectionReplication") + if err == nil { + v, ok := value.(float64) + if ok && v > 0 { + defaultReplicationLevel = int(v) + } + } + + kc := &KeepClient{ Arvados: arv, - Want_replicas: 2, + Want_replicas: defaultReplicationLevel, Using_proxy: false, - Client: &http.Client{}, + Client: &http.Client{Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: arv.ApiInsecure}}}, + Retries: 2, } - err = (&kc).DiscoverKeepServers() - - return kc, err + return kc } -// Put a block given the block hash, a reader with the block data, and the -// expected length of that data. The desired number of replicas is given in -// KeepClient.Want_replicas. Returns the number of replicas that were written -// and if there was an error. Note this will return InsufficientReplias -// whenever 0 <= replicas < this.Wants_replicas. -func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) { - +// Put a block given the block hash, a reader, and the number of bytes +// to read from the reader (which must be between 0 and BLOCKSIZE). +// +// Returns the locator for the written block, the number of replicas +// written, and an error. +// +// Returns an InsufficientReplicas error if 0 <= replicas < +// kc.Wants_replicas. +func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) { // Buffer for reads from 'r' var bufsize int - if expectedLength > 0 { - if expectedLength > BLOCKSIZE { + if dataBytes > 0 { + if dataBytes > BLOCKSIZE { return "", 0, OversizeBlockError } - bufsize = int(expectedLength) + bufsize = int(dataBytes) } else { bufsize = BLOCKSIZE } @@ -76,215 +132,308 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (lo t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash}) defer t.Close() - return this.putReplicas(hash, t, expectedLength) + return kc.putReplicas(hash, t, dataBytes) } -// Put a block given the block hash and a byte buffer. The desired number of -// replicas is given in KeepClient.Want_replicas. Returns the number of -// replicas that were written and if there was an error. Note this will return -// InsufficientReplias whenever 0 <= replicas < this.Wants_replicas. -func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) { +// PutHB writes a block to Keep. The hash of the bytes is given in +// hash, and the data is given in buf. +// +// Return values are the same as for PutHR. +func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) { t := streamer.AsyncStreamFromSlice(buf) defer t.Close() - - return this.putReplicas(hash, t, int64(len(buf))) + return kc.putReplicas(hash, t, int64(len(buf))) } -// Put a block given a buffer. The hash will be computed. The desired number -// of replicas is given in KeepClient.Want_replicas. Returns the number of -// replicas that were written and if there was an error. Note this will return -// InsufficientReplias whenever 0 <= replicas < this.Wants_replicas. -func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) { +// PutB writes a block to Keep. It computes the hash itself. +// +// Return values are the same as for PutHR. +func (kc *KeepClient) PutB(buffer []byte) (string, int, error) { hash := fmt.Sprintf("%x", md5.Sum(buffer)) - return this.PutHB(hash, buffer) + return kc.PutHB(hash, buffer) } -// Put a block, given a Reader. This will read the entire reader into a buffer -// to compute the hash. The desired number of replicas is given in -// KeepClient.Want_replicas. Returns the number of replicas that were written -// and if there was an error. Note this will return InsufficientReplias -// whenever 0 <= replicas < this.Wants_replicas. Also nhote that if the block -// hash and data size are available, PutHR() is more efficient. -func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) { +// PutR writes a block to Keep. It first reads all data from r into a buffer +// in order to compute the hash. +// +// Return values are the same as for PutHR. +// +// If the block hash and data size are known, PutHR is more efficient. +func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) { if buffer, err := ioutil.ReadAll(r); err != nil { return "", 0, err } else { - return this.PutB(buffer) + return kc.PutB(buffer) } } -// Get a block given a hash. Return a reader, the expected data length, the -// URL the block was fetched from, and if there was an error. If the block -// checksum does not match, the final Read() on the reader returned by this -// method will return a BadChecksum error instead of EOF. -func (this KeepClient) Get(hash string) (reader io.ReadCloser, - contentLength int64, url string, err error) { - return this.AuthorizedGet(hash, "", "") -} +func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) { + var errs []string -// Get a block given a hash, with additional authorization provided by -// signature and timestamp. Return a reader, the expected data length, the URL -// the block was fetched from, and if there was an error. If the block -// checksum does not match, the final Read() on the reader returned by this -// method will return a BadChecksum error instead of EOF. -func (this KeepClient) AuthorizedGet(hash string, - signature string, - timestamp string) (reader io.ReadCloser, - contentLength int64, url string, err error) { - - // Take the hash of locator and timestamp in order to identify this - // specific transaction in log statements. - requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8] - - // Calculate the ordering for asking servers - sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots() - - for _, host := range sv { - var req *http.Request - var err error - var url string - if signature != "" { - url = fmt.Sprintf("%s/%s+A%s@%s", host, hash, - signature, timestamp) - } else { - url = fmt.Sprintf("%s/%s", host, hash) - } - if req, err = http.NewRequest("GET", url, nil); err != nil { - continue - } + tries_remaining := 1 + kc.Retries + + serversToTry := kc.getSortedRoots(locator) + + numServers := len(serversToTry) + count404 := 0 - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken)) + var retryList []string - log.Printf("[%v] Begin download %s", requestId, url) + for tries_remaining > 0 { + tries_remaining -= 1 + retryList = nil - var resp *http.Response - if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK { - statusCode := -1 - var respbody []byte - if resp != nil { - statusCode = resp.StatusCode - if resp.Body != nil { - respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096}) + for _, host := range serversToTry { + url := host + "/" + locator + + req, err := http.NewRequest(method, url, nil) + if err != nil { + errs = append(errs, fmt.Sprintf("%s: %v", url, err)) + continue + } + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) + resp, err := kc.Client.Do(req) + if err != nil { + // Probably a network error, may be transient, + // can try again. + errs = append(errs, fmt.Sprintf("%s: %v", url, err)) + retryList = append(retryList, host) + } else if resp.StatusCode != http.StatusOK { + var respbody []byte + respbody, _ = ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096}) + resp.Body.Close() + errs = append(errs, fmt.Sprintf("%s: HTTP %d %q", + url, resp.StatusCode, bytes.TrimSpace(respbody))) + + if resp.StatusCode == 408 || + resp.StatusCode == 429 || + resp.StatusCode >= 500 { + // Timeout, too many requests, or other + // server side failure, transient + // error, can try again. + retryList = append(retryList, host) + } else if resp.StatusCode == 404 { + count404++ + } + } else { + // Success. + if method == "GET" { + return HashCheckingReader{ + Reader: resp.Body, + Hash: md5.New(), + Check: locator[0:32], + }, resp.ContentLength, url, nil + } else { + resp.Body.Close() + return nil, resp.ContentLength, url, nil } } - response := strings.TrimSpace(string(respbody)) - log.Printf("[%v] Download %v status code: %v error: \"%v\" response: \"%v\"", - requestId, url, statusCode, err, response) - continue - } - if resp.StatusCode == http.StatusOK { - log.Printf("[%v] Download %v status code: %v", requestId, url, resp.StatusCode) - return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil } + serversToTry = retryList } + log.Printf("DEBUG: %s %s failed: %v", method, locator, errs) - return nil, 0, "", BlockNotFound + var err error + if count404 == numServers { + err = BlockNotFound + } else { + err = &ErrNotFound{multipleResponseError{ + error: fmt.Errorf("%s %s failed: %v", method, locator, errs), + isTemp: len(serversToTry) > 0, + }} + } + return nil, 0, "", err } -// Determine if a block with the given hash is available and readable, but does -// not return the block contents. -func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) { - return this.AuthorizedAsk(hash, "", "") +// Get() retrieves a block, given a locator. Returns a reader, the +// expected data length, the URL the block is being fetched from, and +// an error. +// +// If the block checksum does not match, the final Read() on the +// reader returned by this method will return a BadChecksum error +// instead of EOF. +func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) { + return kc.getOrHead("GET", locator) } -// Determine if a block with the given hash is available and readable with the -// given signature and timestamp, but does not return the block contents. -func (this KeepClient) AuthorizedAsk(hash string, signature string, - timestamp string) (contentLength int64, url string, err error) { - // Calculate the ordering for asking servers - sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots() - - for _, host := range sv { - var req *http.Request - var err error - if signature != "" { - url = fmt.Sprintf("%s/%s+A%s@%s", host, hash, - signature, timestamp) - } else { - url = fmt.Sprintf("%s/%s", host, hash) - } +// Ask() verifies that a block with the given hash is available and +// readable, according to at least one Keep service. Unlike Get, it +// does not retrieve the data or verify that the data content matches +// the hash specified by the locator. +// +// Returns the data size (content length) reported by the Keep service +// and the URI reporting the data size. +func (kc *KeepClient) Ask(locator string) (int64, string, error) { + _, size, url, err := kc.getOrHead("HEAD", locator) + return size, url, err +} - if req, err = http.NewRequest("HEAD", url, nil); err != nil { - continue - } +// GetIndex retrieves a list of blocks stored on the given server whose hashes +// begin with the given prefix. The returned reader will return an error (other +// than EOF) if the complete index cannot be retrieved. +// +// This is meant to be used only by system components and admin tools. +// It will return an error unless the client is using a "data manager token" +// recognized by the Keep services. +func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error) { + url := kc.LocalRoots()[keepServiceUUID] + if url == "" { + return nil, ErrNoSuchKeepServer + } - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken)) + url += "/index" + if prefix != "" { + url += "/" + prefix + } - var resp *http.Response - if resp, err = this.Client.Do(req); err != nil { - continue - } + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } - if resp.StatusCode == http.StatusOK { - return resp.ContentLength, url, nil - } + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) + resp, err := kc.Client.Do(req) + if err != nil { + return nil, err } - return 0, "", BlockNotFound + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("Got http status code: %d", resp.StatusCode) + } + var respBody []byte + respBody, err = ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + // Got index; verify that it is complete + // The response should be "\n" if no locators matched the prefix + // Else, it should be a list of locators followed by a blank line + if !bytes.Equal(respBody, []byte("\n")) && !bytes.HasSuffix(respBody, []byte("\n\n")) { + return nil, ErrIncompleteIndex + } + + // Got complete index; strip the trailing newline and send + return bytes.NewReader(respBody[0 : len(respBody)-1]), nil } -// Atomically read the service_roots field. -func (this *KeepClient) ServiceRoots() map[string]string { - r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)))) - return *r +// LocalRoots() returns the map of local (i.e., disk and proxy) Keep +// services: uuid -> baseURI. +func (kc *KeepClient) LocalRoots() map[string]string { + kc.lock.RLock() + defer kc.lock.RUnlock() + return *kc.localRoots } -// Atomically update the service_roots field. Enables you to update -// service_roots without disrupting any GET or PUT operations that might -// already be in progress. -func (this *KeepClient) SetServiceRoots(new_roots map[string]string) { - roots := make(map[string]string) - for uuid, root := range new_roots { - roots[uuid] = root - } - atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)), - unsafe.Pointer(&roots)) +// GatewayRoots() returns the map of Keep remote gateway services: +// uuid -> baseURI. +func (kc *KeepClient) GatewayRoots() map[string]string { + kc.lock.RLock() + defer kc.lock.RUnlock() + return *kc.gatewayRoots } -type Locator struct { - Hash string - Size int - Signature string - Timestamp string +// WritableLocalRoots() returns the map of writable local Keep services: +// uuid -> baseURI. +func (kc *KeepClient) WritableLocalRoots() map[string]string { + kc.lock.RLock() + defer kc.lock.RUnlock() + return *kc.writableLocalRoots } -func MakeLocator2(hash string, hints string) (locator Locator) { - locator.Hash = hash - if hints != "" { - signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$") - for _, hint := range strings.Split(hints, "+") { - if hint != "" { - if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match { - fmt.Sscanf(hint, "%d", &locator.Size) - } else if m := signature_pat.FindStringSubmatch(hint); m != nil { - locator.Signature = m[1] - locator.Timestamp = m[2] - } else if match, _ := regexp.MatchString("^[:upper:]", hint); match { - // Any unknown hint that starts with an uppercase letter is - // presumed to be valid and ignored, to permit forward compatibility. - } else { - // Unknown format; not a valid locator. - return Locator{"", 0, "", ""} - } +// SetServiceRoots updates the localRoots and gatewayRoots maps, +// without risk of disrupting operations that are already in progress. +// +// The KeepClient makes its own copy of the supplied maps, so the +// caller can reuse/modify them after SetServiceRoots returns, but +// they should not be modified by any other goroutine while +// SetServiceRoots is running. +func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals map[string]string, newGateways map[string]string) { + locals := make(map[string]string) + for uuid, root := range newLocals { + locals[uuid] = root + } + + writables := make(map[string]string) + for uuid, root := range newWritableLocals { + writables[uuid] = root + } + + gateways := make(map[string]string) + for uuid, root := range newGateways { + gateways[uuid] = root + } + + kc.lock.Lock() + defer kc.lock.Unlock() + kc.localRoots = &locals + kc.writableLocalRoots = &writables + kc.gatewayRoots = &gateways +} + +// getSortedRoots returns a list of base URIs of Keep services, in the +// order they should be attempted in order to retrieve content for the +// given locator. +func (kc *KeepClient) getSortedRoots(locator string) []string { + var found []string + for _, hint := range strings.Split(locator, "+") { + if len(hint) < 7 || hint[0:2] != "K@" { + // Not a service hint. + continue + } + if len(hint) == 7 { + // +K@abcde means fetch from proxy at + // keep.abcde.arvadosapi.com + found = append(found, "https://keep."+hint[2:]+".arvadosapi.com") + } else if len(hint) == 29 { + // +K@abcde-abcde-abcdeabcdeabcde means fetch + // from gateway with given uuid + if gwURI, ok := kc.GatewayRoots()[hint[2:]]; ok { + found = append(found, gwURI) } + // else this hint is no use to us; carry on. } } - return locator + // After trying all usable service hints, fall back to local roots. + found = append(found, NewRootSorter(kc.LocalRoots(), locator[0:32]).GetSortedRoots()...) + return found } -func MakeLocator(path string) Locator { - pathpattern, err := regexp.Compile("^([0-9a-f]{32})([+].*)?$") - if err != nil { - log.Print("Don't like regexp", err) +type Locator struct { + Hash string + Size int // -1 if data size is not known + Hints []string // Including the size hint, if any +} + +func (loc *Locator) String() string { + s := loc.Hash + if len(loc.Hints) > 0 { + s = s + "+" + strings.Join(loc.Hints, "+") } + return s +} - sm := pathpattern.FindStringSubmatch(path) +var locatorMatcher = regexp.MustCompile("^([0-9a-f]{32})([+](.*))?$") + +func MakeLocator(path string) (*Locator, error) { + sm := locatorMatcher.FindStringSubmatch(path) if sm == nil { - log.Print("Failed match ", path) - return Locator{"", 0, "", ""} + return nil, InvalidLocatorError } - - return MakeLocator2(sm[1], sm[2]) + loc := Locator{Hash: sm[1], Size: -1} + if sm[2] != "" { + loc.Hints = strings.Split(sm[3], "+") + } else { + loc.Hints = []string{} + } + if len(loc.Hints) > 0 { + if size, err := strconv.Atoi(loc.Hints[0]); err == nil { + loc.Size = size + } + } + return &loc, nil }