X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f4aa4dbbefe8b6dd65e3a112642da288774cf951..8fbe20476e2d10b1fc8dac848b2a0ffdf488a082:/sdk/go/arvados/keep_service.go diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go index 0c866354aa..85750d8cfc 100644 --- a/sdk/go/arvados/keep_service.go +++ b/sdk/go/arvados/keep_service.go @@ -6,28 +6,37 @@ package arvados import ( "bufio" + "context" "fmt" + "io/ioutil" "net/http" "strconv" "strings" + "sync/atomic" + "time" + + "git.arvados.org/arvados.git/sdk/go/ctxlog" ) // KeepService is an arvados#keepService record type KeepService struct { - UUID string `json:"uuid"` - ServiceHost string `json:"service_host"` - ServicePort int `json:"service_port"` - ServiceSSLFlag bool `json:"service_ssl_flag"` - ServiceType string `json:"service_type"` - ReadOnly bool `json:"read_only"` + UUID string `json:"uuid"` + ServiceHost string `json:"service_host"` + ServicePort int `json:"service_port"` + ServiceSSLFlag bool `json:"service_ssl_flag"` + ServiceType string `json:"service_type"` + ReadOnly bool `json:"read_only"` + CreatedAt time.Time `json:"created_at"` + ModifiedAt time.Time `json:"modified_at"` } type KeepMount struct { - UUID string `json:"uuid"` - DeviceID string `json:"device_id"` - ReadOnly bool `json:"read_only"` - Replication int `json:"replication"` - StorageClasses []string `json:"storage_classes"` + UUID string `json:"uuid"` + DeviceID string `json:"device_id"` + AllowWrite bool `json:"allow_write"` + AllowTrash bool `json:"allow_trash"` + Replication int `json:"replication"` + StorageClasses map[string]bool `json:"storage_classes"` } // KeepServiceList is an arvados#keepServiceList record @@ -99,21 +108,57 @@ func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) { return mounts, nil } -// Index returns an unsorted list of blocks at the given mount point. -func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) { - return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix)) +// Touch updates the timestamp on the given block. +func (s *KeepService) Touch(ctx context.Context, c *Client, blk string) error { + req, err := http.NewRequest("TOUCH", s.url(blk), nil) + if err != nil { + return err + } + resp, err := c.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body) + } + return nil +} + +// Untrash moves/copies the given block out of trash. +func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error { + req, err := http.NewRequest("PUT", s.url("untrash/"+blk), nil) + if err != nil { + return err + } + resp, err := c.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body) + } + return nil +} + +// IndexMount returns an unsorted list of blocks at the given mount point. +func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) { + return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix)) } // Index returns an unsorted list of blocks that can be retrieved from // this server. -func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) { - return s.index(c, s.url("index/"+prefix)) +func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) { + return s.index(ctx, c, prefix, s.url("index/"+prefix)) } -func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) { - req, err := http.NewRequest("GET", url, nil) +func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { - return nil, fmt.Errorf("NewRequest(%v): %v", url, err) + return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err) } resp, err := c.Do(req) if err != nil { @@ -123,10 +168,30 @@ func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, err } defer resp.Body.Close() + var progress int64 + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + log := ctxlog.FromContext(ctx) + logticker := time.NewTicker(5 * time.Minute) + defer logticker.Stop() + for { + select { + case <-logticker.C: + log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url) + case <-ctx.Done(): + return + } + } + }() + var entries []KeepServiceIndexEntry scanner := bufio.NewScanner(resp.Body) sawEOF := false for scanner.Scan() { + if ctx.Err() != nil { + return nil, ctx.Err() + } if scanner.Err() != nil { // If we encounter a read error (timeout, // connection failure), stop now and return it @@ -146,6 +211,9 @@ func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, err if len(fields) != 2 { return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields)) } + if !strings.HasPrefix(fields[0], prefix) { + return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix) + } mtime, err := strconv.ParseInt(fields[1], 10, 64) if err != nil { return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err) @@ -162,6 +230,7 @@ func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, err SizedDigest: SizedDigest(fields[0]), Mtime: mtime, }) + atomic.AddInt64(&progress, 1) } if err := scanner.Err(); err != nil { return nil, fmt.Errorf("Error scanning index response: %v", err)