Merge branch '21696-slow-propfind'
[arvados.git] / sdk / go / arvados / keep_service.go
index 3af7479202efb46df838e48246ccf9c2f0b5b100..85750d8cfc97d8530794c6c4dfdbfc78aa258bb4 100644 (file)
@@ -12,7 +12,10 @@ import (
        "net/http"
        "strconv"
        "strings"
+       "sync/atomic"
        "time"
+
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
 )
 
 // KeepService is an arvados#keepService record
@@ -30,7 +33,8 @@ type KeepService struct {
 type KeepMount struct {
        UUID           string          `json:"uuid"`
        DeviceID       string          `json:"device_id"`
-       ReadOnly       bool            `json:"read_only"`
+       AllowWrite     bool            `json:"allow_write"`
+       AllowTrash     bool            `json:"allow_trash"`
        Replication    int             `json:"replication"`
        StorageClasses map[string]bool `json:"storage_classes"`
 }
@@ -140,21 +144,21 @@ func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error
        return nil
 }
 
-// Index returns an unsorted list of blocks at the given mount point.
-func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
-       return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+// IndexMount returns an unsorted list of blocks at the given mount point.
+func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+       return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
 }
 
 // Index returns an unsorted list of blocks that can be retrieved from
 // this server.
-func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
-       return s.index(c, s.url("index/"+prefix))
+func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+       return s.index(ctx, c, prefix, s.url("index/"+prefix))
 }
 
-func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
-       req, err := http.NewRequest("GET", url, nil)
+func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) {
+       req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
        if err != nil {
-               return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+               return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
        }
        resp, err := c.Do(req)
        if err != nil {
@@ -164,10 +168,30 @@ func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, err
        }
        defer resp.Body.Close()
 
+       var progress int64
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+       go func() {
+               log := ctxlog.FromContext(ctx)
+               logticker := time.NewTicker(5 * time.Minute)
+               defer logticker.Stop()
+               for {
+                       select {
+                       case <-logticker.C:
+                               log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
+                       case <-ctx.Done():
+                               return
+                       }
+               }
+       }()
+
        var entries []KeepServiceIndexEntry
        scanner := bufio.NewScanner(resp.Body)
        sawEOF := false
        for scanner.Scan() {
+               if ctx.Err() != nil {
+                       return nil, ctx.Err()
+               }
                if scanner.Err() != nil {
                        // If we encounter a read error (timeout,
                        // connection failure), stop now and return it
@@ -187,6 +211,9 @@ func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, err
                if len(fields) != 2 {
                        return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
                }
+               if !strings.HasPrefix(fields[0], prefix) {
+                       return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix)
+               }
                mtime, err := strconv.ParseInt(fields[1], 10, 64)
                if err != nil {
                        return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
@@ -203,6 +230,7 @@ func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, err
                        SizedDigest: SizedDigest(fields[0]),
                        Mtime:       mtime,
                })
+               atomic.AddInt64(&progress, 1)
        }
        if err := scanner.Err(); err != nil {
                return nil, fmt.Errorf("Error scanning index response: %v", err)