+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package arvados
import (
"bufio"
+ "context"
"fmt"
+ "io/ioutil"
"net/http"
"strconv"
"strings"
+ "sync/atomic"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
)
// KeepService is an arvados#keepService record
type KeepService struct {
- UUID string `json:"uuid"`
- ServiceHost string `json:"service_host"`
- ServicePort int `json:"service_port"`
- ServiceSSLFlag bool `json:"service_ssl_flag"`
- ServiceType string `json:"service_type"`
- ReadOnly bool `json:"read_only"`
+ UUID string `json:"uuid"`
+ ServiceHost string `json:"service_host"`
+ ServicePort int `json:"service_port"`
+ ServiceSSLFlag bool `json:"service_ssl_flag"`
+ ServiceType string `json:"service_type"`
+ ReadOnly bool `json:"read_only"`
+ CreatedAt time.Time `json:"created_at"`
+ ModifiedAt time.Time `json:"modified_at"`
+}
+
+type KeepMount struct {
+ UUID string `json:"uuid"`
+ DeviceID string `json:"device_id"`
+ AllowWrite bool `json:"allow_write"`
+ AllowTrash bool `json:"allow_trash"`
+ Replication int `json:"replication"`
+ StorageClasses map[string]bool `json:"storage_classes"`
}
// KeepServiceList is an arvados#keepServiceList record
return s.UUID
}
+func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
+ url := s.url("mounts")
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return nil, err
+ }
+ var mounts []KeepMount
+ err = c.DoAndDecode(&mounts, req)
+ if err != nil {
+ return nil, fmt.Errorf("GET %v: %v", url, err)
+ }
+ return mounts, nil
+}
+
+// Touch updates the timestamp on the given block.
+func (s *KeepService) Touch(ctx context.Context, c *Client, blk string) error {
+ req, err := http.NewRequest("TOUCH", s.url(blk), nil)
+ if err != nil {
+ return err
+ }
+ resp, err := c.Do(req.WithContext(ctx))
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ body, _ := ioutil.ReadAll(resp.Body)
+ return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
+ }
+ return nil
+}
+
+// Untrash moves/copies the given block out of trash.
+func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error {
+ req, err := http.NewRequest("PUT", s.url("untrash/"+blk), nil)
+ if err != nil {
+ return err
+ }
+ resp, err := c.Do(req.WithContext(ctx))
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ body, _ := ioutil.ReadAll(resp.Body)
+ return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
+ }
+ return nil
+}
+
+// IndexMount returns an unsorted list of blocks at the given mount point.
+func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+ return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+}
+
// Index returns an unsorted list of blocks that can be retrieved from
// this server.
-func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
- url := s.url("index/" + prefix)
- req, err := http.NewRequest("GET", url, nil)
+func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+ return s.index(ctx, c, prefix, s.url("index/"+prefix))
+}
+
+func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) {
+ req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
- return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+ return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
}
resp, err := c.Do(req)
if err != nil {
return nil, fmt.Errorf("Do(%v): %v", url, err)
} else if resp.StatusCode != 200 {
- return nil, fmt.Errorf("%v: %v", url, resp.Status)
+ return nil, fmt.Errorf("%v: %d %v", url, resp.StatusCode, resp.Status)
}
defer resp.Body.Close()
+ var progress int64
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ go func() {
+ log := ctxlog.FromContext(ctx)
+ logticker := time.NewTicker(5 * time.Minute)
+ defer logticker.Stop()
+ for {
+ select {
+ case <-logticker.C:
+ log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
var entries []KeepServiceIndexEntry
scanner := bufio.NewScanner(resp.Body)
sawEOF := false
for scanner.Scan() {
+ if ctx.Err() != nil {
+ return nil, ctx.Err()
+ }
+ if scanner.Err() != nil {
+ // If we encounter a read error (timeout,
+ // connection failure), stop now and return it
+ // below, so it doesn't get masked by the
+ // ensuing "badly formatted response" error.
+ break
+ }
if sawEOF {
return nil, fmt.Errorf("Index response contained non-terminal blank line")
}
if len(fields) != 2 {
return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
}
+ if !strings.HasPrefix(fields[0], prefix) {
+ return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix)
+ }
mtime, err := strconv.ParseInt(fields[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
}
+ if mtime < 1e12 {
+ // An old version of keepstore is giving us
+ // timestamps in seconds instead of
+ // nanoseconds. (This threshold correctly
+ // handles all times between 1970-01-02 and
+ // 33658-09-27.)
+ mtime = mtime * 1e9
+ }
entries = append(entries, KeepServiceIndexEntry{
SizedDigest: SizedDigest(fields[0]),
Mtime: mtime,
})
+ atomic.AddInt64(&progress, 1)
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("Error scanning index response: %v", err)