<-st.handled
status := <-upload_status
- c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
+ c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
})
log.Printf("TestUploadToStubKeepServer done")
<-st.handled
status := <-upload_status
- c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
+ c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
})
log.Printf("TestUploadToStubKeepServerBufferReader done")
<-st.handled
status := <-upload_status
- c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
- c.Check(status.StatusCode, Equals, 500)
+ c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
+ c.Check(status.statusCode, Equals, 500)
})
log.Printf("TestFailedUploadToStubKeepServer done")
}
c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
}
}
+
+type StubProxyHandler struct {
+ handled chan string
+}
+
+func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ resp.Header().Set("X-Keep-Replicas-Stored", "2")
+ this.handled <- fmt.Sprintf("http://%s", req.Host)
+}
+
+func (s *StandaloneSuite) TestPutProxy(c *C) {
+ log.Printf("TestPutProxy")
+
+ st := StubProxyHandler{make(chan string, 1)}
+
+ kc, _ := MakeKeepClient()
+
+ kc.Want_replicas = 2
+ kc.Using_proxy = true
+ kc.ApiToken = "abc123"
+ kc.Service_roots = make([]string, 1)
+
+ ks1 := RunSomeFakeKeepServers(st, 1, 2990)
+
+ for i, k := range ks1 {
+ kc.Service_roots[i] = k.url
+ defer k.listener.Close()
+ }
+
+ _, replicas, err := kc.PutB([]byte("foo"))
+ <-st.handled
+
+ c.Check(err, Equals, nil)
+ c.Check(replicas, Equals, 2)
+
+ log.Printf("TestPutProxy done")
+}
+
+func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
+ log.Printf("TestPutProxy")
+
+ st := StubProxyHandler{make(chan string, 1)}
+
+ kc, _ := MakeKeepClient()
+
+ kc.Want_replicas = 3
+ kc.Using_proxy = true
+ kc.ApiToken = "abc123"
+ kc.Service_roots = make([]string, 1)
+
+ ks1 := RunSomeFakeKeepServers(st, 1, 2990)
+
+ for i, k := range ks1 {
+ kc.Service_roots[i] = k.url
+ defer k.listener.Close()
+ }
+
+ _, replicas, err := kc.PutB([]byte("foo"))
+ <-st.handled
+
+ c.Check(err, Equals, InsufficientReplicasError)
+ c.Check(replicas, Equals, 2)
+
+ log.Printf("TestPutProxy done")
+}
"io"
"log"
"net/http"
+ "os"
"sort"
"strconv"
)
Hostname string `json:"service_host"`
Port int `json:"service_port"`
SSL bool `json:"service_ssl_flag"`
+ SvcType string `json:"service_type"`
}
func (this *KeepClient) discoverKeepServers() error {
+ if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
+ this.Service_roots = make([]string, 1)
+ this.Service_roots[0] = prx
+ this.Using_proxy = true
+ return nil
+ }
+
// Construct request of keep disk list
var req *http.Request
var err error
- if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
+
+ if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_services/accessible", this.ApiServer), nil); err != nil {
return err
}
return err
}
+ if resp.StatusCode != 200 {
+ // fall back on keep disks
+ if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
+ return err
+ }
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+ if resp, err = this.Client.Do(req); err != nil {
+ return err
+ }
+ }
+
type svcList struct {
Items []keepDisk `json:"items"`
}
for _, element := range m.Items {
n := ""
+
if element.SSL {
n = "s"
}
listed[url] = true
this.Service_roots = append(this.Service_roots, url)
}
+ if element.SvcType == "proxy" {
+ this.Using_proxy = true
+ }
}
// Must be sorted for ShuffledServiceRoots() to produce consistent
}
type uploadStatus struct {
- Err error
- Url string
- StatusCode int
+ err error
+ url string
+ statusCode int
+ replicas_stored int
}
func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
var err error
var url = fmt.Sprintf("%s/%s", host, hash)
if req, err = http.NewRequest("PUT", url, nil); err != nil {
- upload_status <- uploadStatus{err, url, 0}
+ upload_status <- uploadStatus{err, url, 0, 0}
body.Close()
return
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
req.Header.Add("Content-Type", "application/octet-stream")
+
+ if this.Using_proxy {
+ req.Header.Add("X-Keep-Desired-Replicas", fmt.Sprint(this.Want_replicas))
+ }
+
req.Body = body
var resp *http.Response
if resp, err = this.Client.Do(req); err != nil {
- upload_status <- uploadStatus{err, url, 0}
+ upload_status <- uploadStatus{err, url, 0, 0}
return
}
+ rep := 1
+ if xr := resp.Header.Get("X-Keep-Replicas-Stored"); xr != "" {
+ fmt.Sscanf(xr, "%d", &rep)
+ }
+
if resp.StatusCode == http.StatusOK {
- upload_status <- uploadStatus{nil, url, resp.StatusCode}
+ upload_status <- uploadStatus{nil, url, resp.StatusCode, rep}
} else {
- upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode}
+ upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep}
}
}
defer close(upload_status)
// Desired number of replicas
+
remaining_replicas := this.Want_replicas
for remaining_replicas > 0 {
next_server += 1
active += 1
} else {
- return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+ fmt.Print(active)
+ if active == 0 {
+ return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+ } else {
+ break
+ }
}
}
// Now wait for something to happen.
status := <-upload_status
- if status.StatusCode == 200 {
+ if status.statusCode == 200 {
// good news!
- remaining_replicas -= 1
+ remaining_replicas -= status.replicas_stored
} else {
// writing to keep server failed for some reason
log.Printf("Keep server put to %v failed with '%v'",
- status.Url, status.Err)
+ status.url, status.err)
}
active -= 1
- log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
+ log.Printf("Upload status %v %v %v", status.statusCode, remaining_replicas, active)
}
- return (this.Want_replicas - remaining_replicas), nil
+ return this.Want_replicas, nil
}