Merge branch 'master' into 3762-delete-list-worker
[arvados.git] / sdk / go / keepclient / keepclient.go
index aa7e78ba8fcb00eb2ae0a9f35d6dfb8385967b77..5d791948dcb808f3373555d183d61f7df5a22100 100644 (file)
@@ -2,20 +2,22 @@
 package keepclient
 
 import (
-       streamer "git.curoverse.com/arvados.git/sdk/go/streamer"
-       arvadosclient "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "crypto/md5"
+       "crypto/tls"
        "errors"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/streamer"
        "io"
        "io/ioutil"
        "log"
        "net/http"
+       "os"
        "regexp"
-       "sort"
        "strings"
        "sync"
        "sync/atomic"
+       "time"
        "unsafe"
 )
 
@@ -36,7 +38,7 @@ type KeepClient struct {
        Arvados       *arvadosclient.ArvadosClient
        Want_replicas int
        Using_proxy   bool
-       service_roots *[]string
+       service_roots *map[string]string
        lock          sync.Mutex
        Client        *http.Client
 }
@@ -44,13 +46,16 @@ type KeepClient struct {
 // Create a new KeepClient.  This will contact the API server to discover Keep
 // servers.
 func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error) {
+       var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
+       insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
        kc = KeepClient{
                Arvados:       arv,
                Want_replicas: 2,
                Using_proxy:   false,
-               Client:        &http.Client{Transport: &http.Transport{}}}
-
-       err = (&kc).DiscoverKeepServers()
+               Client: &http.Client{Transport: &http.Transport{
+                       TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
+       }
+       _, err = (&kc).DiscoverKeepServers()
 
        return kc, err
 }
@@ -132,8 +137,12 @@ func (this KeepClient) AuthorizedGet(hash string,
        timestamp string) (reader io.ReadCloser,
        contentLength int64, url string, err error) {
 
+       // Hash the locator together with the current time to get a short ID
+       // that identifies this specific request in log statements.
+       requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
+
        // Calculate the ordering for asking servers
-       sv := this.shuffledServiceRoots(hash)
+       sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
        for _, host := range sv {
                var req *http.Request
@@ -151,12 +160,26 @@ func (this KeepClient) AuthorizedGet(hash string,
 
                req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
 
+               log.Printf("[%v] Begin download %s", requestId, url)
+
                var resp *http.Response
-               if resp, err = this.Client.Do(req); err != nil {
+               if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
+                       statusCode := -1
+                       var respbody []byte
+                       if resp != nil {
+                               statusCode = resp.StatusCode
+                               if resp.Body != nil {
+                                       respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+                               }
+                       }
+                       response := strings.TrimSpace(string(respbody))
+                       log.Printf("[%v] Download %v status code: %v error: \"%v\" response: \"%v\"",
+                               requestId, url, statusCode, err, response)
                        continue
                }
 
                if resp.StatusCode == http.StatusOK {
+                       log.Printf("[%v] Download %v status code: %v", requestId, url, resp.StatusCode)
                        return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
                }
        }
@@ -175,7 +198,7 @@ func (this KeepClient) Ask(hash string) (contentLength int64, url string, err er
 func (this KeepClient) AuthorizedAsk(hash string, signature string,
        timestamp string) (contentLength int64, url string, err error) {
        // Calculate the ordering for asking servers
-       sv := this.shuffledServiceRoots(hash)
+       sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
        for _, host := range sv {
                var req *http.Request
@@ -208,20 +231,19 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string,
 }
 
 // Atomically read the service_roots field.
-func (this *KeepClient) ServiceRoots() []string {
-       r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+func (this *KeepClient) ServiceRoots() map[string]string {
+       r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
        return *r
 }
 
 // Atomically update the service_roots field.  Enables you to update
 // service_roots without disrupting any GET or PUT operations that might
 // already be in progress.
-func (this *KeepClient) SetServiceRoots(svc []string) {
-       // Must be sorted for ShuffledServiceRoots() to produce consistent
-       // results.
-       roots := make([]string, len(svc))
-       copy(roots, svc)
-       sort.Strings(roots)
+func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
+       roots := make(map[string]string)
+       for uuid, root := range new_roots {
+               roots[uuid] = root
+       }
        atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
                unsafe.Pointer(&roots))
 }