14812: Fix a couple more tests
[arvados.git] / sdk / go / arvados / keep_service.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bufio"
9         "fmt"
10         "net/http"
11         "strconv"
12         "strings"
13 )
14
15 // KeepService is an arvados#keepService record
16 type KeepService struct {
17         UUID           string `json:"uuid"`
18         ServiceHost    string `json:"service_host"`
19         ServicePort    int    `json:"service_port"`
20         ServiceSSLFlag bool   `json:"service_ssl_flag"`
21         ServiceType    string `json:"service_type"`
22         ReadOnly       bool   `json:"read_only"`
23 }
24
25 type KeepMount struct {
26         UUID           string   `json:"uuid"`
27         DeviceID       string   `json:"device_id"`
28         ReadOnly       bool     `json:"read_only"`
29         Replication    int      `json:"replication"`
30         StorageClasses []string `json:"storage_classes"`
31 }
32
33 // KeepServiceList is an arvados#keepServiceList record
34 type KeepServiceList struct {
35         Items          []KeepService `json:"items"`
36         ItemsAvailable int           `json:"items_available"`
37         Offset         int           `json:"offset"`
38         Limit          int           `json:"limit"`
39 }
40
41 // KeepServiceIndexEntry is what a keep service's index response tells
42 // us about a stored block.
43 type KeepServiceIndexEntry struct {
44         SizedDigest
45         // Time of last write, in nanoseconds since Unix epoch
46         Mtime int64
47 }
48
49 // EachKeepService calls f once for every readable
50 // KeepService. EachKeepService stops if it encounters an
51 // error, such as f returning a non-nil error.
52 func (c *Client) EachKeepService(f func(KeepService) error) error {
53         params := ResourceListParams{}
54         for {
55                 var page KeepServiceList
56                 err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
57                 if err != nil {
58                         return err
59                 }
60                 for _, item := range page.Items {
61                         err = f(item)
62                         if err != nil {
63                                 return err
64                         }
65                 }
66                 params.Offset = params.Offset + len(page.Items)
67                 if params.Offset >= page.ItemsAvailable {
68                         return nil
69                 }
70         }
71 }
72
73 func (s *KeepService) url(path string) string {
74         var f string
75         if s.ServiceSSLFlag {
76                 f = "https://%s:%d/%s"
77         } else {
78                 f = "http://%s:%d/%s"
79         }
80         return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
81 }
82
83 // String implements fmt.Stringer
84 func (s *KeepService) String() string {
85         return s.UUID
86 }
87
88 func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
89         url := s.url("mounts")
90         req, err := http.NewRequest("GET", url, nil)
91         if err != nil {
92                 return nil, err
93         }
94         var mounts []KeepMount
95         err = c.DoAndDecode(&mounts, req)
96         if err != nil {
97                 return nil, fmt.Errorf("GET %v: %v", url, err)
98         }
99         return mounts, nil
100 }
101
102 // Index returns an unsorted list of blocks at the given mount point.
103 func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
104         return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
105 }
106
107 // Index returns an unsorted list of blocks that can be retrieved from
108 // this server.
109 func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
110         return s.index(c, s.url("index/"+prefix))
111 }
112
113 func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
114         req, err := http.NewRequest("GET", url, nil)
115         if err != nil {
116                 return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
117         }
118         resp, err := c.Do(req)
119         if err != nil {
120                 return nil, fmt.Errorf("Do(%v): %v", url, err)
121         } else if resp.StatusCode != 200 {
122                 return nil, fmt.Errorf("%v: %d %v", url, resp.StatusCode, resp.Status)
123         }
124         defer resp.Body.Close()
125
126         var entries []KeepServiceIndexEntry
127         scanner := bufio.NewScanner(resp.Body)
128         sawEOF := false
129         for scanner.Scan() {
130                 if scanner.Err() != nil {
131                         // If we encounter a read error (timeout,
132                         // connection failure), stop now and return it
133                         // below, so it doesn't get masked by the
134                         // ensuing "badly formatted response" error.
135                         break
136                 }
137                 if sawEOF {
138                         return nil, fmt.Errorf("Index response contained non-terminal blank line")
139                 }
140                 line := scanner.Text()
141                 if line == "" {
142                         sawEOF = true
143                         continue
144                 }
145                 fields := strings.Split(line, " ")
146                 if len(fields) != 2 {
147                         return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
148                 }
149                 mtime, err := strconv.ParseInt(fields[1], 10, 64)
150                 if err != nil {
151                         return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
152                 }
153                 if mtime < 1e12 {
154                         // An old version of keepstore is giving us
155                         // timestamps in seconds instead of
156                         // nanoseconds. (This threshold correctly
157                         // handles all times between 1970-01-02 and
158                         // 33658-09-27.)
159                         mtime = mtime * 1e9
160                 }
161                 entries = append(entries, KeepServiceIndexEntry{
162                         SizedDigest: SizedDigest(fields[0]),
163                         Mtime:       mtime,
164                 })
165         }
166         if err := scanner.Err(); err != nil {
167                 return nil, fmt.Errorf("Error scanning index response: %v", err)
168         }
169         if !sawEOF {
170                 return nil, fmt.Errorf("Index response had no EOF marker")
171         }
172         return entries, nil
173 }