"git.curoverse.com/arvados.git/sdk/go/manifest"
)
+// ReadCloserWithLen extends io.ReadCloser with a Len() method that
+// returns the total size, in bytes, of the content being read.
+type ReadCloserWithLen interface {
+ io.ReadCloser
+ Len() uint64
+}
+
const (
// After reading a data block from Keep, cfReader slices it up
// and sends the slices to a buffered channel to be consumed
// parameter when retrieving the collection record).
var ErrNoManifest = errors.New("Collection has no manifest")
-// CollectionFileReader returns an io.Reader that reads file content
-// from a collection. The filename must be given relative to the root
-// of the collection, without a leading "./".
-func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (*cfReader, error) {
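+// CollectionFileReader returns a ReadCloserWithLen that reads file
+// content from a collection. The filename must be given relative to
+// the root of the collection, without a leading "./".
+//
+// It returns ErrNoManifest if the collection record has no
+// "manifest_text" string, and os.ErrNotExist if the filename does not
+// appear in the manifest.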
+func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) {
mText, ok := collection["manifest_text"].(string)
if !ok {
return nil, ErrNoManifest
}
m := manifest.Manifest{Text: mText}
rdrChan := make(chan *cfReader)
- go func() {
- // q is a queue of FileSegments that we have received but
- // haven't yet been able to send to toGet.
- var q []*manifest.FileSegment
- var r *cfReader
- for seg := range m.FileSegmentIterByName(filename) {
- if r == nil {
- // We've just discovered that the
- // requested filename does appear in
- // the manifest, so we can return a
- // real reader (not nil) from
- // CollectionFileReader().
- r = newCFReader(kc)
- rdrChan <- r
- }
- q = append(q, seg)
- r.totalSize += uint64(seg.Len)
- // Send toGet as many segments as we can until
- // it blocks.
- Q:
- for len(q) > 0 {
- select {
- case r.toGet <- q[0]:
- q = q[1:]
- default:
- break Q
- }
- }
- }
+ go kc.queueSegmentsToGet(m, filename, rdrChan)
+ r, ok := <-rdrChan
+ if !ok {
+ return nil, os.ErrNotExist
+ }
+ return r, nil
+}
+
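+// queueSegmentsToGet sends segments for the specified file to the
+// reader's toGet channel. It sends a *cfReader to rdrChan as soon as
+// the specified file is found in the manifest (even if it's empty).
+// rdrChan is closed when queueSegmentsToGet returns; if the file is
+// not found, it is closed without a *cfReader having been sent.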
+func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) {
+ defer close(rdrChan)
+
+ // q is a queue of FileSegments that we have received but
+ // haven't yet been able to send to toGet.
+ var q []*manifest.FileSegment
+ var r *cfReader
+ for seg := range m.FileSegmentIterByName(filename) {
if r == nil {
- // File not found
- rdrChan <- nil
- return
+ // We've just discovered that the requested
+ // filename does appear in the manifest, so we
+ // can return a real reader (not nil) from
+ // CollectionFileReader().
+ r = newCFReader(kc)
+ rdrChan <- r
}
- close(r.countDone)
- for _, seg := range q {
- r.toGet <- seg
+ q = append(q, seg)
+ r.totalSize += uint64(seg.Len)
+ // Send toGet as many segments as we can until it
+ // blocks.
+ Q:
+ for len(q) > 0 {
+ select {
+ case r.toGet <- q[0]:
+ q = q[1:]
+ default:
+ break Q
+ }
}
- close(r.toGet)
- }()
- // Before returning a reader, wait until we know whether the
- // file exists here:
- r := <-rdrChan
+ }
if r == nil {
- return nil, os.ErrNotExist
+ // File not found.
+ return
}
- return r, nil
+ close(r.countDone)
+ for _, seg := range q {
+ r.toGet <- seg
+ }
+ close(r.toGet)
}
type cfReader struct {
keepClient *KeepClient
+
// doGet() reads FileSegments from toGet, gets the data from
// Keep, and sends byte slices to toRead to be consumed by
// Read().
toGet chan *manifest.FileSegment
+
// toRead is a buffered channel, sized to fit one full Keep
// block. This lets us verify checksums without having a
// store-and-forward delay between blocks: by the time the
// starting to fetch block N+1. A larger buffer would be
// useful for a caller whose read speed varies a lot.
toRead chan []byte
+
// bytes ready to send next time someone calls Read()
buf []byte
+
// Total size of the file being read. Not safe to read this
// until countDone is closed.
totalSize uint64
countDone chan struct{}
+
// First error encountered.
err error
+
// errNotNil is closed IFF err contains a non-nil error.
// Receiving from it will block until an error occurs.
errNotNil chan struct{}
+
// rdrClosed is closed IFF the reader's Close() method has
// been called. Any goroutines associated with the reader will
// stop and free up resources when they notice this channel is
func (r *cfReader) Read(outbuf []byte) (int, error) {
if r.Error() != nil {
+ // Short circuit: the caller might as well find out
+ // now that we hit an error, even if there's buffered
+ // data we could return.
return 0, r.Error()
}
- for r.buf == nil || len(r.buf) == 0 {
+ for len(r.buf) == 0 {
+ // Private buffer was emptied out by the last Read()
+ // (or this is the first Read() and r.buf is nil).
+ // Read from r.toRead until we get a non-empty slice
+ // or hit an error.
var ok bool
r.buf, ok = <-r.toRead
if r.Error() != nil {
+ // Error encountered while waiting for bytes
return 0, r.Error()
} else if !ok {
+ // No more bytes to read, no error encountered
return 0, io.EOF
}
}
+ // Copy as much as possible from our private buffer to the
+ // caller's buffer
n := len(r.buf)
if len(r.buf) > len(outbuf) {
n = len(outbuf)
}
copy(outbuf[:n], r.buf[:n])
+
+ // Next call to Read() will continue where we left off
r.buf = r.buf[n:]
+
return n, nil
}
+// Close releases resources. It returns a non-nil error if an error
+// was encountered by the reader.
func (r *cfReader) Close() error {
close(r.rdrClosed)
return r.Error()
}
+// Error returns an error if one has been encountered, otherwise
+// nil. It is safe to call from any goroutine.
func (r *cfReader) Error() error {
select {
case <-r.errNotNil:
}
}
+// Len returns the total number of bytes in the file being read. If
+// necessary, it waits for manifest parsing to finish.
func (r *cfReader) Len() uint64 {
// Wait for all segments to be counted
<-r.countDone
s.handler = SuccessHandler{
disk: make(map[string][]byte),
lock: make(chan struct{}, 1),
- ops: new(int),
+ ops: new(int),
}
localRoots := make(map[string]string)
for i, k := range RunSomeFakeKeepServers(s.handler, 4) {
type SuccessHandler struct {
disk map[string][]byte
- lock chan struct{} // channel with buffer==1: full when an operation is in progress.
- ops *int // number of operations completed
+ lock chan struct{} // channel with buffer==1: full when an operation is in progress.
+ ops *int // number of operations completed
}
func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
if h.ops != nil {
(*h.ops)++
}
- <- h.lock
+ <-h.lock
resp.Write([]byte(pdh))
case "GET":
pdh := req.URL.Path[1:]
if h.ops != nil {
(*h.ops)++
}
- <- h.lock
+ <-h.lock
if !ok {
resp.WriteHeader(http.StatusNotFound)
} else {
}()
err = rdr.Close()
c.Assert(err, check.IsNil)
- c.Assert(rdr.Error(), check.IsNil)
+ c.Assert(rdr.(*cfReader).Error(), check.IsNil)
// Release the stub server's lock. The first GET operation will proceed.
<-s.handler.lock
<-firstReadDone
// doGet() should close toRead before sending any more bufs to it.
- if what, ok := <-rdr.toRead; ok {
+ if what, ok := <-rdr.(*cfReader).toRead; ok {
c.Errorf("Got %q, expected toRead to be closed", string(what))
}
c.Check(err, check.IsNil)
for i := 0; i < 2; i++ {
_, err = io.ReadFull(rdr, buf)
- c.Check(err, check.Not(check.IsNil))
+ c.Check(err, check.NotNil)
c.Check(err, check.Not(check.Equals), io.EOF)
}
}
retryList = append(retryList, host)
} else if resp.StatusCode != http.StatusOK {
var respbody []byte
- respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ respbody, _ = ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
resp.Body.Close()
errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
url, resp.StatusCode, bytes.TrimSpace(respbody)))
response string
}
-func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
upload_status chan<- uploadStatus, expectedLength int64, requestId string) {
var req *http.Request
defer resp.Body.Close()
defer io.Copy(ioutil.Discard, resp.Body)
- respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ respbody, err2 := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
response := strings.TrimSpace(string(respbody))
if err2 != nil && err2 != io.EOF {
log.Printf("[%v] Upload %v error: %v response: %v", requestId, url, err2.Error(), response)
}
}
-func (this KeepClient) putReplicas(
+func (this *KeepClient) putReplicas(
hash string,
tr *streamer.AsyncStream,
expectedLength int64) (locator string, replicas int, err error) {
}
if after, err := rcv.ReadBytes('\n'); err != nil || string(after) != "after\n" {
- t.Fatal("\"after\n\" not received (got \"%s\", %s)", after, err)
+ t.Fatalf("\"after\n\" not received (got \"%s\", %s)", after, err)
}
select {
// ServerAddress struct
type ServerAddress struct {
- SSL bool `json:service_ssl_flag`
+ SSL bool `json:"service_ssl_flag"`
Host string `json:"service_host"`
Port int `json:"service_port"`
UUID string `json:"uuid"`
w.Header().Set("Content-Type", t)
}
}
- w.Header().Set("Content-Length", fmt.Sprintf("%d", rdr.Len()))
+ if rdr, ok := rdr.(keepclient.ReadCloserWithLen); ok {
+ w.Header().Set("Content-Length", fmt.Sprintf("%d", rdr.Len()))
+ }
if attachment {
w.Header().Set("Content-Disposition", "attachment")
}
func (s *IntegrationSuite) TestAnonymousTokenOK(c *check.C) {
anonymousTokens = []string{arvadostest.AnonymousToken}
s.testVhostRedirectTokenToCookie(c, "GET",
- "example.com/c=" + arvadostest.HelloWorldCollection + "/Hello%20world.txt",
+ "example.com/c="+arvadostest.HelloWorldCollection+"/Hello%20world.txt",
"",
"",
"",
func (s *IntegrationSuite) TestAnonymousTokenError(c *check.C) {
anonymousTokens = []string{"anonymousTokenConfiguredButInvalid"}
s.testVhostRedirectTokenToCookie(c, "GET",
- "example.com/c=" + arvadostest.HelloWorldCollection + "/Hello%20world.txt",
+ "example.com/c="+arvadostest.HelloWorldCollection+"/Hello%20world.txt",
"",
"",
"",
return req.RemoteAddr
}
-func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
+func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
var auth string
if auth = req.Header.Get("Authorization"); auth == "" {
return false, ""
var pass bool
var tok string
- if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
+ if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
status, err = http.StatusForbidden, BadAuthorizationHeader
return
}
var pass bool
var tok string
- if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
+ if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
err = BadAuthorizationHeader
status = http.StatusForbidden
return
kc := *handler.KeepClient
- ok, token := CheckAuthorizationHeader(kc, handler.ApiTokenCache, req)
+ ok, token := CheckAuthorizationHeader(&kc, handler.ApiTokenCache, req)
if !ok {
status, err = http.StatusForbidden, BadAuthorizationHeader
return
}
}
-func runProxy(c *C, args []string, port int, bogusClientToken bool) keepclient.KeepClient {
+func runProxy(c *C, args []string, port int, bogusClientToken bool) *keepclient.KeepClient {
if bogusClientToken {
os.Setenv("ARVADOS_API_TOKEN", "bogus-token")
}
go main()
}
- return kc
+ return &kc
}
func (s *ServerRequiredSuite) TestPutAskGet(c *C) {