"net/http"
"strconv"
"strings"
+ "sync/atomic"
"time"
+
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
)
// KeepService is an arvados#keepService record
type KeepMount struct {
UUID string `json:"uuid"`
DeviceID string `json:"device_id"`
- ReadOnly bool `json:"read_only"`
+ AllowWrite bool `json:"allow_write"`
+ AllowTrash bool `json:"allow_trash"`
Replication int `json:"replication"`
StorageClasses map[string]bool `json:"storage_classes"`
}
return nil
}
-// Index returns an unsorted list of blocks at the given mount point.
+// IndexMount returns an unsorted list of blocks at the given mount point.
func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
- return s.index(ctx, c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+ return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
}
// Index returns an unsorted list of blocks that can be retrieved from
// this server.
func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
- return s.index(ctx, c, s.url("index/"+prefix))
+ return s.index(ctx, c, prefix, s.url("index/"+prefix))
}
-func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepServiceIndexEntry, error) {
+func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
}
defer resp.Body.Close()
+ var progress int64
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ go func() {
+ log := ctxlog.FromContext(ctx)
+ logticker := time.NewTicker(5 * time.Minute)
+ defer logticker.Stop()
+ for {
+ select {
+ case <-logticker.C:
+ log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
var entries []KeepServiceIndexEntry
scanner := bufio.NewScanner(resp.Body)
sawEOF := false
for scanner.Scan() {
+ if ctx.Err() != nil {
+ return nil, ctx.Err()
+ }
if scanner.Err() != nil {
// If we encounter a read error (timeout,
// connection failure), stop now and return it
if len(fields) != 2 {
return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
}
+ if !strings.HasPrefix(fields[0], prefix) {
+ return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix)
+ }
mtime, err := strconv.ParseInt(fields[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
SizedDigest: SizedDigest(fields[0]),
Mtime: mtime,
})
+ atomic.AddInt64(&progress, 1)
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("Error scanning index response: %v", err)