package keepclient
import (
- "arvados.org/buffer"
+ "arvados.org/streamer"
"crypto/md5"
"crypto/tls"
"errors"
bufsize = BLOCKSIZE
}
- t := buffer.StartTransferFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
+ t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
defer t.Close()
return this.putReplicas(hash, t, expectedLength)
// replicas that were written and if there was an error. Note this will return
// InsufficientReplicasError whenever 0 <= replicas < this.Want_replicas.
func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
- t := buffer.StartTransferFromSlice(buf)
+ t := streamer.AsyncStreamFromSlice(buf)
defer t.Close()
return this.putReplicas(hash, t, int64(len(buf)))
package keepclient
import (
- "arvados.org/buffer"
+ "arvados.org/streamer"
"crypto/md5"
"flag"
"fmt"
}
func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+ log.Printf("TestUploadToStubKeepServer")
+
st := StubPutHandler{
c,
"acbd18db4cc2f85cedef654fccc4a4d8",
status := <-upload_status
c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
})
+
+ log.Printf("TestUploadToStubKeepServer done")
}
func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
+ log.Printf("TestUploadToStubKeepServerBufferReader")
+
st := StubPutHandler{
c,
"acbd18db4cc2f85cedef654fccc4a4d8",
func(kc KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan uploadStatus) {
- tr := buffer.StartTransferFromReader(512, reader)
+ tr := streamer.AsyncStreamFromReader(512, reader)
defer tr.Close()
- br1 := tr.MakeBufferReader()
+ br1 := tr.MakeStreamReader()
go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
writer.Write([]byte("foo"))
writer.Close()
- <-tr.Reader_status
<-st.handled
status := <-upload_status
c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
})
+
+ log.Printf("TestUploadToStubKeepServerBufferReader done")
}
type FailHandler struct {
}
func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
+ log.Printf("TestFailedUploadToStubKeepServer")
+
st := FailHandler{
make(chan string)}
c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
c.Check(status.StatusCode, Equals, 500)
})
-
+ log.Printf("TestFailedUploadToStubKeepServer done")
}
type KeepServer struct {
(s1 == shuff[1] && s2 == shuff[0]),
Equals,
true)
+
+ log.Printf("TestPutB done")
}
func (s *StandaloneSuite) TestPutHR(c *C) {
(s1 == shuff[1] && s2 == shuff[0]),
Equals,
true)
+
+ log.Printf("TestPutHR done")
}
func (s *StandaloneSuite) TestPutWithFail(c *C) {
c.Check(err, Equals, InsufficientReplicasError)
c.Check(replicas, Equals, 1)
c.Check(<-st.handled, Equals, shuff[1])
+
+ log.Printf("TestPutWithTooManyFail done")
}
type StubGetHandler struct {
}
func (s *StandaloneSuite) TestGet(c *C) {
+ log.Printf("TestGet")
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
kc.Service_roots = []string{url}
r, n, url2, err := kc.Get(hash)
+ defer r.Close()
c.Check(err, Equals, nil)
c.Check(n, Equals, int64(3))
c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
c.Check(content, DeepEquals, []byte("foo"))
+
+ log.Printf("TestGet done")
}
func (s *StandaloneSuite) TestGetFail(c *C) {
package keepclient
import (
- "arvados.org/buffer"
+ "arvados.org/streamer"
"encoding/json"
"errors"
"fmt"
var url = fmt.Sprintf("%s/%s", host, hash)
if req, err = http.NewRequest("PUT", url, nil); err != nil {
upload_status <- uploadStatus{err, url, 0}
+ body.Close()
return
}
func (this KeepClient) putReplicas(
hash string,
- tr buffer.TransferBuffer,
+ tr *streamer.AsyncStream,
expectedLength int64) (replicas int, err error) {
// Calculate the ordering for uploading to servers
for active < remaining_replicas {
// Start some upload requests
if next_server < len(sv) {
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeBufferReader(), upload_status, expectedLength)
+ go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength)
next_server += 1
active += 1
} else {
}
// Now wait for something to happen.
- select {
- case status := <-tr.Reader_status:
- if status == io.EOF {
- // good news!
- } else {
- // bad news
- return (this.Want_replicas - remaining_replicas), status
- }
- case status := <-upload_status:
- if status.StatusCode == 200 {
- // good news!
- remaining_replicas -= 1
- } else {
- // writing to keep server failed for some reason
- log.Printf("Keep server put to %v failed with '%v'",
- status.Url, status.Err)
- }
- active -= 1
- log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
+ status := <-upload_status
+ if status.StatusCode == 200 {
+ // good news!
+ remaining_replicas -= 1
+ } else {
+ // writing to keep server failed for some reason
+ log.Printf("Keep server put to %v failed with '%v'",
+ status.Url, status.Err)
}
+ active -= 1
+ log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
}
return (this.Want_replicas - remaining_replicas), nil