Merge branch '21696-slow-propfind'
[arvados.git] / sdk / go / arvados / keep_service.go
index 97a62fa7bb3933b89e83e428fc4da39de7453fcd..85750d8cfc97d8530794c6c4dfdbfc78aa258bb4 100644 (file)
@@ -6,11 +6,16 @@ package arvados
 
 import (
        "bufio"
+       "context"
        "fmt"
+       "io/ioutil"
        "net/http"
        "strconv"
        "strings"
+       "sync/atomic"
        "time"
+
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
 )
 
 // KeepService is an arvados#keepService record
@@ -28,7 +33,8 @@ type KeepService struct {
 type KeepMount struct {
        UUID           string          `json:"uuid"`
        DeviceID       string          `json:"device_id"`
-       ReadOnly       bool            `json:"read_only"`
+       AllowWrite     bool            `json:"allow_write"`
+       AllowTrash     bool            `json:"allow_trash"`
        Replication    int             `json:"replication"`
        StorageClasses map[string]bool `json:"storage_classes"`
 }
@@ -102,21 +108,57 @@ func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
        return mounts, nil
 }
 
-// Index returns an unsorted list of blocks at the given mount point.
-func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
-       return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+// Touch updates the timestamp on the given block.
+func (s *KeepService) Touch(ctx context.Context, c *Client, blk string) error {
+       req, err := http.NewRequest("TOUCH", s.url(blk), nil)
+       if err != nil {
+               return err
+       }
+       resp, err := c.Do(req.WithContext(ctx))
+       if err != nil {
+               return err
+       }
+       defer resp.Body.Close()
+       if resp.StatusCode != http.StatusOK {
+               body, _ := ioutil.ReadAll(resp.Body)
+               return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
+       }
+       return nil
+}
+
+// Untrash moves/copies the given block out of trash.
+func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error {
+       req, err := http.NewRequest("PUT", s.url("untrash/"+blk), nil)
+       if err != nil {
+               return err
+       }
+       resp, err := c.Do(req.WithContext(ctx))
+       if err != nil {
+               return err
+       }
+       defer resp.Body.Close()
+       if resp.StatusCode != http.StatusOK {
+               body, _ := ioutil.ReadAll(resp.Body)
+               return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
+       }
+       return nil
+}
+
+// IndexMount returns an unsorted list of blocks at the given mount point.
+func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+       return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
 }
 
 // Index returns an unsorted list of blocks that can be retrieved from
 // this server.
-func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
-       return s.index(c, s.url("index/"+prefix))
+func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+       return s.index(ctx, c, prefix, s.url("index/"+prefix))
 }
 
-func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
-       req, err := http.NewRequest("GET", url, nil)
+func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) {
+       req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
        if err != nil {
-               return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+               return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
        }
        resp, err := c.Do(req)
        if err != nil {
@@ -126,10 +168,30 @@ func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, err
        }
        defer resp.Body.Close()
 
+       var progress int64
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+       go func() {
+               log := ctxlog.FromContext(ctx)
+               logticker := time.NewTicker(5 * time.Minute)
+               defer logticker.Stop()
+               for {
+                       select {
+                       case <-logticker.C:
+                               log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
+                       case <-ctx.Done():
+                               return
+                       }
+               }
+       }()
+
        var entries []KeepServiceIndexEntry
        scanner := bufio.NewScanner(resp.Body)
        sawEOF := false
        for scanner.Scan() {
+               if ctx.Err() != nil {
+                       return nil, ctx.Err()
+               }
                if scanner.Err() != nil {
                        // If we encounter a read error (timeout,
                        // connection failure), stop now and return it
@@ -149,6 +211,9 @@ func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, err
                if len(fields) != 2 {
                        return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
                }
+               if !strings.HasPrefix(fields[0], prefix) {
+                       return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix)
+               }
                mtime, err := strconv.ParseInt(fields[1], 10, 64)
                if err != nil {
                        return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
@@ -165,6 +230,7 @@ func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, err
                        SizedDigest: SizedDigest(fields[0]),
                        Mtime:       mtime,
                })
+               atomic.AddInt64(&progress, 1)
        }
        if err := scanner.Err(); err != nil {
                return nil, fmt.Errorf("Error scanning index response: %v", err)