4869: KeepClient now has a default timeout per block request (10 minutes). In
[arvados.git] / sdk / go / keepclient / keepclient.go
index e1c25c9e1d82ee5f2f41bbb72182faf5a73b638d..4733bb76293b760641573943f1dd7355501d3e7f 100644 (file)
@@ -2,20 +2,20 @@
 package keepclient
 
 import (
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "crypto/md5"
        "errors"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/streamer"
        "io"
        "io/ioutil"
        "log"
        "net/http"
        "regexp"
-       "sort"
        "strings"
        "sync"
        "sync/atomic"
+       "time"
        "unsafe"
 )
 
@@ -36,7 +36,7 @@ type KeepClient struct {
        Arvados       *arvadosclient.ArvadosClient
        Want_replicas int
        Using_proxy   bool
-       service_roots *[]string
+       service_roots *map[string]string
        lock          sync.Mutex
        Client        *http.Client
 }
@@ -48,7 +48,7 @@ func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error)
                Arvados:       arv,
                Want_replicas: 2,
                Using_proxy:   false,
-               Client:        &http.Client{Transport: &http.Transport{}}}
+               Client:        &http.Client{Transport: &http.Transport{}, Timeout: 10 * time.Minute}}
 
        err = (&kc).DiscoverKeepServers()
 
@@ -132,8 +132,12 @@ func (this KeepClient) AuthorizedGet(hash string,
        timestamp string) (reader io.ReadCloser,
        contentLength int64, url string, err error) {
 
+       // Take the hash of locator and timestamp in order to identify this
+       // specific transaction in log statements.
+       tag := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
+
        // Calculate the ordering for asking servers
-       sv := this.shuffledServiceRoots(hash)
+       sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
        for _, host := range sv {
                var req *http.Request
@@ -151,12 +155,19 @@ func (this KeepClient) AuthorizedGet(hash string,
 
                req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
 
+               log.Printf("[%v] Begin download %s", tag, url)
+
                var resp *http.Response
-               if resp, err = this.Client.Do(req); err != nil {
+               if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
+                       respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+                       response := strings.TrimSpace(string(respbody))
+                       log.Printf("[%v] Download %v status code: %v error: '%v' response: '%v'",
+                               tag, url, resp.StatusCode, err, response)
                        continue
                }
 
                if resp.StatusCode == http.StatusOK {
+                       log.Printf("[%v] Download %v status code: %v", tag, url, resp.StatusCode)
                        return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
                }
        }
@@ -175,7 +186,7 @@ func (this KeepClient) Ask(hash string) (contentLength int64, url string, err er
 func (this KeepClient) AuthorizedAsk(hash string, signature string,
        timestamp string) (contentLength int64, url string, err error) {
        // Calculate the ordering for asking servers
-       sv := this.shuffledServiceRoots(hash)
+       sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
        for _, host := range sv {
                var req *http.Request
@@ -208,20 +219,19 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string,
 }
 
 // Atomically read the service_roots field.
-func (this *KeepClient) ServiceRoots() []string {
-       r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+func (this *KeepClient) ServiceRoots() map[string]string {
+       r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
        return *r
 }
 
 // Atomically update the service_roots field.  Enables you to update
 // service_roots without disrupting any GET or PUT operations that might
 // already be in progress.
-func (this *KeepClient) SetServiceRoots(svc []string) {
-       // Must be sorted for ShuffledServiceRoots() to produce consistent
-       // results.
-       roots := make([]string, len(svc))
-       copy(roots, svc)
-       sort.Strings(roots)
+func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
+       roots := make(map[string]string)
+       for uuid, root := range new_roots {
+               roots[uuid] = root
+       }
        atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
                unsafe.Pointer(&roots))
 }