X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/740d71d2b7a797bb2dd2e1e427c12e84c86b9ce6..2cf42c27a7e8b37e29462d0b695e24cb6f3ad5ce:/sdk/go/keepclient/keepclient.go diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index e1c25c9e1d..23af470f78 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -2,20 +2,20 @@ package keepclient import ( - "git.curoverse.com/arvados.git/sdk/go/streamer" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "crypto/md5" "errors" "fmt" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/streamer" "io" "io/ioutil" "log" "net/http" "regexp" - "sort" "strings" "sync" "sync/atomic" + "time" "unsafe" ) @@ -36,7 +36,7 @@ type KeepClient struct { Arvados *arvadosclient.ArvadosClient Want_replicas int Using_proxy bool - service_roots *[]string + service_roots *map[string]string lock sync.Mutex Client *http.Client } @@ -48,8 +48,8 @@ func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error) Arvados: arv, Want_replicas: 2, Using_proxy: false, - Client: &http.Client{Transport: &http.Transport{}}} - + Client: &http.Client{}, + } err = (&kc).DiscoverKeepServers() return kc, err @@ -132,8 +132,12 @@ func (this KeepClient) AuthorizedGet(hash string, timestamp string) (reader io.ReadCloser, contentLength int64, url string, err error) { + // Take the hash of locator and timestamp in order to identify this + // specific transaction in log statements. + requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8] + // Calculate the ordering for asking servers - sv := this.shuffledServiceRoots(hash) + sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots() for _, host := range sv { var req *http.Request @@ -151,12 +155,19 @@ func (this KeepClient) AuthorizedGet(hash string, req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken)) + log.Printf("[%v] Begin download %s", requestId, url) + var resp *http.Response - if resp, err = this.Client.Do(req); err != nil { + if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK { + respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096}) + response := strings.TrimSpace(string(respbody)) + log.Printf("[%v] Download %v status code: %v error: \"%v\" response: \"%v\"", + requestId, url, resp.StatusCode, err, response) continue } if resp.StatusCode == http.StatusOK { + log.Printf("[%v] Download %v status code: %v", requestId, url, resp.StatusCode) return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil } } @@ -175,7 +186,7 @@ func (this KeepClient) Ask(hash string) (contentLength int64, url string, err er func (this KeepClient) AuthorizedAsk(hash string, signature string, timestamp string) (contentLength int64, url string, err error) { // Calculate the ordering for asking servers - sv := this.shuffledServiceRoots(hash) + sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots() for _, host := range sv { var req *http.Request @@ -208,20 +219,19 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string, } // Atomically read the service_roots field. -func (this *KeepClient) ServiceRoots() []string { - r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)))) +func (this *KeepClient) ServiceRoots() map[string]string { + r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)))) return *r } // Atomically update the service_roots field. Enables you to update // service_roots without disrupting any GET or PUT operations that might // already be in progress. -func (this *KeepClient) SetServiceRoots(svc []string) { - // Must be sorted for ShuffledServiceRoots() to produce consistent - // results. - roots := make([]string, len(svc)) - copy(roots, svc) - sort.Strings(roots) +func (this *KeepClient) SetServiceRoots(new_roots map[string]string) { + roots := make(map[string]string) + for uuid, root := range new_roots { + roots[uuid] = root + } atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)), unsafe.Pointer(&roots)) }