X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d30c3b218773355150aa1ee47fa64150dd1b4111..3280e2dc5fd16dca63c389b931658d4420faabaf:/sdk/go/keepclient/keepclient.go diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index e1c25c9e1d..31cfb572e4 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -2,21 +2,21 @@ package keepclient import ( - "git.curoverse.com/arvados.git/sdk/go/streamer" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "crypto/md5" + "crypto/tls" "errors" "fmt" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/streamer" "io" "io/ioutil" "log" "net/http" + "os" "regexp" - "sort" + "strconv" "strings" "sync" - "sync/atomic" - "unsafe" ) // A Keep "block" is 64MB. @@ -24,9 +24,10 @@ const BLOCKSIZE = 64 * 1024 * 1024 var BlockNotFound = errors.New("Block not found") var InsufficientReplicasError = errors.New("Could not write sufficient replicas") -var OversizeBlockError = errors.New("Block too big") +var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")") var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST") var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN") +var InvalidLocatorError = errors.New("Invalid locator") const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas" const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored" @@ -36,39 +37,43 @@ type KeepClient struct { Arvados *arvadosclient.ArvadosClient Want_replicas int Using_proxy bool - service_roots *[]string - lock sync.Mutex + localRoots *map[string]string + gatewayRoots *map[string]string + lock sync.RWMutex Client *http.Client } // Create a new KeepClient. This will contact the API server to discover Keep // servers. -func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error) { - kc = KeepClient{ +func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) { + var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$") + insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")) + kc := &KeepClient{ Arvados: arv, Want_replicas: 2, Using_proxy: false, - Client: &http.Client{Transport: &http.Transport{}}} - - err = (&kc).DiscoverKeepServers() - - return kc, err + Client: &http.Client{Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}}, + } + return kc, kc.DiscoverKeepServers() } -// Put a block given the block hash, a reader with the block data, and the -// expected length of that data. The desired number of replicas is given in -// KeepClient.Want_replicas. Returns the number of replicas that were written -// and if there was an error. Note this will return InsufficientReplias -// whenever 0 <= replicas < this.Wants_replicas. -func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) { - +// Put a block given the block hash, a reader, and the number of bytes +// to read from the reader (which must be between 0 and BLOCKSIZE). +// +// Returns the locator for the written block, the number of replicas +// written, and an error. +// +// Returns an InsufficientReplicas error if 0 <= replicas < +// kc.Wants_replicas. +func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) { // Buffer for reads from 'r' var bufsize int - if expectedLength > 0 { - if expectedLength > BLOCKSIZE { + if dataBytes > 0 { + if dataBytes > BLOCKSIZE { return "", 0, OversizeBlockError } - bufsize = int(expectedLength) + bufsize = int(dataBytes) } else { bufsize = BLOCKSIZE } @@ -76,198 +81,200 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (lo t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash}) defer t.Close() - return this.putReplicas(hash, t, expectedLength) + return kc.putReplicas(hash, t, dataBytes) } -// Put a block given the block hash and a byte buffer. The desired number of -// replicas is given in KeepClient.Want_replicas. Returns the number of -// replicas that were written and if there was an error. Note this will return -// InsufficientReplias whenever 0 <= replicas < this.Wants_replicas. -func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) { +// PutHB writes a block to Keep. The hash of the bytes is given in +// hash, and the data is given in buf. +// +// Return values are the same as for PutHR. +func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) { t := streamer.AsyncStreamFromSlice(buf) defer t.Close() - - return this.putReplicas(hash, t, int64(len(buf))) + return kc.putReplicas(hash, t, int64(len(buf))) } -// Put a block given a buffer. The hash will be computed. The desired number -// of replicas is given in KeepClient.Want_replicas. Returns the number of -// replicas that were written and if there was an error. Note this will return -// InsufficientReplias whenever 0 <= replicas < this.Wants_replicas. -func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) { +// PutB writes a block to Keep. It computes the hash itself. +// +// Return values are the same as for PutHR. +func (kc *KeepClient) PutB(buffer []byte) (string, int, error) { hash := fmt.Sprintf("%x", md5.Sum(buffer)) - return this.PutHB(hash, buffer) + return kc.PutHB(hash, buffer) } -// Put a block, given a Reader. This will read the entire reader into a buffer -// to compute the hash. The desired number of replicas is given in -// KeepClient.Want_replicas. Returns the number of replicas that were written -// and if there was an error. Note this will return InsufficientReplias -// whenever 0 <= replicas < this.Wants_replicas. Also nhote that if the block -// hash and data size are available, PutHR() is more efficient. -func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) { +// PutR writes a block to Keep. It first reads all data from r into a buffer +// in order to compute the hash. +// +// Return values are the same as for PutHR. +// +// If the block hash and data size are known, PutHR is more efficient. +func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) { if buffer, err := ioutil.ReadAll(r); err != nil { return "", 0, err } else { - return this.PutB(buffer) + return kc.PutB(buffer) } } -// Get a block given a hash. Return a reader, the expected data length, the -// URL the block was fetched from, and if there was an error. If the block -// checksum does not match, the final Read() on the reader returned by this -// method will return a BadChecksum error instead of EOF. -func (this KeepClient) Get(hash string) (reader io.ReadCloser, - contentLength int64, url string, err error) { - return this.AuthorizedGet(hash, "", "") -} - -// Get a block given a hash, with additional authorization provided by -// signature and timestamp. Return a reader, the expected data length, the URL -// the block was fetched from, and if there was an error. If the block -// checksum does not match, the final Read() on the reader returned by this -// method will return a BadChecksum error instead of EOF. -func (this KeepClient) AuthorizedGet(hash string, - signature string, - timestamp string) (reader io.ReadCloser, - contentLength int64, url string, err error) { - - // Calculate the ordering for asking servers - sv := this.shuffledServiceRoots(hash) - - for _, host := range sv { - var req *http.Request - var err error - var url string - if signature != "" { - url = fmt.Sprintf("%s/%s+A%s@%s", host, hash, - signature, timestamp) - } else { - url = fmt.Sprintf("%s/%s", host, hash) - } - if req, err = http.NewRequest("GET", url, nil); err != nil { +// Get() retrieves a block, given a locator. Returns a reader, the +// expected data length, the URL the block is being fetched from, and +// an error. +// +// If the block checksum does not match, the final Read() on the +// reader returned by this method will return a BadChecksum error +// instead of EOF. +func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) { + var errs []string + for _, host := range kc.getSortedRoots(locator) { + url := host + "/" + locator + req, err := http.NewRequest("GET", url, nil) + if err != nil { continue } - - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken)) - - var resp *http.Response - if resp, err = this.Client.Do(req); err != nil { + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) + resp, err := kc.Client.Do(req) + if err != nil || resp.StatusCode != http.StatusOK { + if resp != nil { + var respbody []byte + if resp.Body != nil { + respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096}) + } + errs = append(errs, fmt.Sprintf("%s: %d %s", + url, resp.StatusCode, strings.TrimSpace(string(respbody)))) + } else { + errs = append(errs, fmt.Sprintf("%s: %v", url, err)) + } continue } - - if resp.StatusCode == http.StatusOK { - return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil - } + return HashCheckingReader{ + Reader: resp.Body, + Hash: md5.New(), + Check: locator[0:32], + }, resp.ContentLength, url, nil } - + log.Printf("DEBUG: GET %s failed: %v", locator, errs) return nil, 0, "", BlockNotFound } -// Determine if a block with the given hash is available and readable, but does -// not return the block contents. -func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) { - return this.AuthorizedAsk(hash, "", "") -} - -// Determine if a block with the given hash is available and readable with the -// given signature and timestamp, but does not return the block contents. -func (this KeepClient) AuthorizedAsk(hash string, signature string, - timestamp string) (contentLength int64, url string, err error) { - // Calculate the ordering for asking servers - sv := this.shuffledServiceRoots(hash) - - for _, host := range sv { - var req *http.Request - var err error - if signature != "" { - url = fmt.Sprintf("%s/%s+A%s@%s", host, hash, - signature, timestamp) - } else { - url = fmt.Sprintf("%s/%s", host, hash) - } - - if req, err = http.NewRequest("HEAD", url, nil); err != nil { - continue - } - - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken)) - - var resp *http.Response - if resp, err = this.Client.Do(req); err != nil { +// Ask() verifies that a block with the given hash is available and +// readable, according to at least one Keep service. Unlike Get, it +// does not retrieve the data or verify that the data content matches +// the hash specified by the locator. +// +// Returns the data size (content length) reported by the Keep service +// and the URI reporting the data size. +func (kc *KeepClient) Ask(locator string) (int64, string, error) { + for _, host := range kc.getSortedRoots(locator) { + url := host + "/" + locator + req, err := http.NewRequest("HEAD", url, nil) + if err != nil { continue } - - if resp.StatusCode == http.StatusOK { + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) + if resp, err := kc.Client.Do(req); err == nil && resp.StatusCode == http.StatusOK { return resp.ContentLength, url, nil } } - return 0, "", BlockNotFound - } -// Atomically read the service_roots field. -func (this *KeepClient) ServiceRoots() []string { - r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)))) - return *r +// LocalRoots() returns the map of local (i.e., disk and proxy) Keep +// services: uuid -> baseURI. +func (kc *KeepClient) LocalRoots() map[string]string { + kc.lock.RLock() + defer kc.lock.RUnlock() + return *kc.localRoots } -// Atomically update the service_roots field. Enables you to update -// service_roots without disrupting any GET or PUT operations that might -// already be in progress. -func (this *KeepClient) SetServiceRoots(svc []string) { - // Must be sorted for ShuffledServiceRoots() to produce consistent - // results. - roots := make([]string, len(svc)) - copy(roots, svc) - sort.Strings(roots) - atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)), - unsafe.Pointer(&roots)) +// GatewayRoots() returns the map of Keep remote gateway services: +// uuid -> baseURI. +func (kc *KeepClient) GatewayRoots() map[string]string { + kc.lock.RLock() + defer kc.lock.RUnlock() + return *kc.gatewayRoots } -type Locator struct { - Hash string - Size int - Signature string - Timestamp string +// SetServiceRoots updates the localRoots and gatewayRoots maps, +// without risk of disrupting operations that are already in progress. +// +// The KeepClient makes its own copy of the supplied maps, so the +// caller can reuse/modify them after SetServiceRoots returns, but +// they should not be modified by any other goroutine while +// SetServiceRoots is running. +func (kc *KeepClient) SetServiceRoots(newLocals, newGateways map[string]string) { + locals := make(map[string]string) + for uuid, root := range newLocals { + locals[uuid] = root + } + gateways := make(map[string]string) + for uuid, root := range newGateways { + gateways[uuid] = root + } + kc.lock.Lock() + defer kc.lock.Unlock() + kc.localRoots = &locals + kc.gatewayRoots = &gateways } -func MakeLocator2(hash string, hints string) (locator Locator) { - locator.Hash = hash - if hints != "" { - signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$") - for _, hint := range strings.Split(hints, "+") { - if hint != "" { - if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match { - fmt.Sscanf(hint, "%d", &locator.Size) - } else if m := signature_pat.FindStringSubmatch(hint); m != nil { - locator.Signature = m[1] - locator.Timestamp = m[2] - } else if match, _ := regexp.MatchString("^[:upper:]", hint); match { - // Any unknown hint that starts with an uppercase letter is - // presumed to be valid and ignored, to permit forward compatibility. - } else { - // Unknown format; not a valid locator. - return Locator{"", 0, "", ""} - } +// getSortedRoots returns a list of base URIs of Keep services, in the +// order they should be attempted in order to retrieve content for the +// given locator. +func (kc *KeepClient) getSortedRoots(locator string) []string { + var found []string + for _, hint := range strings.Split(locator, "+") { + if len(hint) < 7 || hint[0:2] != "K@" { + // Not a service hint. + continue + } + if len(hint) == 7 { + // +K@abcde means fetch from proxy at + // keep.abcde.arvadosapi.com + found = append(found, "https://keep."+hint[2:]+".arvadosapi.com") + } else if len(hint) == 29 { + // +K@abcde-abcde-abcdeabcdeabcde means fetch + // from gateway with given uuid + if gwURI, ok := kc.GatewayRoots()[hint[2:]]; ok { + found = append(found, gwURI) } + // else this hint is no use to us; carry on. } } - return locator + // After trying all usable service hints, fall back to local roots. + found = append(found, NewRootSorter(kc.LocalRoots(), locator[0:32]).GetSortedRoots()...) + return found +} + +type Locator struct { + Hash string + Size int // -1 if data size is not known + Hints []string // Including the size hint, if any } -func MakeLocator(path string) Locator { - pathpattern, err := regexp.Compile("^([0-9a-f]{32})([+].*)?$") - if err != nil { - log.Print("Don't like regexp", err) +func (loc *Locator) String() string { + s := loc.Hash + if len(loc.Hints) > 0 { + s = s + "+" + strings.Join(loc.Hints, "+") } + return s +} - sm := pathpattern.FindStringSubmatch(path) +var locatorMatcher = regexp.MustCompile("^([0-9a-f]{32})([+](.*))?$") + +func MakeLocator(path string) (*Locator, error) { + sm := locatorMatcher.FindStringSubmatch(path) if sm == nil { - log.Print("Failed match ", path) - return Locator{"", 0, "", ""} + return nil, InvalidLocatorError } - - return MakeLocator2(sm[1], sm[2]) + loc := Locator{Hash: sm[1], Size: -1} + if sm[2] != "" { + loc.Hints = strings.Split(sm[3], "+") + } else { + loc.Hints = []string{} + } + if len(loc.Hints) > 0 { + if size, err := strconv.Atoi(loc.Hints[0]); err == nil { + loc.Size = size + } + } + return &loc, nil }