5824: Add some clarifying comments and golint/vet/fmt fixes.
authorTom Clegg <tom@curoverse.com>
Thu, 29 Oct 2015 18:47:16 +0000 (14:47 -0400)
committerTom Clegg <tom@curoverse.com>
Thu, 29 Oct 2015 20:11:25 +0000 (16:11 -0400)
sdk/go/keepclient/collectionreader.go
sdk/go/keepclient/collectionreader_test.go
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/support.go
services/crunchstat/crunchstat_test.go
services/datamanager/keep/keep.go
services/keep-web/handler.go
services/keep-web/handler_test.go
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go

index 0d05b8a00ebd74c2f12538c55c89e9e3312e5045..68ecc6e43083d628fda84b00fbfeaf7fbfc9fe6b 100644 (file)
@@ -8,6 +8,13 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
+// ReadCloserWithLen extends io.ReadCloser with a Len() method that
+// returns the total number of bytes available to read.
+type ReadCloserWithLen interface {
+       io.ReadCloser
+       Len() uint64
+}
+
 const (
        // After reading a data block from Keep, cfReader slices it up
        // and sends the slices to a buffered channel to be consumed
@@ -24,71 +31,76 @@ const (
 // parameter when retrieving the collection record).
 var ErrNoManifest = errors.New("Collection has no manifest")
 
-// CollectionFileReader returns an io.Reader that reads file content
-// from a collection. The filename must be given relative to the root
-// of the collection, without a leading "./".
-func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (*cfReader, error) {
+// CollectionFileReader returns a ReadCloserWithLen that reads file
+// content from a collection. The filename must be given relative to
+// the root of the collection, without a leading "./".
+func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) {
        mText, ok := collection["manifest_text"].(string)
        if !ok {
                return nil, ErrNoManifest
        }
        m := manifest.Manifest{Text: mText}
        rdrChan := make(chan *cfReader)
-       go func() {
-               // q is a queue of FileSegments that we have received but
-               // haven't yet been able to send to toGet.
-               var q []*manifest.FileSegment
-               var r *cfReader
-               for seg := range m.FileSegmentIterByName(filename) {
-                       if r == nil {
-                               // We've just discovered that the
-                               // requested filename does appear in
-                               // the manifest, so we can return a
-                               // real reader (not nil) from
-                               // CollectionFileReader().
-                               r = newCFReader(kc)
-                               rdrChan <- r
-                       }
-                       q = append(q, seg)
-                       r.totalSize += uint64(seg.Len)
-                       // Send toGet as many segments as we can until
-                       // it blocks.
-               Q:
-                       for len(q) > 0 {
-                               select {
-                               case r.toGet <- q[0]:
-                                       q = q[1:]
-                               default:
-                                       break Q
-                               }
-                       }
-               }
+       go kc.queueSegmentsToGet(m, filename, rdrChan)
+       r, ok := <-rdrChan
+       if !ok {
+               return nil, os.ErrNotExist
+       }
+       return r, nil
+}
+
+// Send segments for the specified file to r.toGet. Send a *cfReader
+// to rdrChan if the specified file is found (even if it's empty).
+// Then, close rdrChan.
+func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) {
+       defer close(rdrChan)
+
+       // q is a queue of FileSegments that we have received but
+       // haven't yet been able to send to toGet.
+       var q []*manifest.FileSegment
+       var r *cfReader
+       for seg := range m.FileSegmentIterByName(filename) {
                if r == nil {
-                       // File not found
-                       rdrChan <- nil
-                       return
+                       // We've just discovered that the requested
+                       // filename does appear in the manifest, so we
+                       // can return a real reader (not nil) from
+                       // CollectionFileReader().
+                       r = newCFReader(kc)
+                       rdrChan <- r
                }
-               close(r.countDone)
-               for _, seg := range q {
-                       r.toGet <- seg
+               q = append(q, seg)
+               r.totalSize += uint64(seg.Len)
+               // Send toGet as many segments as we can until it
+               // blocks.
+       Q:
+               for len(q) > 0 {
+                       select {
+                       case r.toGet <- q[0]:
+                               q = q[1:]
+                       default:
+                               break Q
+                       }
                }
-               close(r.toGet)
-       }()
-       // Before returning a reader, wait until we know whether the
-       // file exists here:
-       r := <-rdrChan
+       }
        if r == nil {
-               return nil, os.ErrNotExist
+               // File not found.
+               return
        }
-       return r, nil
+       close(r.countDone)
+       for _, seg := range q {
+               r.toGet <- seg
+       }
+       close(r.toGet)
 }
 
 type cfReader struct {
        keepClient *KeepClient
+
        // doGet() reads FileSegments from toGet, gets the data from
        // Keep, and sends byte slices to toRead to be consumed by
        // Read().
        toGet chan *manifest.FileSegment
+
        // toRead is a buffered channel, sized to fit one full Keep
        // block. This lets us verify checksums without having a
        // store-and-forward delay between blocks: by the time the
@@ -96,17 +108,22 @@ type cfReader struct {
        // starting to fetch block N+1. A larger buffer would be
        // useful for a caller whose read speed varies a lot.
        toRead chan []byte
+
        // bytes ready to send next time someone calls Read()
        buf []byte
+
        // Total size of the file being read. Not safe to read this
        // until countDone is closed.
        totalSize uint64
        countDone chan struct{}
+
        // First error encountered.
        err error
+
        // errNotNil is closed IFF err contains a non-nil error.
        // Receiving from it will block until an error occurs.
        errNotNil chan struct{}
+
        // rdrClosed is closed IFF the reader's Close() method has
        // been called. Any goroutines associated with the reader will
        // stop and free up resources when they notice this channel is
@@ -116,31 +133,49 @@ type cfReader struct {
 
 func (r *cfReader) Read(outbuf []byte) (int, error) {
        if r.Error() != nil {
+               // Short circuit: the caller might as well find out
+               // now that we hit an error, even if there's buffered
+               // data we could return.
                return 0, r.Error()
        }
-       for r.buf == nil || len(r.buf) == 0 {
+       for len(r.buf) == 0 {
+               // Private buffer was emptied out by the last Read()
+               // (or this is the first Read() and r.buf is nil).
+               // Read from r.toRead until we get a non-empty slice
+               // or hit an error.
                var ok bool
                r.buf, ok = <-r.toRead
                if r.Error() != nil {
+                       // Error encountered while waiting for bytes
                        return 0, r.Error()
                } else if !ok {
+                       // No more bytes to read, no error encountered
                        return 0, io.EOF
                }
        }
+       // Copy as much as possible from our private buffer to the
+       // caller's buffer
        n := len(r.buf)
        if len(r.buf) > len(outbuf) {
                n = len(outbuf)
        }
        copy(outbuf[:n], r.buf[:n])
+
+       // Next call to Read() will continue where we left off
        r.buf = r.buf[n:]
+
        return n, nil
 }
 
+// Close releases resources. It returns a non-nil error if an error
+// was encountered by the reader.
 func (r *cfReader) Close() error {
        close(r.rdrClosed)
        return r.Error()
 }
 
+// Error returns an error if one has been encountered, otherwise
+// nil. It is safe to call from any goroutine.
 func (r *cfReader) Error() error {
        select {
        case <-r.errNotNil:
@@ -150,6 +185,8 @@ func (r *cfReader) Error() error {
        }
 }
 
+// Len returns the total number of bytes in the file being read. If
+// necessary, it waits for manifest parsing to finish.
 func (r *cfReader) Len() uint64 {
        // Wait for all segments to be counted
        <-r.countDone
index 94e41e2bc2d69e50f4e10502aae75c22ef5574b8..9fb0d86114f8ce6dd273e560c227856dde3d7c25 100644 (file)
@@ -36,7 +36,7 @@ func (s *CollectionReaderUnit) SetUpTest(c *check.C) {
        s.handler = SuccessHandler{
                disk: make(map[string][]byte),
                lock: make(chan struct{}, 1),
-               ops: new(int),
+               ops:  new(int),
        }
        localRoots := make(map[string]string)
        for i, k := range RunSomeFakeKeepServers(s.handler, 4) {
@@ -47,8 +47,8 @@ func (s *CollectionReaderUnit) SetUpTest(c *check.C) {
 
 type SuccessHandler struct {
        disk map[string][]byte
-       lock chan struct{}      // channel with buffer==1: full when an operation is in progress.
-       ops  *int               // number of operations completed
+       lock chan struct{} // channel with buffer==1: full when an operation is in progress.
+       ops  *int          // number of operations completed
 }
 
 func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
@@ -65,7 +65,7 @@ func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
                if h.ops != nil {
                        (*h.ops)++
                }
-               <- h.lock
+               <-h.lock
                resp.Write([]byte(pdh))
        case "GET":
                pdh := req.URL.Path[1:]
@@ -74,7 +74,7 @@ func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
                if h.ops != nil {
                        (*h.ops)++
                }
-               <- h.lock
+               <-h.lock
                if !ok {
                        resp.WriteHeader(http.StatusNotFound)
                } else {
@@ -192,7 +192,7 @@ func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
        }()
        err = rdr.Close()
        c.Assert(err, check.IsNil)
-       c.Assert(rdr.Error(), check.IsNil)
+       c.Assert(rdr.(*cfReader).Error(), check.IsNil)
 
        // Release the stub server's lock. The first GET operation will proceed.
        <-s.handler.lock
@@ -202,7 +202,7 @@ func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
        <-firstReadDone
 
        // doGet() should close toRead before sending any more bufs to it.
-       if what, ok := <-rdr.toRead; ok {
+       if what, ok := <-rdr.(*cfReader).toRead; ok {
                c.Errorf("Got %q, expected toRead to be closed", string(what))
        }
 
@@ -217,7 +217,7 @@ func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
        c.Check(err, check.IsNil)
        for i := 0; i < 2; i++ {
                _, err = io.ReadFull(rdr, buf)
-               c.Check(err, check.Not(check.IsNil))
+               c.Check(err, check.NotNil)
                c.Check(err, check.Not(check.Equals), io.EOF)
        }
 }
index 67c304deaf3ae54b2668cb8c2f2856e909da8c5a..e74ae2cf08da0401bab6b808cf7333af05ef8a36 100644 (file)
@@ -169,7 +169,7 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i
                                retryList = append(retryList, host)
                        } else if resp.StatusCode != http.StatusOK {
                                var respbody []byte
-                               respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+                               respbody, _ = ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
                                resp.Body.Close()
                                errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
                                        url, resp.StatusCode, bytes.TrimSpace(respbody)))
index 0791d3cf856ee7d5d1268338eafa883fe9bcbb18..f5554618feece00d04c34982347ffcc8e1e405aa 100644 (file)
@@ -169,7 +169,7 @@ type uploadStatus struct {
        response        string
 }
 
-func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
        upload_status chan<- uploadStatus, expectedLength int64, requestId string) {
 
        var req *http.Request
@@ -214,7 +214,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
        defer resp.Body.Close()
        defer io.Copy(ioutil.Discard, resp.Body)
 
-       respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+       respbody, err2 := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
        response := strings.TrimSpace(string(respbody))
        if err2 != nil && err2 != io.EOF {
                log.Printf("[%v] Upload %v error: %v response: %v", requestId, url, err2.Error(), response)
@@ -228,7 +228,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
        }
 }
 
-func (this KeepClient) putReplicas(
+func (this *KeepClient) putReplicas(
        hash string,
        tr *streamer.AsyncStream,
        expectedLength int64) (locator string, replicas int, err error) {
index 13f4dc60f71db9567e26f534c2379a63c601289b..69f31afbc9589ce6cd6c9de2a731d5093e2c80cd 100644 (file)
@@ -101,7 +101,7 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
        }
 
        if after, err := rcv.ReadBytes('\n'); err != nil || string(after) != "after\n" {
-               t.Fatal("\"after\n\" not received (got \"%s\", %s)", after, err)
+               t.Fatalf("\"after\n\" not received (got \"%s\", %s)", after, err)
        }
 
        select {
index 86c2b089aa13088d8da8f524ab21dcc8dafc9641..3a9c21a43855a472c4f5b43aa9b651fd85f506d4 100644 (file)
@@ -23,7 +23,7 @@ import (
 
 // ServerAddress struct
 type ServerAddress struct {
-       SSL         bool   `json:service_ssl_flag`
+       SSL         bool   `json:"service_ssl_flag"`
        Host        string `json:"service_host"`
        Port        int    `json:"service_port"`
        UUID        string `json:"uuid"`
index 9df78becbe75dd4d37902028d0d4082c46cff6a6..2704263e7172f09f449ec6c6558b9f6740119023 100644 (file)
@@ -304,7 +304,9 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        w.Header().Set("Content-Type", t)
                }
        }
-       w.Header().Set("Content-Length", fmt.Sprintf("%d", rdr.Len()))
+       if rdr, ok := rdr.(keepclient.ReadCloserWithLen); ok {
+               w.Header().Set("Content-Length", fmt.Sprintf("%d", rdr.Len()))
+       }
        if attachment {
                w.Header().Set("Content-Disposition", "attachment")
        }
index f877ee1e9d0ba1eed3eb256a36dbb1963ac6a0e7..392de94ffb5af1399bada756ed4a8efc7f33506b 100644 (file)
@@ -294,7 +294,7 @@ func (s *IntegrationSuite) TestVhostRedirectPOSTFormTokenToCookie404(c *check.C)
 func (s *IntegrationSuite) TestAnonymousTokenOK(c *check.C) {
        anonymousTokens = []string{arvadostest.AnonymousToken}
        s.testVhostRedirectTokenToCookie(c, "GET",
-               "example.com/c=" + arvadostest.HelloWorldCollection + "/Hello%20world.txt",
+               "example.com/c="+arvadostest.HelloWorldCollection+"/Hello%20world.txt",
                "",
                "",
                "",
@@ -306,7 +306,7 @@ func (s *IntegrationSuite) TestAnonymousTokenOK(c *check.C) {
 func (s *IntegrationSuite) TestAnonymousTokenError(c *check.C) {
        anonymousTokens = []string{"anonymousTokenConfiguredButInvalid"}
        s.testVhostRedirectTokenToCookie(c, "GET",
-               "example.com/c=" + arvadostest.HelloWorldCollection + "/Hello%20world.txt",
+               "example.com/c="+arvadostest.HelloWorldCollection+"/Hello%20world.txt",
                "",
                "",
                "",
index 8e734f7110c65f9f28c03525c63a2c5176aafbcd..f7677ec5fed3c28e8bab6c62fd452b0ce687ae9a 100644 (file)
@@ -201,7 +201,7 @@ func GetRemoteAddress(req *http.Request) string {
        return req.RemoteAddr
 }
 
-func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
+func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
        var auth string
        if auth = req.Header.Get("Authorization"); auth == "" {
                return false, ""
@@ -331,7 +331,7 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
        var pass bool
        var tok string
-       if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
+       if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
                status, err = http.StatusForbidden, BadAuthorizationHeader
                return
        }
@@ -432,7 +432,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
        var pass bool
        var tok string
-       if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
+       if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
                err = BadAuthorizationHeader
                status = http.StatusForbidden
                return
@@ -515,7 +515,7 @@ func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
        kc := *handler.KeepClient
 
-       ok, token := CheckAuthorizationHeader(kc, handler.ApiTokenCache, req)
+       ok, token := CheckAuthorizationHeader(&kc, handler.ApiTokenCache, req)
        if !ok {
                status, err = http.StatusForbidden, BadAuthorizationHeader
                return
index 6fe8fe7ac3d0023c4d012be022c9ab3d50431d62..917f0124adbec7a10305e700ea1bd76ea2d39f75 100644 (file)
@@ -103,7 +103,7 @@ func setupProxyService() {
        }
 }
 
-func runProxy(c *C, args []string, port int, bogusClientToken bool) keepclient.KeepClient {
+func runProxy(c *C, args []string, port int, bogusClientToken bool) *keepclient.KeepClient {
        if bogusClientToken {
                os.Setenv("ARVADOS_API_TOKEN", "bogus-token")
        }
@@ -138,7 +138,7 @@ func runProxy(c *C, args []string, port int, bogusClientToken bool) keepclient.K
                go main()
        }
 
-       return kc
+       return &kc
 }
 
 func (s *ServerRequiredSuite) TestPutAskGet(c *C) {