19923: Log progress while reading keepstore indexes.
author Tom Clegg <tom@curii.com>
Thu, 26 Jan 2023 16:14:28 +0000 (11:14 -0500)
committer Tom Clegg <tom@curii.com>
Thu, 26 Jan 2023 16:14:28 +0000 (11:14 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/arvados/keep_service.go

index 88728d95661a6a60355190cecc23c3adf3a77961..5b6d71a4fb73953109b884156d39e6143c92e4a4 100644 (file)
@@ -12,7 +12,10 @@ import (
        "net/http"
        "strconv"
        "strings"
+       "sync/atomic"
        "time"
+
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
 )
 
 // KeepService is an arvados#keepService record
@@ -164,10 +167,30 @@ func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string)
        }
        defer resp.Body.Close()
 
+       var progress int64
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+       go func() {
+               log := ctxlog.FromContext(ctx)
+               logticker := time.NewTicker(5 * time.Minute)
+               defer logticker.Stop()
+               for {
+                       select {
+                       case <-logticker.C:
+                               log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
+                       case <-ctx.Done():
+                               return
+                       }
+               }
+       }()
+
        var entries []KeepServiceIndexEntry
        scanner := bufio.NewScanner(resp.Body)
        sawEOF := false
        for scanner.Scan() {
+               if ctx.Err() != nil {
+                       return nil, ctx.Err()
+               }
                if scanner.Err() != nil {
                        // If we encounter a read error (timeout,
                        // connection failure), stop now and return it
@@ -206,6 +229,7 @@ func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string)
                        SizedDigest: SizedDigest(fields[0]),
                        Mtime:       mtime,
                })
+               atomic.AddInt64(&progress, 1)
        }
        if err := scanner.Err(); err != nil {
                return nil, fmt.Errorf("Error scanning index response: %v", err)