// Copyright (C) The Arvados Authors. All rights reserved. // // SPDX-License-Identifier: Apache-2.0 package arvados import ( "bufio" "context" "fmt" "io/ioutil" "net/http" "strconv" "strings" "sync/atomic" "time" "git.arvados.org/arvados.git/sdk/go/ctxlog" ) // KeepService is an arvados#keepService record type KeepService struct { UUID string `json:"uuid"` ServiceHost string `json:"service_host"` ServicePort int `json:"service_port"` ServiceSSLFlag bool `json:"service_ssl_flag"` ServiceType string `json:"service_type"` ReadOnly bool `json:"read_only"` CreatedAt time.Time `json:"created_at"` ModifiedAt time.Time `json:"modified_at"` } type KeepMount struct { UUID string `json:"uuid"` DeviceID string `json:"device_id"` AllowWrite bool `json:"allow_write"` AllowTrash bool `json:"allow_trash"` Replication int `json:"replication"` StorageClasses map[string]bool `json:"storage_classes"` } // KeepServiceList is an arvados#keepServiceList record type KeepServiceList struct { Items []KeepService `json:"items"` ItemsAvailable int `json:"items_available"` Offset int `json:"offset"` Limit int `json:"limit"` } // KeepServiceIndexEntry is what a keep service's index response tells // us about a stored block. type KeepServiceIndexEntry struct { SizedDigest // Time of last write, in nanoseconds since Unix epoch Mtime int64 } // EachKeepService calls f once for every readable // KeepService. EachKeepService stops if it encounters an // error, such as f returning a non-nil error. func (c *Client) EachKeepService(f func(KeepService) error) error { params := ResourceListParams{} for { var page KeepServiceList err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params) if err != nil { return err } for _, item := range page.Items { err = f(item) if err != nil { return err } } params.Offset = params.Offset + len(page.Items) if params.Offset >= page.ItemsAvailable { return nil } } } func (s *KeepService) url(path string) string { var f string if s.ServiceSSLFlag { f = "https://%s:%d/%s" } else { f = "http://%s:%d/%s" } return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path) } // String implements fmt.Stringer func (s *KeepService) String() string { return s.UUID } func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) { url := s.url("mounts") req, err := http.NewRequest("GET", url, nil) if err != nil { return nil, err } var mounts []KeepMount err = c.DoAndDecode(&mounts, req) if err != nil { return nil, fmt.Errorf("GET %v: %v", url, err) } return mounts, nil } // Touch updates the timestamp on the given block. func (s *KeepService) Touch(ctx context.Context, c *Client, blk string) error { req, err := http.NewRequest("TOUCH", s.url(blk), nil) if err != nil { return err } resp, err := c.Do(req.WithContext(ctx)) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := ioutil.ReadAll(resp.Body) return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body) } return nil } // Untrash moves/copies the given block out of trash. func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error { req, err := http.NewRequest("PUT", s.url("untrash/"+blk), nil) if err != nil { return err } resp, err := c.Do(req.WithContext(ctx)) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := ioutil.ReadAll(resp.Body) return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body) } return nil } // IndexMount returns an unsorted list of blocks at the given mount point. func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) { return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix)) } // Index returns an unsorted list of blocks that can be retrieved from // this server. func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) { return s.index(ctx, c, prefix, s.url("index/"+prefix)) } func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err) } resp, err := c.Do(req) if err != nil { return nil, fmt.Errorf("Do(%v): %v", url, err) } else if resp.StatusCode != 200 { return nil, fmt.Errorf("%v: %d %v", url, resp.StatusCode, resp.Status) } defer resp.Body.Close() var progress int64 ctx, cancel := context.WithCancel(ctx) defer cancel() go func() { log := ctxlog.FromContext(ctx) logticker := time.NewTicker(5 * time.Minute) defer logticker.Stop() for { select { case <-logticker.C: log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url) case <-ctx.Done(): return } } }() var entries []KeepServiceIndexEntry scanner := bufio.NewScanner(resp.Body) sawEOF := false for scanner.Scan() { if ctx.Err() != nil { return nil, ctx.Err() } if scanner.Err() != nil { // If we encounter a read error (timeout, // connection failure), stop now and return it // below, so it doesn't get masked by the // ensuing "badly formatted response" error. break } if sawEOF { return nil, fmt.Errorf("Index response contained non-terminal blank line") } line := scanner.Text() if line == "" { sawEOF = true continue } fields := strings.Split(line, " ") if len(fields) != 2 { return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields)) } if !strings.HasPrefix(fields[0], prefix) { return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix) } mtime, err := strconv.ParseInt(fields[1], 10, 64) if err != nil { return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err) } if mtime < 1e12 { // An old version of keepstore is giving us // timestamps in seconds instead of // nanoseconds. (This threshold correctly // handles all times between 1970-01-02 and // 33658-09-27.) mtime = mtime * 1e9 } entries = append(entries, KeepServiceIndexEntry{ SizedDigest: SizedDigest(fields[0]), Mtime: mtime, }) atomic.AddInt64(&progress, 1) } if err := scanner.Err(); err != nil { return nil, fmt.Errorf("Error scanning index response: %v", err) } if !sawEOF { return nil, fmt.Errorf("Index response had no EOF marker") } return entries, nil }