import (
"bufio"
+ "context"
"fmt"
+ "io/ioutil"
"net/http"
"strconv"
"strings"
+ "sync/atomic"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
)
// KeepService is an arvados#keepService record
type KeepService struct {
- UUID string `json:"uuid"`
- ServiceHost string `json:"service_host"`
- ServicePort int `json:"service_port"`
- ServiceSSLFlag bool `json:"service_ssl_flag"`
- ServiceType string `json:"service_type"`
- ReadOnly bool `json:"read_only"`
+ UUID string `json:"uuid"`
+ ServiceHost string `json:"service_host"`
+ ServicePort int `json:"service_port"`
+ ServiceSSLFlag bool `json:"service_ssl_flag"`
+ ServiceType string `json:"service_type"`
+ ReadOnly bool `json:"read_only"`
+ CreatedAt time.Time `json:"created_at"`
+ ModifiedAt time.Time `json:"modified_at"`
}
type KeepMount struct {
- UUID string `json:"uuid"`
- DeviceID string `json:"device_id"`
- ReadOnly bool `json:"read_only"`
- Replication int `json:"replication"`
- StorageClasses []string `json:"storage_classes"`
+ UUID string `json:"uuid"`
+ DeviceID string `json:"device_id"`
+ AllowWrite bool `json:"allow_write"`
+ AllowTrash bool `json:"allow_trash"`
+ Replication int `json:"replication"`
+ StorageClasses map[string]bool `json:"storage_classes"`
}
// KeepServiceList is an arvados#keepServiceList record
return mounts, nil
}
-// Index returns an unsorted list of blocks at the given mount point.
-func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
- return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+// Touch updates the timestamp on the given block.
+func (s *KeepService) Touch(ctx context.Context, c *Client, blk string) error {
+ req, err := http.NewRequest("TOUCH", s.url(blk), nil)
+ if err != nil {
+ return err
+ }
+ resp, err := c.Do(req.WithContext(ctx))
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ body, _ := ioutil.ReadAll(resp.Body)
+ return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
+ }
+ return nil
+}
+
+// Untrash moves/copies the given block out of trash.
+func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error {
+ req, err := http.NewRequest("PUT", s.url("untrash/"+blk), nil)
+ if err != nil {
+ return err
+ }
+ resp, err := c.Do(req.WithContext(ctx))
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ body, _ := ioutil.ReadAll(resp.Body)
+ return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
+ }
+ return nil
+}
+
+// IndexMount returns an unsorted list of blocks at the given mount point.
+func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+ return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
}
// Index returns an unsorted list of blocks that can be retrieved from
// this server.
-func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
- return s.index(c, s.url("index/"+prefix))
+func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+ return s.index(ctx, c, prefix, s.url("index/"+prefix))
}
-func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
- req, err := http.NewRequest("GET", url, nil)
+func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) {
+ req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
- return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+ return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
}
resp, err := c.Do(req)
if err != nil {
}
defer resp.Body.Close()
+ var progress int64
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ go func() {
+ log := ctxlog.FromContext(ctx)
+ logticker := time.NewTicker(5 * time.Minute)
+ defer logticker.Stop()
+ for {
+ select {
+ case <-logticker.C:
+ log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
var entries []KeepServiceIndexEntry
scanner := bufio.NewScanner(resp.Body)
sawEOF := false
for scanner.Scan() {
+ if ctx.Err() != nil {
+ return nil, ctx.Err()
+ }
if scanner.Err() != nil {
// If we encounter a read error (timeout,
// connection failure), stop now and return it
if len(fields) != 2 {
return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
}
+ if !strings.HasPrefix(fields[0], prefix) {
+ return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix)
+ }
mtime, err := strconv.ParseInt(fields[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
SizedDigest: SizedDigest(fields[0]),
Mtime: mtime,
})
+ atomic.AddInt64(&progress, 1)
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("Error scanning index response: %v", err)