X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/740d71d2b7a797bb2dd2e1e427c12e84c86b9ce6..659a8c75c837ec30e2f9fb6f1a1859b625626c3e:/sdk/go/keepclient/keepclient.go diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index e1c25c9e1d..5d791948dc 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -2,20 +2,22 @@ package keepclient import ( - "git.curoverse.com/arvados.git/sdk/go/streamer" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "crypto/md5" + "crypto/tls" "errors" "fmt" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/streamer" "io" "io/ioutil" "log" "net/http" + "os" "regexp" - "sort" "strings" "sync" "sync/atomic" + "time" "unsafe" ) @@ -36,7 +38,7 @@ type KeepClient struct { Arvados *arvadosclient.ArvadosClient Want_replicas int Using_proxy bool - service_roots *[]string + service_roots *map[string]string lock sync.Mutex Client *http.Client } @@ -44,13 +46,16 @@ type KeepClient struct { // Create a new KeepClient. This will contact the API server to discover Keep // servers. func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error) { + var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$") + insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")) kc = KeepClient{ Arvados: arv, Want_replicas: 2, Using_proxy: false, - Client: &http.Client{Transport: &http.Transport{}}} - - err = (&kc).DiscoverKeepServers() + Client: &http.Client{Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}}, + } + _, err = (&kc).DiscoverKeepServers() return kc, err } @@ -132,8 +137,12 @@ func (this KeepClient) AuthorizedGet(hash string, timestamp string) (reader io.ReadCloser, contentLength int64, url string, err error) { + // Take the hash of locator and timestamp in order to identify this + // specific transaction in log statements. + requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8] + // Calculate the ordering for asking servers - sv := this.shuffledServiceRoots(hash) + sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots() for _, host := range sv { var req *http.Request @@ -151,12 +160,26 @@ func (this KeepClient) AuthorizedGet(hash string, req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken)) + log.Printf("[%v] Begin download %s", requestId, url) + var resp *http.Response - if resp, err = this.Client.Do(req); err != nil { + if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK { + statusCode := -1 + var respbody []byte + if resp != nil { + statusCode = resp.StatusCode + if resp.Body != nil { + respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096}) + } + } + response := strings.TrimSpace(string(respbody)) + log.Printf("[%v] Download %v status code: %v error: \"%v\" response: \"%v\"", + requestId, url, statusCode, err, response) continue } if resp.StatusCode == http.StatusOK { + log.Printf("[%v] Download %v status code: %v", requestId, url, resp.StatusCode) return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil } } @@ -175,7 +198,7 @@ func (this KeepClient) Ask(hash string) (contentLength int64, url string, err er func (this KeepClient) AuthorizedAsk(hash string, signature string, timestamp string) (contentLength int64, url string, err error) { // Calculate the ordering for asking servers - sv := this.shuffledServiceRoots(hash) + sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots() for _, host := range sv { var req *http.Request @@ -208,20 +231,19 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string, } // Atomically read the service_roots field. -func (this *KeepClient) ServiceRoots() []string { - r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)))) +func (this *KeepClient) ServiceRoots() map[string]string { + r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)))) return *r } // Atomically update the service_roots field. Enables you to update // service_roots without disrupting any GET or PUT operations that might // already be in progress. -func (this *KeepClient) SetServiceRoots(svc []string) { - // Must be sorted for ShuffledServiceRoots() to produce consistent - // results. - roots := make([]string, len(svc)) - copy(roots, svc) - sort.Strings(roots) +func (this *KeepClient) SetServiceRoots(new_roots map[string]string) { + roots := make(map[string]string) + for uuid, root := range new_roots { + roots[uuid] = root + } atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)), unsafe.Pointer(&roots)) }