package keepclient
import (
- "git.curoverse.com/arvados.git/sdk/go/streamer"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"crypto/md5"
"errors"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"io/ioutil"
"log"
"net/http"
"regexp"
- "sort"
"strings"
"sync"
"sync/atomic"
+ "time"
"unsafe"
)
Arvados *arvadosclient.ArvadosClient
Want_replicas int
Using_proxy bool
- service_roots *[]string
+ service_roots *map[string]string
lock sync.Mutex
Client *http.Client
}
Arvados: arv,
Want_replicas: 2,
Using_proxy: false,
- Client: &http.Client{Transport: &http.Transport{}}}
-
+ Client: &http.Client{},
+ }
err = (&kc).DiscoverKeepServers()
return kc, err
timestamp string) (reader io.ReadCloser,
contentLength int64, url string, err error) {
+ // Take the hash of locator and timestamp in order to identify this
+ // specific transaction in log statements.
+ requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
+
// Calculate the ordering for asking servers
- sv := this.shuffledServiceRoots(hash)
+ sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
for _, host := range sv {
var req *http.Request
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
+ log.Printf("[%v] Begin download %s", requestId, url)
+
var resp *http.Response
- if resp, err = this.Client.Do(req); err != nil {
+ if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
+ respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ response := strings.TrimSpace(string(respbody))
+ log.Printf("[%v] Download %v status code: %v error: \"%v\" response: \"%v\"",
+ requestId, url, resp.StatusCode, err, response)
continue
}
if resp.StatusCode == http.StatusOK {
+ log.Printf("[%v] Download %v status code: %v", requestId, url, resp.StatusCode)
return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
}
}
func (this KeepClient) AuthorizedAsk(hash string, signature string,
timestamp string) (contentLength int64, url string, err error) {
// Calculate the ordering for asking servers
- sv := this.shuffledServiceRoots(hash)
+ sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
for _, host := range sv {
var req *http.Request
}
// Atomically read the service_roots field.
-func (this *KeepClient) ServiceRoots() []string {
- r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+func (this *KeepClient) ServiceRoots() map[string]string {
+ r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
return *r
}
// Atomically update the service_roots field. Enables you to update
// service_roots without disrupting any GET or PUT operations that might
// already be in progress.
-func (this *KeepClient) SetServiceRoots(svc []string) {
- // Must be sorted for ShuffledServiceRoots() to produce consistent
- // results.
- roots := make([]string, len(svc))
- copy(roots, svc)
- sort.Strings(roots)
+func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
+ roots := make(map[string]string)
+ for uuid, root := range new_roots {
+ roots[uuid] = root
+ }
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
unsafe.Pointer(&roots))
}