Merge commit '3b735dd9330e0989f51a76771c3303031154154e' into 21158-wf-page-list
[arvados.git] / sdk / go / arvados / keep_service.go
index 88728d95661a6a60355190cecc23c3adf3a77961..85750d8cfc97d8530794c6c4dfdbfc78aa258bb4 100644 (file)
@@ -12,7 +12,10 @@ import (
        "net/http"
        "strconv"
        "strings"
+       "sync/atomic"
        "time"
+
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
 )
 
 // KeepService is an arvados#keepService record
@@ -30,7 +33,8 @@ type KeepService struct {
 type KeepMount struct {
        UUID           string          `json:"uuid"`
        DeviceID       string          `json:"device_id"`
-       ReadOnly       bool            `json:"read_only"`
+       AllowWrite     bool            `json:"allow_write"`
+       AllowTrash     bool            `json:"allow_trash"`
        Replication    int             `json:"replication"`
        StorageClasses map[string]bool `json:"storage_classes"`
 }
@@ -164,10 +168,30 @@ func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string)
        }
        defer resp.Body.Close()
 
+       var progress int64
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+       go func() {
+               log := ctxlog.FromContext(ctx)
+               logticker := time.NewTicker(5 * time.Minute)
+               defer logticker.Stop()
+               for {
+                       select {
+                       case <-logticker.C:
+                               log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
+                       case <-ctx.Done():
+                               return
+                       }
+               }
+       }()
+
        var entries []KeepServiceIndexEntry
        scanner := bufio.NewScanner(resp.Body)
        sawEOF := false
        for scanner.Scan() {
+               if ctx.Err() != nil {
+                       return nil, ctx.Err()
+               }
                if scanner.Err() != nil {
                        // If we encounter a read error (timeout,
                        // connection failure), stop now and return it
@@ -206,6 +230,7 @@ func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string)
                        SizedDigest: SizedDigest(fields[0]),
                        Mtime:       mtime,
                })
+               atomic.AddInt64(&progress, 1)
        }
        if err := scanner.Err(); err != nil {
                return nil, fmt.Errorf("Error scanning index response: %v", err)