From 9482cf9f326ce64f9ea100aa34680278bdd6018e Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Mon, 19 May 2014 14:56:27 -0400
Subject: [PATCH] 2798: Continued refactoring buffer reader into separate
 buffer package.

Made internal buffer management functions package-internal and added a public
API. Updated keepclient to use the buffer package public API. Added
HashCheckingReader to provide transparent checking of checksums. Added more
package and API comments.
---
 sdk/go/src/arvados.org/buffer/buffer.go       | 214 ++-----
 sdk/go/src/arvados.org/buffer/buffer_test.go  |  48 +--
 sdk/go/src/arvados.org/buffer/transfer.go     | 220 +++++++++++++
 .../src/arvados.org/keepclient/hashcheck.go   |  77 +++++
 .../arvados.org/keepclient/hashcheck_test.go  |  85 +++++
 .../src/arvados.org/keepclient/keepclient.go  | 301 ++++--------------
 .../arvados.org/keepclient/keepclient_test.go | 117 ++++---
 sdk/go/src/arvados.org/keepclient/support.go  | 221 +++++++++++++
 8 files changed, 785 insertions(+), 498 deletions(-)
 create mode 100644 sdk/go/src/arvados.org/buffer/transfer.go
 create mode 100644 sdk/go/src/arvados.org/keepclient/hashcheck.go
 create mode 100644 sdk/go/src/arvados.org/keepclient/hashcheck_test.go
 create mode 100644 sdk/go/src/arvados.org/keepclient/support.go

diff --git a/sdk/go/src/arvados.org/buffer/buffer.go b/sdk/go/src/arvados.org/buffer/buffer.go
index 6af1dd0fa7..81a9ca3b57 100644
--- a/sdk/go/src/arvados.org/buffer/buffer.go
+++ b/sdk/go/src/arvados.org/buffer/buffer.go
@@ -1,3 +1,12 @@
+/* Implements a buffer that is filled incrementally from an io.Reader and
+supports multiple readers on the buffer.
+
+Usage:
+
+Call StartTransferFromReader() or StartTransferFromSlice() to get a
+TransferBuffer, then call MakeBufferReader() for each desired reader.
+*/
+
 package buffer

 import (
@@ -5,84 +14,43 @@ import (
 	"log"
 )

-type ReaderSlice struct {
-	slice        []byte
-	reader_error error
+type TransferBuffer struct {
+	requests      chan readRequest
+	Reader_status chan error
 }

-// Read repeatedly from the reader into the specified buffer, and report each
-// read to channel 'c'. Completes when Reader 'r' reports on the error channel
-// and closes channel 'c'.
-func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) { - defer close(slices) +// Reads from the buffer managed by the Transfer() +type BufferReader struct { + offset *int + requests chan<- readRequest + responses chan readResult +} - // Initially use entire buffer as scratch space - ptr := buffer[:] - for { - var n int - var err error - if len(ptr) > 0 { - // Read into the scratch space - n, err = r.Read(ptr) - } else { - // Ran out of scratch space, try reading one more byte - var b [1]byte - n, err = r.Read(b[:]) - - if n > 0 { - // Reader has more data but we have nowhere to - // put it, so we're stuffed - slices <- ReaderSlice{nil, io.ErrShortBuffer} - } else { - // Return some other error (hopefully EOF) - slices <- ReaderSlice{nil, err} - } - return - } +func StartTransferFromReader(buffersize int, source io.Reader) TransferBuffer { + buf := make([]byte, buffersize) - // End on error (includes EOF) - if err != nil { - slices <- ReaderSlice{nil, err} - return - } + t := TransferBuffer{make(chan readRequest), make(chan error)} - if n > 0 { - // Make a slice with the contents of the read - slices <- ReaderSlice{ptr[:n], nil} + go transfer(buf, source, t.requests, t.Reader_status) - // Adjust the scratch space slice - ptr = ptr[n:] - } - } + return t } -// A read request to the Transfer() function -type ReadRequest struct { - offset int - maxsize int - result chan<- ReadResult -} +func StartTransferFromSlice(buf []byte) TransferBuffer { + t := TransferBuffer{make(chan readRequest), nil} -// A read result from the Transfer() function -type ReadResult struct { - slice []byte - err error -} + go transfer(buf, nil, t.requests, nil) -// Reads from the buffer managed by the Transfer() -type BufferReader struct { - offset *int - requests chan<- ReadRequest - responses chan ReadResult + return t } -func MakeBufferReader(requests chan<- ReadRequest) BufferReader { - return BufferReader{new(int), requests, make(chan ReadResult)} +func (this TransferBuffer) MakeBufferReader() BufferReader { + return BufferReader{new(int), this.requests, make(chan readResult)} } // Reads from the buffer managed by the Transfer() func (this BufferReader) Read(p []byte) (n int, err error) { - this.requests <- ReadRequest{*this.offset, len(p), this.responses} + this.requests <- readRequest{*this.offset, len(p), this.responses} rr, valid := <-this.responses if valid { *this.offset += len(rr.slice) @@ -96,7 +64,7 @@ func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) { // Record starting offset in order to correctly report the number of bytes sent starting_offset := *this.offset for { - this.requests <- ReadRequest{*this.offset, 32 * 1024, this.responses} + this.requests <- readRequest{*this.offset, 32 * 1024, this.responses} rr, valid := <-this.responses if valid { log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err) @@ -123,121 +91,9 @@ func (this BufferReader) Close() error { return nil } -// Handle a read request. Returns true if a response was sent, and false if -// the request should be queued. 
-func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool { - log.Printf("HandleReadRequest %d %d %d", req.offset, req.maxsize, len(body)) - if req.offset < len(body) { - var end int - if req.offset+req.maxsize < len(body) { - end = req.offset + req.maxsize - } else { - end = len(body) - } - req.result <- ReadResult{body[req.offset:end], nil} - return true - } else if complete && req.offset >= len(body) { - req.result <- ReadResult{nil, io.EOF} - return true - } else { - return false - } -} - -// If 'source_reader' is not nil, reads data from 'source_reader' and stores it -// in the provided buffer. Otherwise, use the contents of 'buffer' as is. -// Accepts read requests on the buffer on the 'requests' channel. Completes -// when 'requests' channel is closed. -func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) { - // currently buffered data - var body []byte - - // for receiving slices from ReadIntoBuffer - var slices chan ReaderSlice = nil - - // indicates whether the buffered data is complete - var complete bool = false - - if source_reader != nil { - // 'body' is the buffer slice representing the body content read so far - body = source_buffer[:0] - - // used to communicate slices of the buffer as they are - // ReadIntoBuffer will close 'slices' when it is done with it - slices = make(chan ReaderSlice) - - // Spin it off - go ReadIntoBuffer(source_buffer, source_reader, slices) - } else { - // use the whole buffer - body = source_buffer[:] - - // buffer is complete - complete = true - } - - pending_requests := make([]ReadRequest, 0) - - for { - select { - case req, valid := <-requests: - // Handle a buffer read request - if valid { - if !HandleReadRequest(req, body, complete) { - pending_requests = append(pending_requests, req) - } - } else { - // closed 'requests' channel indicates we're done - return - } - - case bk, valid := <-slices: - // Got a new slice from the reader - if valid { - if bk.reader_error != nil { - reader_error <- bk.reader_error - if bk.reader_error == io.EOF { - // EOF indicates the reader is done - // sending, so our buffer is complete. 
- complete = true - } else { - // some other reader error - return - } - } - - if bk.slice != nil { - // adjust body bounds now that another slice has been read - body = source_buffer[0 : len(body)+len(bk.slice)] - } - - // handle pending reads - n := 0 - for n < len(pending_requests) { - if HandleReadRequest(pending_requests[n], body, complete) { - - // move the element from the - // back of the slice to - // position 'n', then shorten - // the slice by one element - pending_requests[n] = pending_requests[len(pending_requests)-1] - pending_requests = pending_requests[0 : len(pending_requests)-1] - } else { - - // Request wasn't handled, so keep it in the request slice - n += 1 - } - } - } else { - if complete { - // no more reads - slices = nil - } else { - // reader channel closed without signaling EOF - reader_error <- io.ErrUnexpectedEOF - return - } - } - } +func (this TransferBuffer) Close() { + close(this.requests) + if this.Reader_status != nil { + close(this.Reader_status) } } diff --git a/sdk/go/src/arvados.org/buffer/buffer_test.go b/sdk/go/src/arvados.org/buffer/buffer_test.go index 28e1e66dfc..f35b110f03 100644 --- a/sdk/go/src/arvados.org/buffer/buffer_test.go +++ b/sdk/go/src/arvados.org/buffer/buffer_test.go @@ -19,9 +19,9 @@ func ReadIntoBufferHelper(c *C, bufsize int) { buffer := make([]byte, bufsize) reader, writer := io.Pipe() - slices := make(chan ReaderSlice) + slices := make(chan readerSlice) - go ReadIntoBuffer(buffer, reader, slices) + go readIntoBuffer(buffer, reader, slices) { out := make([]byte, 128) @@ -82,9 +82,9 @@ func (s *StandaloneSuite) TestReadIntoBuffer(c *C) { func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) { buffer := make([]byte, 223) reader, writer := io.Pipe() - slices := make(chan ReaderSlice) + slices := make(chan readerSlice) - go ReadIntoBuffer(buffer, reader, slices) + go readIntoBuffer(buffer, reader, slices) { out := make([]byte, 128) @@ -144,19 +144,9 @@ func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) { func (s *StandaloneSuite) TestTransfer(c *C) { reader, writer := io.Pipe() - // Buffer for reads from 'r' - buffer := make([]byte, 512) - - // Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) - - // Reporting reader error states - reader_status := make(chan error) + tr := StartTransferFromReader(512, reader) - go Transfer(buffer, reader, requests, reader_status) - - br1 := MakeBufferReader(requests) + br1 := tr.MakeBufferReader() out := make([]byte, 128) { @@ -220,7 +210,7 @@ func (s *StandaloneSuite) TestTransfer(c *C) { } } - br2 := MakeBufferReader(requests) + br2 := tr.MakeBufferReader() { // Test 'catch up' reader in := make([]byte, 256) @@ -237,7 +227,7 @@ func (s *StandaloneSuite) TestTransfer(c *C) { { // Test closing the reader writer.Close() - status := <-reader_status + status := <-tr.Reader_status c.Check(status, Equals, io.EOF) in := make([]byte, 256) @@ -251,7 +241,7 @@ func (s *StandaloneSuite) TestTransfer(c *C) { { // Test 'catch up' reader after closing - br3 := MakeBufferReader(requests) + br3 := tr.MakeBufferReader() in := make([]byte, 256) n, err := br3.Read(in) @@ -276,13 +266,13 @@ func (s *StandaloneSuite) TestTransferShortBuffer(c *C) { buffer := make([]byte, 100) // Read requests on Transfer() buffer - requests := make(chan ReadRequest) + requests := make(chan readRequest) defer close(requests) // Reporting reader error states reader_status := make(chan error) - go Transfer(buffer, reader, requests, reader_status) + go transfer(buffer, reader, requests, 
reader_status)

 	out := make([]byte, 101)
 	go writer.Write(out)
@@ -298,13 +288,9 @@ func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
 		buffer[i] = byte(i)
 	}

-	// Read requests on Transfer() buffer
-	requests := make(chan ReadRequest)
-	defer close(requests)
-
-	go Transfer(buffer, nil, requests, nil)
+	tr := StartTransferFromSlice(buffer)

-	br1 := MakeBufferReader(requests)
+	br1 := tr.MakeBufferReader()

 	in := make([]byte, 64)
 	{
@@ -342,13 +328,9 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
 		buffer[i] = byte(i)
 	}

-	// Read requests on Transfer() buffer
-	requests := make(chan ReadRequest)
-	defer close(requests)
-
-	go Transfer(buffer, nil, requests, nil)
+	tr := StartTransferFromSlice(buffer)

-	br1 := MakeBufferReader(requests)
+	br1 := tr.MakeBufferReader()

 	reader, writer := io.Pipe()

diff --git a/sdk/go/src/arvados.org/buffer/transfer.go b/sdk/go/src/arvados.org/buffer/transfer.go
new file mode 100644
index 0000000000..6dbc921b67
--- /dev/null
+++ b/sdk/go/src/arvados.org/buffer/transfer.go
@@ -0,0 +1,220 @@
+package buffer
+
+import (
+	"io"
+	"log"
+)
+
+// A slice passed from readIntoBuffer() to transfer()
+type readerSlice struct {
+	slice        []byte
+	reader_error error
+}
+
+// A read request to the transfer() function
+type readRequest struct {
+	offset  int
+	maxsize int
+	result  chan<- readResult
+}
+
+// A read result from the transfer() function
+type readResult struct {
+	slice []byte
+	err   error
+}
+
+// Supports writing into a buffer
+type bufferWriter struct {
+	buf []byte
+	ptr *int
+}
+
+// Copy p into this.buf, increment the pointer, and return the bytes written.
+func (this bufferWriter) Write(p []byte) (n int, err error) {
+	n = copy(this.buf[*this.ptr:], p)
+	*this.ptr += n
+	return n, nil
+}
+
+// Read repeatedly from the reader and write sequentially into the specified
+// buffer, reporting each read on channel 'slices'. Completes when Reader 'r'
+// returns an error, which is reported before 'slices' is closed.
+func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
+	defer close(slices)
+
+	if writeto, ok := r.(io.WriterTo); ok {
+		n, err := writeto.WriteTo(bufferWriter{buffer, new(int)})
+		if err != nil {
+			slices <- readerSlice{nil, err}
+		} else {
+			slices <- readerSlice{buffer[:n], nil}
+			slices <- readerSlice{nil, io.EOF}
+		}
+		return
+	} else {
+		// Initially entire buffer is available
+		ptr := buffer[:]
+		for {
+			var n int
+			var err error
+			if len(ptr) > 0 {
+				const readblock = 64 * 1024
+				// Read 64KiB into the next part of the buffer
+				if len(ptr) > readblock {
+					n, err = r.Read(ptr[:readblock])
+				} else {
+					n, err = r.Read(ptr)
+				}
+			} else {
+				// Ran out of buffer space, try reading one more byte
+				var b [1]byte
+				n, err = r.Read(b[:])
+
+				if n > 0 {
+					// Reader has more data but we have nowhere to
+					// put it, so we're stuffed
+					slices <- readerSlice{nil, io.ErrShortBuffer}
+				} else {
+					// Return some other error (hopefully EOF)
+					slices <- readerSlice{nil, err}
+				}
+				return
+			}
+
+			// End on error (includes EOF)
+			if err != nil {
+				slices <- readerSlice{nil, err}
+				return
+			}
+
+			if n > 0 {
+				// Make a slice with the contents of the read
+				slices <- readerSlice{ptr[:n], nil}
+
+				// Adjust the scratch space slice
+				ptr = ptr[n:]
+			}
+		}
+	}
+}
+
+// Handle a read request. Returns true if a response was sent, and false if
+// the request should be queued.
+func handleReadRequest(req readRequest, body []byte, complete bool) bool {
+	log.Printf("handleReadRequest %d %d %d", req.offset, req.maxsize, len(body))
+	if req.offset < len(body) {
+		var end int
+		if req.offset+req.maxsize < len(body) {
+			end = req.offset + req.maxsize
+		} else {
+			end = len(body)
+		}
+		req.result <- readResult{body[req.offset:end], nil}
+		return true
+	} else if complete && req.offset >= len(body) {
+		req.result <- readResult{nil, io.EOF}
+		return true
+	} else {
+		return false
+	}
+}
+
+// Mediates between reads and appends.
+// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
+// in the provided buffer. Otherwise, use the contents of 'buffer' as is.
+// Accepts read requests on the buffer on the 'requests' channel. Completes
+// when 'requests' channel is closed.
+func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan readRequest, reader_error chan error) {
+	// currently buffered data
+	var body []byte
+
+	// for receiving slices from readIntoBuffer
+	var slices chan readerSlice = nil
+
+	// indicates whether the buffered data is complete
+	var complete bool = false
+
+	if source_reader != nil {
+		// 'body' is the buffer slice representing the body content read so far
+		body = source_buffer[:0]
+
+		// used to communicate slices of the buffer as they are read
+		// readIntoBuffer will close 'slices' when it is done with it
+		slices = make(chan readerSlice)
+
+		// Spin it off
+		go readIntoBuffer(source_buffer, source_reader, slices)
+	} else {
+		// use the whole buffer
+		body = source_buffer[:]
+
+		// buffer is complete
+		complete = true
+	}
+
+	pending_requests := make([]readRequest, 0)
+
+	for {
+		select {
+		case req, valid := <-requests:
+			// Handle a buffer read request
+			if valid {
+				if !handleReadRequest(req, body, complete) {
+					pending_requests = append(pending_requests, req)
+				}
+			} else {
+				// closed 'requests' channel indicates we're done
+				return
+			}
+
+		case bk, valid := <-slices:
+			// Got a new slice from the reader
+			if valid {
+				if bk.reader_error != nil {
+					reader_error <- bk.reader_error
+					if bk.reader_error == io.EOF {
+						// EOF indicates the reader is done
+						// sending, so our buffer is complete.
+						complete = true
+					} else {
+						// some other reader error
+						return
+					}
+				}
+
+				if bk.slice != nil {
+					// adjust body bounds now that another slice has been read
+					body = source_buffer[0 : len(body)+len(bk.slice)]
+				}
+
+				// handle pending reads
+				n := 0
+				for n < len(pending_requests) {
+					if handleReadRequest(pending_requests[n], body, complete) {
+
+						// move the element from the
+						// back of the slice to
+						// position 'n', then shorten
+						// the slice by one element
+						pending_requests[n] = pending_requests[len(pending_requests)-1]
+						pending_requests = pending_requests[0 : len(pending_requests)-1]
+					} else {
+
+						// Request wasn't handled, so keep it in the request slice
+						n += 1
+					}
+				}
+			} else {
+				if complete {
+					// no more reads
+					slices = nil
+				} else {
+					// reader channel closed without signaling EOF
+					reader_error <- io.ErrUnexpectedEOF
+					return
+				}
+			}
+		}
+	}
+}
diff --git a/sdk/go/src/arvados.org/keepclient/hashcheck.go b/sdk/go/src/arvados.org/keepclient/hashcheck.go
new file mode 100644
index 0000000000..a585d00888
--- /dev/null
+++ b/sdk/go/src/arvados.org/keepclient/hashcheck.go
@@ -0,0 +1,77 @@
+// Lightweight implementation of io.ReadCloser that checks the contents read
+// from the underlying io.Reader against a checksum hash.
+// To avoid reading the entire contents into a buffer up front, the hash is
+// updated with each read, and the actual checksum is not checked until the
+// underlying reader returns EOF.
+package keepclient
+
+import (
+	"errors"
+	"fmt"
+	"hash"
+	"io"
+)
+
+var BadChecksum = errors.New("Reader failed checksum")
+
+type HashCheckingReader struct {
+	// The underlying data source
+	io.Reader
+
+	// The hashing function to use
+	hash.Hash
+
+	// The hash value to check against. Must be a hex-encoded lowercase string.
+	Check string
+}
+
+// Read from the underlying reader, update the hashing function, and pass the
+// results through. Will return BadChecksum on the last read instead of EOF if
+// the checksum doesn't match.
+func (this HashCheckingReader) Read(p []byte) (n int, err error) {
+	n, err = this.Reader.Read(p)
+	// Hash whatever was read; a Reader may return data along with EOF.
+	this.Hash.Write(p[:n])
+	if err == io.EOF {
+		sum := this.Hash.Sum(make([]byte, 0, this.Hash.Size()))
+		if fmt.Sprintf("%x", sum) != this.Check {
+			err = BadChecksum
+		}
+	}
+	return n, err
+}
+
+// Write entire contents of this.Reader to 'dest'. Returns BadChecksum if the
+// data written to 'dest' doesn't match the hash code of this.Check.
+func (this HashCheckingReader) WriteTo(dest io.Writer) (written int64, err error) {
+	if writeto, ok := this.Reader.(io.WriterTo); ok {
+		written, err = writeto.WriteTo(io.MultiWriter(dest, this.Hash))
+	} else {
+		written, err = io.Copy(io.MultiWriter(dest, this.Hash), this.Reader)
+	}
+
+	sum := this.Hash.Sum(make([]byte, 0, this.Hash.Size()))
+
+	if fmt.Sprintf("%x", sum) != this.Check {
+		err = BadChecksum
+	}
+
+	return written, err
+}
+
+// Close() the underlying Reader if it is castable to io.ReadCloser. This will
+// drain the underlying reader of any remaining data and check the checksum.
+func (this HashCheckingReader) Close() (err error) {
+	_, err = io.Copy(this.Hash, this.Reader)
+
+	if closer, ok := this.Reader.(io.ReadCloser); ok {
+		err = closer.Close()
+	}
+
+	sum := this.Hash.Sum(make([]byte, 0, this.Hash.Size()))
+	if fmt.Sprintf("%x", sum) != this.Check {
+		err = BadChecksum
+	}
+
+	return err
+}
diff --git a/sdk/go/src/arvados.org/keepclient/hashcheck_test.go b/sdk/go/src/arvados.org/keepclient/hashcheck_test.go
new file mode 100644
index 0000000000..371a9894a3
--- /dev/null
+++ b/sdk/go/src/arvados.org/keepclient/hashcheck_test.go
@@ -0,0 +1,85 @@
+package keepclient
+
+import (
+	"bytes"
+	"crypto/md5"
+	"fmt"
+	. "gopkg.in/check.v1"
"gopkg.in/check.v1" + "io" + "io/ioutil" +) + +type HashcheckSuiteSuite struct{} + +// Gocheck boilerplate +var _ = Suite(&HashcheckSuiteSuite{}) + +func (h *HashcheckSuiteSuite) TestRead(c *C) { + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + { + r, w := io.Pipe() + hcr := HashCheckingReader{r, md5.New(), hash} + go func() { + w.Write([]byte("foo")) + w.Close() + }() + p, err := ioutil.ReadAll(hcr) + c.Check(len(p), Equals, 3) + c.Check(err, Equals, nil) + } + + { + r, w := io.Pipe() + hcr := HashCheckingReader{r, md5.New(), hash} + go func() { + w.Write([]byte("bar")) + w.Close() + }() + p, err := ioutil.ReadAll(hcr) + c.Check(len(p), Equals, 3) + c.Check(err, Equals, BadChecksum) + } +} + +func (h *HashcheckSuiteSuite) TestWriteTo(c *C) { + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + { + bb := bytes.NewBufferString("foo") + hcr := HashCheckingReader{bb, md5.New(), hash} + r, w := io.Pipe() + done := make(chan bool) + go func() { + p, err := ioutil.ReadAll(r) + c.Check(len(p), Equals, 3) + c.Check(err, Equals, nil) + done <- true + }() + + n, err := hcr.WriteTo(w) + w.Close() + c.Check(n, Equals, int64(3)) + c.Check(err, Equals, nil) + <-done + } + + { + bb := bytes.NewBufferString("bar") + hcr := HashCheckingReader{bb, md5.New(), hash} + r, w := io.Pipe() + done := make(chan bool) + go func() { + p, err := ioutil.ReadAll(r) + c.Check(len(p), Equals, 3) + c.Check(err, Equals, nil) + done <- true + }() + + n, err := hcr.WriteTo(w) + w.Close() + c.Check(n, Equals, int64(3)) + c.Check(err, Equals, BadChecksum) + <-done + } +} diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go index 829ab0efb0..dcf1f33bba 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient.go @@ -1,19 +1,16 @@ +/* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */ package keepclient import ( "arvados.org/buffer" "crypto/md5" "crypto/tls" - "encoding/json" "errors" "fmt" "io" "io/ioutil" - "log" "net/http" "os" - "sort" - "strconv" ) // A Keep "block" is 64MB. @@ -21,7 +18,9 @@ const BLOCKSIZE = 64 * 1024 * 1024 var BlockNotFound = errors.New("Block not found") var InsufficientReplicasError = errors.New("Could not write sufficient replicas") +var OversizeBlockError = errors.New("Block too big") +// Information about Arvados and Keep servers. type KeepClient struct { ApiServer string ApiToken string @@ -31,277 +30,78 @@ type KeepClient struct { Client *http.Client } -type KeepDisk struct { - Hostname string `json:"service_host"` - Port int `json:"service_port"` - SSL bool `json:"service_ssl_flag"` -} - +// Create a new KeepClient, initialized with standard Arvados environment +// variables ARVADOS_API_HOST, ARVADOS_API_TOKEN, and (optionally) +// ARVADOS_API_HOST_INSECURE. This will contact the API server to discover +// Keep servers. 
func MakeKeepClient() (kc KeepClient, err error) { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: kc.ApiInsecure}, - } + insecure := (os.Getenv("ARVADOS_API_HOST_INSECURE") != "") kc = KeepClient{ ApiServer: os.Getenv("ARVADOS_API_HOST"), ApiToken: os.Getenv("ARVADOS_API_TOKEN"), - ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""), + ApiInsecure: insecure, Want_replicas: 2, - Client: &http.Client{Transport: tr}} + Client: &http.Client{Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}, + }}} - err = (&kc).DiscoverKeepServers() + err = (&kc).discoverKeepServers() return kc, err } -func (this *KeepClient) DiscoverKeepServers() error { - // Construct request of keep disk list - var req *http.Request - var err error - if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil { - return err - } - - // Add api token header - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) - - // Make the request - var resp *http.Response - if resp, err = this.Client.Do(req); err != nil { - return err - } - - type SvcList struct { - Items []KeepDisk `json:"items"` - } - - // Decode json reply - dec := json.NewDecoder(resp.Body) - var m SvcList - if err := dec.Decode(&m); err != nil { - return err - } - - listed := make(map[string]bool) - this.Service_roots = make([]string, 0, len(m.Items)) - - for _, element := range m.Items { - n := "" - if element.SSL { - n = "s" - } - - // Construct server URL - url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port) - - // Skip duplicates - if !listed[url] { - listed[url] = true - this.Service_roots = append(this.Service_roots, url) - } - } - - // Must be sorted for ShuffledServiceRoots() to produce consistent - // results. - sort.Strings(this.Service_roots) - - return nil -} - -func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) { - // Build an ordering with which to query the Keep servers based on the - // contents of the hash. "hash" is a hex-encoded number at least 8 - // digits (32 bits) long - - // seed used to calculate the next keep server from 'pool' to be added - // to 'pseq' - seed := hash - - // Keep servers still to be added to the ordering - pool := make([]string, len(this.Service_roots)) - copy(pool, this.Service_roots) - - // output probe sequence - pseq = make([]string, 0, len(this.Service_roots)) - - // iterate while there are servers left to be assigned - for len(pool) > 0 { - - if len(seed) < 8 { - // ran out of digits in the seed - if len(pseq) < (len(hash) / 4) { - // the number of servers added to the probe - // sequence is less than the number of 4-digit - // slices in 'hash' so refill the seed with the - // last 4 digits. - seed = hash[len(hash)-4:] - } - seed += hash - } - - // Take the next 8 digits (32 bytes) and interpret as an integer, - // then modulus with the size of the remaining pool to get the next - // selected server. - probe, _ := strconv.ParseUint(seed[0:8], 16, 32) - probe %= uint64(len(pool)) - - // Append the selected server to the probe sequence and remove it - // from the pool. - pseq = append(pseq, pool[probe]) - pool = append(pool[:probe], pool[probe+1:]...) 
-
-		// Remove the digits just used from the seed
-		seed = seed[8:]
-	}
-	return pseq
-}
-
-type UploadStatus struct {
-	Err        error
-	Url        string
-	StatusCode int
-}
-
-func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
-	upload_status chan<- UploadStatus, expectedLength int64) {
-
-	log.Printf("Uploading to %s", host)
-
-	var req *http.Request
-	var err error
-	var url = fmt.Sprintf("%s/%s", host, hash)
-	if req, err = http.NewRequest("PUT", url, nil); err != nil {
-		upload_status <- UploadStatus{err, url, 0}
-		return
-	}
-
-	if expectedLength > 0 {
-		req.ContentLength = expectedLength
-	}
-
-	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
-	req.Header.Add("Content-Type", "application/octet-stream")
-	req.Body = body
-
-	var resp *http.Response
-	if resp, err = this.Client.Do(req); err != nil {
-		upload_status <- UploadStatus{err, url, 0}
-		return
-	}
-
-	if resp.StatusCode == http.StatusOK {
-		upload_status <- UploadStatus{nil, url, resp.StatusCode}
-	} else {
-		upload_status <- UploadStatus{errors.New(resp.Status), url, resp.StatusCode}
-	}
-}
-
-func (this KeepClient) putReplicas(
-	hash string,
-	requests chan buffer.ReadRequest,
-	reader_status chan error,
-	expectedLength int64) (replicas int, err error) {
-
-	// Calculate the ordering for uploading to servers
-	sv := this.ShuffledServiceRoots(hash)
-
-	// The next server to try contacting
-	next_server := 0
-
-	// The number of active writers
-	active := 0
-
-	// Used to communicate status from the upload goroutines
-	upload_status := make(chan UploadStatus)
-	defer close(upload_status)
-
-	// Desired number of replicas
-	remaining_replicas := this.Want_replicas
-
-	for remaining_replicas > 0 {
-		for active < remaining_replicas {
-			// Start some upload requests
-			if next_server < len(sv) {
-				go this.uploadToKeepServer(sv[next_server], hash, buffer.MakeBufferReader(requests), upload_status, expectedLength)
-				next_server += 1
-				active += 1
-			} else {
-				return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
-			}
-		}
-
-		// Now wait for something to happen.
-		select {
-		case status := <-reader_status:
-			if status == io.EOF {
-				// good news!
-			} else {
-				// bad news
-				return (this.Want_replicas - remaining_replicas), status
-			}
-		case status := <-upload_status:
-			if status.StatusCode == 200 {
-				// good news!
-				remaining_replicas -= 1
-			} else {
-				// writing to keep server failed for some reason
-				log.Printf("Keep server put to %v failed with '%v'",
-					status.Url, status.Err)
-			}
-			active -= 1
-			log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
-		}
-	}
-
-	return (this.Want_replicas - remaining_replicas), nil
-}
-
-var OversizeBlockError = errors.New("Block too big")
-
+// Put a block given the block hash, a reader with the block data, and the
+// expected length of that data. The desired number of replicas is given in
+// KeepClient.Want_replicas. Returns the number of replicas that were written
+// and any error encountered. Note this will return InsufficientReplicasError
+// whenever 0 <= replicas < this.Want_replicas.
 func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) {

 	// Buffer for reads from 'r'
-	var buf []byte
+	var bufsize int
 	if expectedLength > 0 {
 		if expectedLength > BLOCKSIZE {
 			return 0, OversizeBlockError
 		}
-		buf = make([]byte, expectedLength)
+		bufsize = int(expectedLength)
 	} else {
-		buf = make([]byte, BLOCKSIZE)
+		bufsize = BLOCKSIZE
 	}

-	// Read requests on Transfer() buffer
-	requests := make(chan buffer.ReadRequest)
-	defer close(requests)
+	t := buffer.StartTransferFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
+	defer t.Close()

-	// Reporting reader error states
-	reader_status := make(chan error)
-	defer close(reader_status)
-
-	// Start the transfer goroutine
-	go buffer.Transfer(buf, r, requests, reader_status)
-
-	return this.putReplicas(hash, requests, reader_status, expectedLength)
+	return this.putReplicas(hash, t, expectedLength)
 }

+// Put a block given the block hash and a byte buffer. The desired number of
+// replicas is given in KeepClient.Want_replicas. Returns the number of
+// replicas that were written and any error encountered. Note this will return
+// InsufficientReplicasError whenever 0 <= replicas < this.Want_replicas.
 func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
-	// Read requests on Transfer() buffer
-	requests := make(chan buffer.ReadRequest)
-	defer close(requests)
-
-	// Start the transfer goroutine
-	go buffer.Transfer(buf, nil, requests, nil)
+	t := buffer.StartTransferFromSlice(buf)
+	defer t.Close()

-	return this.putReplicas(hash, requests, nil, int64(len(buf)))
+	return this.putReplicas(hash, t, int64(len(buf)))
 }

+// Put a block given a buffer. The hash will be computed. The desired number
+// of replicas is given in KeepClient.Want_replicas. Returns the number of
+// replicas that were written and any error encountered. Note this will return
+// InsufficientReplicasError whenever 0 <= replicas < this.Want_replicas.
 func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) {
 	hash = fmt.Sprintf("%x", md5.Sum(buffer))
 	replicas, err = this.PutHB(hash, buffer)
 	return hash, replicas, err
 }

+// Put a block, given a Reader. This will read the entire reader into a buffer
+// to compute the hash. The desired number of replicas is given in
+// KeepClient.Want_replicas. Returns the number of replicas that were written
+// and any error encountered. Note this will return InsufficientReplicasError
+// whenever 0 <= replicas < this.Want_replicas. Also note that if the block
+// hash and data size are available, PutHR() is more efficient.
 func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error) {
 	if buffer, err := ioutil.ReadAll(r); err != nil {
 		return "", 0, err
@@ -310,18 +110,27 @@ func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error)
 	}
 }

+// Get a block given a hash. Return a reader, the expected data length, the
+// URL the block was fetched from, and any error encountered. If the block
+// checksum does not match, the final Read() on the reader returned by this
+// method will return a BadChecksum error instead of EOF.
 func (this KeepClient) Get(hash string) (reader io.ReadCloser, contentLength int64, url string, err error) {
 	return this.AuthorizedGet(hash, "", "")
 }

+// Get a block given a hash, with additional authorization provided by
+// signature and timestamp. Return a reader, the expected data length, the URL
+// the block was fetched from, and any error encountered.
+// If the block checksum does not match, the final Read() on the reader
+// returned by this method will return a BadChecksum error instead of EOF.
 func (this KeepClient) AuthorizedGet(hash string,
 	signature string,
 	timestamp string) (reader io.ReadCloser,
 	contentLength int64, url string, err error) {

 	// Calculate the ordering for asking servers
-	sv := this.ShuffledServiceRoots(hash)
+	sv := this.shuffledServiceRoots(hash)

 	for _, host := range sv {
 		var req *http.Request
@@ -345,21 +154,25 @@ func (this KeepClient) AuthorizedGet(hash string,
 		}

 		if resp.StatusCode == http.StatusOK {
-			return resp.Body, resp.ContentLength, url, nil
+			return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
 		}
 	}

 	return nil, 0, "", BlockNotFound
 }

+// Determine if a block with the given hash is available and readable, without
+// returning the block contents.
 func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
 	return this.AuthorizedAsk(hash, "", "")
 }

+// Determine if a block with the given hash is available and readable with the
+// given signature and timestamp, without returning the block contents.
 func (this KeepClient) AuthorizedAsk(hash string, signature string,
 	timestamp string) (contentLength int64, url string, err error) {
 	// Calculate the ordering for asking servers
-	sv := this.ShuffledServiceRoots(hash)
+	sv := this.shuffledServiceRoots(hash)

 	for _, host := range sv {
 		var req *http.Request
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 5f189fc8aa..348b913fd5 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -19,7 +19,9 @@ import (
 )

 // Gocheck boilerplate
-func Test(t *testing.T) { TestingT(t) }
+func Test(t *testing.T) {
+	TestingT(t)
+}

 // Gocheck boilerplate
 var _ = Suite(&ServerRequiredSuite{})
@@ -60,21 +62,22 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
 	os.Setenv("ARVADOS_API_HOST_INSECURE", "")

 	kc, err := MakeKeepClient()
-	c.Assert(kc.ApiServer, Equals, "localhost:3001")
-	c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
-	c.Assert(kc.ApiInsecure, Equals, false)
+	c.Check(kc.ApiServer, Equals, "localhost:3001")
+	c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+	c.Check(kc.ApiInsecure, Equals, false)

 	os.Setenv("ARVADOS_API_HOST_INSECURE", "true")

 	kc, err = MakeKeepClient()
-	c.Assert(kc.ApiServer, Equals, "localhost:3001")
-	c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
-	c.Assert(kc.ApiInsecure, Equals, true)
+	c.Check(kc.ApiServer, Equals, "localhost:3001")
+	c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+	c.Check(kc.ApiInsecure, Equals, true)
+	c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)

 	c.Assert(err, Equals, nil)
-	c.Assert(len(kc.Service_roots), Equals, 2)
-	c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107")
-	c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108")
+	c.Check(len(kc.Service_roots), Equals, 2)
+	c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
+	c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
 }

 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
@@ -82,11 +85,11 @@ func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {

 	// "foo" acbd18db4cc2f85cedef654fccc4a4d8
 	foo_shuffle :=
[]string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"} - c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle) + c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle) // "bar" 37b51d194a7513e45b56f6524f2d51f2 bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"} - c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle) + c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle) } type StubPutHandler struct { @@ -123,7 +126,7 @@ func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url s } func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string, - io.ReadCloser, io.WriteCloser, chan UploadStatus)) { + io.ReadCloser, io.WriteCloser, chan uploadStatus)) { listener, url := RunBogusKeepServer(st, 2990) defer listener.Close() @@ -132,7 +135,7 @@ func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string, kc.ApiToken = "abc123" reader, writer := io.Pipe() - upload_status := make(chan UploadStatus) + upload_status := make(chan uploadStatus) f(kc, url, reader, writer, upload_status) } @@ -147,7 +150,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { UploadToStubHelper(c, st, func(kc KeepClient, url string, reader io.ReadCloser, - writer io.WriteCloser, upload_status chan UploadStatus) { + writer io.WriteCloser, upload_status chan uploadStatus) { go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo"))) @@ -156,7 +159,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { <-st.handled status := <-upload_status - c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200}) + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200}) }) } @@ -170,34 +173,23 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { UploadToStubHelper(c, st, func(kc KeepClient, url string, reader io.ReadCloser, - writer io.WriteCloser, upload_status chan UploadStatus) { - - // Buffer for reads from 'r' - buf := make([]byte, 512) - - // Read requests on Transfer() buffer - requests := make(chan buffer.ReadRequest) - defer close(requests) + writer io.WriteCloser, upload_status chan uploadStatus) { - // Reporting reader error states - reader_status := make(chan error) + tr := buffer.StartTransferFromReader(512, reader) + defer tr.Close() - go buffer.Transfer(buf, reader, requests, reader_status) - - br1 := buffer.MakeBufferReader(requests) + br1 := tr.MakeBufferReader() go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3) writer.Write([]byte("foo")) writer.Close() - 
<-reader_status + <-tr.Reader_status <-st.handled status := <-upload_status - c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200}) - - //c.Check(true, Equals, false) + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200}) }) } @@ -206,7 +198,7 @@ type FailHandler struct { } func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - resp.WriteHeader(400) + resp.WriteHeader(500) this.handled <- fmt.Sprintf("http://%s", req.Host) } @@ -218,7 +210,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { UploadToStubHelper(c, st, func(kc KeepClient, url string, reader io.ReadCloser, - writer io.WriteCloser, upload_status chan UploadStatus) { + writer io.WriteCloser, upload_status chan uploadStatus) { go kc.uploadToKeepServer(url, hash, reader, upload_status, 3) @@ -229,7 +221,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { status := <-upload_status c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash)) - c.Check(status.StatusCode, Equals, 400) + c.Check(status.StatusCode, Equals, 500) }) } @@ -279,10 +271,14 @@ func (s *StandaloneSuite) TestPutB(c *C) { kc.PutB([]byte("foo")) - shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) - c.Check(<-st.handled, Equals, shuff[0]) - c.Check(<-st.handled, Equals, shuff[1]) + s1 := <-st.handled + s2 := <-st.handled + c.Check((s1 == shuff[0] && s2 == shuff[1]) || + (s1 == shuff[1] && s2 == shuff[0]), + Equals, + true) } func (s *StandaloneSuite) TestPutHR(c *C) { @@ -321,7 +317,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) { kc.PutHR(hash, reader, 3) - shuff := kc.ShuffledServiceRoots(hash) + shuff := kc.shuffledServiceRoots(hash) log.Print(shuff) s1 := <-st.handled @@ -368,7 +364,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) { sort.Strings(kc.Service_roots) - shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) phash, replicas, err := kc.PutB([]byte("foo")) @@ -416,7 +412,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { sort.Strings(kc.Service_roots) - shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) _, replicas, err := kc.PutB([]byte("foo")) @@ -485,6 +481,43 @@ func (s *StandaloneSuite) TestGetFail(c *C) { c.Check(r, Equals, nil) } +type BarHandler struct { + handled chan string +} + +func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + resp.Write([]byte("bar")) + this.handled <- fmt.Sprintf("http://%s", req.Host) +} + +func (s *StandaloneSuite) TestChecksum(c *C) { + foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar"))) + + st := BarHandler{make(chan string, 1)} + + listener, url := RunBogusKeepServer(st, 2990) + defer listener.Close() + + kc, _ := MakeKeepClient() + kc.ApiToken = "abc123" + kc.Service_roots = []string{url} + + r, n, _, err := kc.Get(barhash) + _, err = ioutil.ReadAll(r) + c.Check(n, Equals, int64(3)) + c.Check(err, Equals, nil) + + <-st.handled + + r, n, _, err = kc.Get(foohash) + _, err = ioutil.ReadAll(r) + c.Check(n, Equals, int64(3)) + c.Check(err, Equals, BadChecksum) + + <-st.handled +} + func (s *StandaloneSuite) TestGetWithFailures(c *C) { hash := fmt.Sprintf("%x", 
md5.Sum([]byte("foo"))) diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go new file mode 100644 index 0000000000..7ea8248b39 --- /dev/null +++ b/sdk/go/src/arvados.org/keepclient/support.go @@ -0,0 +1,221 @@ +/* Internal methods to support keepclient.go */ +package keepclient + +import ( + "arvados.org/buffer" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" + "sort" + "strconv" +) + +type keepDisk struct { + Hostname string `json:"service_host"` + Port int `json:"service_port"` + SSL bool `json:"service_ssl_flag"` +} + +func (this *KeepClient) discoverKeepServers() error { + // Construct request of keep disk list + var req *http.Request + var err error + if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil { + return err + } + + // Add api token header + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) + + // Make the request + var resp *http.Response + if resp, err = this.Client.Do(req); err != nil { + return err + } + + type svcList struct { + Items []keepDisk `json:"items"` + } + + // Decode json reply + dec := json.NewDecoder(resp.Body) + var m svcList + if err := dec.Decode(&m); err != nil { + return err + } + + listed := make(map[string]bool) + this.Service_roots = make([]string, 0, len(m.Items)) + + for _, element := range m.Items { + n := "" + if element.SSL { + n = "s" + } + + // Construct server URL + url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port) + + // Skip duplicates + if !listed[url] { + listed[url] = true + this.Service_roots = append(this.Service_roots, url) + } + } + + // Must be sorted for ShuffledServiceRoots() to produce consistent + // results. + sort.Strings(this.Service_roots) + + return nil +} + +func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) { + // Build an ordering with which to query the Keep servers based on the + // contents of the hash. "hash" is a hex-encoded number at least 8 + // digits (32 bits) long + + // seed used to calculate the next keep server from 'pool' to be added + // to 'pseq' + seed := hash + + // Keep servers still to be added to the ordering + pool := make([]string, len(this.Service_roots)) + copy(pool, this.Service_roots) + + // output probe sequence + pseq = make([]string, 0, len(this.Service_roots)) + + // iterate while there are servers left to be assigned + for len(pool) > 0 { + + if len(seed) < 8 { + // ran out of digits in the seed + if len(pseq) < (len(hash) / 4) { + // the number of servers added to the probe + // sequence is less than the number of 4-digit + // slices in 'hash' so refill the seed with the + // last 4 digits. + seed = hash[len(hash)-4:] + } + seed += hash + } + + // Take the next 8 digits (32 bytes) and interpret as an integer, + // then modulus with the size of the remaining pool to get the next + // selected server. + probe, _ := strconv.ParseUint(seed[0:8], 16, 32) + probe %= uint64(len(pool)) + + // Append the selected server to the probe sequence and remove it + // from the pool. + pseq = append(pseq, pool[probe]) + pool = append(pool[:probe], pool[probe+1:]...) 
+
+		// Remove the digits just used from the seed
+		seed = seed[8:]
+	}
+	return pseq
+}
+
+type uploadStatus struct {
+	Err        error
+	Url        string
+	StatusCode int
+}
+
+func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+	upload_status chan<- uploadStatus, expectedLength int64) {
+
+	log.Printf("Uploading to %s", host)
+
+	var req *http.Request
+	var err error
+	var url = fmt.Sprintf("%s/%s", host, hash)
+	if req, err = http.NewRequest("PUT", url, nil); err != nil {
+		upload_status <- uploadStatus{err, url, 0}
+		return
+	}
+
+	if expectedLength > 0 {
+		req.ContentLength = expectedLength
+	}
+
+	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+	req.Header.Add("Content-Type", "application/octet-stream")
+	req.Body = body
+
+	var resp *http.Response
+	if resp, err = this.Client.Do(req); err != nil {
+		upload_status <- uploadStatus{err, url, 0}
+		return
+	}
+
+	if resp.StatusCode == http.StatusOK {
+		upload_status <- uploadStatus{nil, url, resp.StatusCode}
+	} else {
+		upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode}
+	}
+}
+
+func (this KeepClient) putReplicas(
+	hash string,
+	tr buffer.TransferBuffer,
+	expectedLength int64) (replicas int, err error) {
+
+	// Calculate the ordering for uploading to servers
+	sv := this.shuffledServiceRoots(hash)
+
+	// The next server to try contacting
+	next_server := 0
+
+	// The number of active writers
+	active := 0
+
+	// Used to communicate status from the upload goroutines
+	upload_status := make(chan uploadStatus)
+	defer close(upload_status)
+
+	// Desired number of replicas
+	remaining_replicas := this.Want_replicas
+
+	for remaining_replicas > 0 {
+		for active < remaining_replicas {
+			// Start some upload requests
+			if next_server < len(sv) {
+				go this.uploadToKeepServer(sv[next_server], hash, tr.MakeBufferReader(), upload_status, expectedLength)
+				next_server += 1
+				active += 1
+			} else {
+				return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+			}
+		}
+
+		// Now wait for something to happen.
+		select {
+		case status := <-tr.Reader_status:
+			if status == io.EOF {
+				// good news!
+			} else {
+				// bad news
+				return (this.Want_replicas - remaining_replicas), status
+			}
+		case status := <-upload_status:
+			if status.StatusCode == 200 {
+				// good news!
+				remaining_replicas -= 1
+			} else {
+				// writing to keep server failed for some reason
+				log.Printf("Keep server put to %v failed with '%v'",
+					status.Url, status.Err)
+			}
+			active -= 1
+			log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
+		}
+	}
+
+	return (this.Want_replicas - remaining_replicas), nil
+}
-- 
2.30.2
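
Usage sketch (not part of the patch): a minimal example of how the new public
buffer API fits together, using only the functions added above
(StartTransferFromReader, StartTransferFromSlice, MakeBufferReader, Close).
The main() harness is illustrative. Note that when the buffer is filled from a
reader, Reader_status must be drained, or the transfer goroutine blocks before
it can signal end-of-stream to readers.

package main

import (
	"arvados.org/buffer"
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"log"
)

func main() {
	// Fill a 512-byte buffer incrementally from an io.Reader.
	tr := buffer.StartTransferFromReader(512, bytes.NewBufferString("hello"))

	// The source reader's terminal status (normally io.EOF) arrives on
	// Reader_status and must be received before readers can see EOF.
	go func() {
		if err := <-tr.Reader_status; err != io.EOF {
			log.Printf("source reader failed: %v", err)
		}
	}()

	// Any number of independent readers can consume the same buffer;
	// each one starts reading from offset 0.
	br1 := tr.MakeBufferReader()
	br2 := tr.MakeBufferReader()
	p1, _ := ioutil.ReadAll(br1)
	p2, _ := ioutil.ReadAll(br2)
	fmt.Printf("%s %s\n", p1, p2) // prints "hello hello"
	tr.Close()

	// Serving reads from an existing slice needs no status channel.
	ts := buffer.StartTransferFromSlice([]byte("world"))
	defer ts.Close()
	p3, _ := ioutil.ReadAll(ts.MakeBufferReader())
	fmt.Println(string(p3)) // prints "world"
}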
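Similarly, a short sketch of HashCheckingReader on its own, outside of
keepclient.Get(). The struct fields come from hashcheck.go above; the rest is
an illustrative harness. The hash is updated on every Read, and the checksum
is only compared once the underlying reader reaches EOF.

package main

import (
	"arvados.org/keepclient"
	"bytes"
	"crypto/md5"
	"fmt"
	"io/ioutil"
)

func main() {
	data := []byte("foo")
	goodhash := fmt.Sprintf("%x", md5.Sum(data))

	// Wrap any io.Reader with the expected hex-encoded lowercase checksum.
	hcr := keepclient.HashCheckingReader{
		Reader: bytes.NewReader(data),
		Hash:   md5.New(),
		Check:  goodhash,
	}

	if _, err := ioutil.ReadAll(hcr); err != nil {
		// A corrupted stream surfaces as keepclient.BadChecksum here.
		fmt.Println("checksum mismatch:", err)
	} else {
		fmt.Println("checksum ok")
	}
}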
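Finally, a hypothetical end-to-end round trip through the public KeepClient
API as it stands after this patch. It assumes a reachable Arvados API server
and Keep services, with ARVADOS_API_HOST and ARVADOS_API_TOKEN set in the
environment; the block content "foo" is arbitrary.

package main

import (
	"arvados.org/keepclient"
	"fmt"
	"io/ioutil"
	"log"
)

func main() {
	// MakeKeepClient() reads the standard environment variables and
	// contacts the API server to discover Keep servers.
	kc, err := keepclient.MakeKeepClient()
	if err != nil {
		log.Fatal(err)
	}

	// Write a block; the hash is computed for us.
	hash, replicas, err := kc.PutB([]byte("foo"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("wrote %s to %d replicas\n", hash, replicas)

	// Read it back; the returned reader transparently verifies the
	// checksum and yields BadChecksum instead of EOF on a mismatch.
	r, n, url, err := kc.Get(hash)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	buf, err := ioutil.ReadAll(r)
	if err != nil {
		log.Fatal(err) // BadChecksum would surface here
	}
	fmt.Printf("read %d of %d bytes back from %s: %s\n", len(buf), n, url, buf)
}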