1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
16 // KeepService is an arvados#keepService record
17 type KeepService struct {
18 UUID string `json:"uuid"`
19 ServiceHost string `json:"service_host"`
20 ServicePort int `json:"service_port"`
21 ServiceSSLFlag bool `json:"service_ssl_flag"`
22 ServiceType string `json:"service_type"`
23 ReadOnly bool `json:"read_only"`
24 CreatedAt time.Time `json:"created_at"`
25 ModifiedAt time.Time `json:"modified_at"`
28 type KeepMount struct {
29 UUID string `json:"uuid"`
30 DeviceID string `json:"device_id"`
31 ReadOnly bool `json:"read_only"`
32 Replication int `json:"replication"`
33 StorageClasses map[string]bool `json:"storage_classes"`
36 // KeepServiceList is an arvados#keepServiceList record
37 type KeepServiceList struct {
38 Items []KeepService `json:"items"`
39 ItemsAvailable int `json:"items_available"`
40 Offset int `json:"offset"`
41 Limit int `json:"limit"`
44 // KeepServiceIndexEntry is what a keep service's index response tells
45 // us about a stored block.
46 type KeepServiceIndexEntry struct {
48 // Time of last write, in nanoseconds since Unix epoch
52 // EachKeepService calls f once for every readable
53 // KeepService. EachKeepService stops if it encounters an
54 // error, such as f returning a non-nil error.
55 func (c *Client) EachKeepService(f func(KeepService) error) error {
56 params := ResourceListParams{}
58 var page KeepServiceList
59 err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
63 for _, item := range page.Items {
69 params.Offset = params.Offset + len(page.Items)
70 if params.Offset >= page.ItemsAvailable {
76 func (s *KeepService) url(path string) string {
79 f = "https://%s:%d/%s"
83 return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
86 // String implements fmt.Stringer
87 func (s *KeepService) String() string {
91 func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
92 url := s.url("mounts")
93 req, err := http.NewRequest("GET", url, nil)
97 var mounts []KeepMount
98 err = c.DoAndDecode(&mounts, req)
100 return nil, fmt.Errorf("GET %v: %v", url, err)
105 // Index returns an unsorted list of blocks at the given mount point.
106 func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
107 return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
110 // Index returns an unsorted list of blocks that can be retrieved from
112 func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
113 return s.index(c, s.url("index/"+prefix))
116 func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
117 req, err := http.NewRequest("GET", url, nil)
119 return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
121 resp, err := c.Do(req)
123 return nil, fmt.Errorf("Do(%v): %v", url, err)
124 } else if resp.StatusCode != 200 {
125 return nil, fmt.Errorf("%v: %d %v", url, resp.StatusCode, resp.Status)
127 defer resp.Body.Close()
129 var entries []KeepServiceIndexEntry
130 scanner := bufio.NewScanner(resp.Body)
133 if scanner.Err() != nil {
134 // If we encounter a read error (timeout,
135 // connection failure), stop now and return it
136 // below, so it doesn't get masked by the
137 // ensuing "badly formatted response" error.
141 return nil, fmt.Errorf("Index response contained non-terminal blank line")
143 line := scanner.Text()
148 fields := strings.Split(line, " ")
149 if len(fields) != 2 {
150 return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
152 mtime, err := strconv.ParseInt(fields[1], 10, 64)
154 return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
157 // An old version of keepstore is giving us
158 // timestamps in seconds instead of
159 // nanoseconds. (This threshold correctly
160 // handles all times between 1970-01-02 and
164 entries = append(entries, KeepServiceIndexEntry{
165 SizedDigest: SizedDigest(fields[0]),
169 if err := scanner.Err(); err != nil {
170 return nil, fmt.Errorf("Error scanning index response: %v", err)
173 return nil, fmt.Errorf("Index response had no EOF marker")