Merge branch '15014-wrong-node-count'
[arvados.git] / sdk / go / arvados / keep_service.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bufio"
9         "fmt"
10         "net/http"
11         "strconv"
12         "strings"
13         "time"
14 )
15
16 // KeepService is an arvados#keepService record
17 type KeepService struct {
18         UUID           string    `json:"uuid"`
19         ServiceHost    string    `json:"service_host"`
20         ServicePort    int       `json:"service_port"`
21         ServiceSSLFlag bool      `json:"service_ssl_flag"`
22         ServiceType    string    `json:"service_type"`
23         ReadOnly       bool      `json:"read_only"`
24         CreatedAt      time.Time `json:"created_at"`
25         ModifiedAt     time.Time `json:"modified_at"`
26 }
27
28 type KeepMount struct {
29         UUID           string          `json:"uuid"`
30         DeviceID       string          `json:"device_id"`
31         ReadOnly       bool            `json:"read_only"`
32         Replication    int             `json:"replication"`
33         StorageClasses map[string]bool `json:"storage_classes"`
34 }
35
36 // KeepServiceList is an arvados#keepServiceList record
37 type KeepServiceList struct {
38         Items          []KeepService `json:"items"`
39         ItemsAvailable int           `json:"items_available"`
40         Offset         int           `json:"offset"`
41         Limit          int           `json:"limit"`
42 }
43
44 // KeepServiceIndexEntry is what a keep service's index response tells
45 // us about a stored block.
46 type KeepServiceIndexEntry struct {
47         SizedDigest
48         // Time of last write, in nanoseconds since Unix epoch
49         Mtime int64
50 }
51
52 // EachKeepService calls f once for every readable
53 // KeepService. EachKeepService stops if it encounters an
54 // error, such as f returning a non-nil error.
55 func (c *Client) EachKeepService(f func(KeepService) error) error {
56         params := ResourceListParams{}
57         for {
58                 var page KeepServiceList
59                 err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
60                 if err != nil {
61                         return err
62                 }
63                 for _, item := range page.Items {
64                         err = f(item)
65                         if err != nil {
66                                 return err
67                         }
68                 }
69                 params.Offset = params.Offset + len(page.Items)
70                 if params.Offset >= page.ItemsAvailable {
71                         return nil
72                 }
73         }
74 }
75
76 func (s *KeepService) url(path string) string {
77         var f string
78         if s.ServiceSSLFlag {
79                 f = "https://%s:%d/%s"
80         } else {
81                 f = "http://%s:%d/%s"
82         }
83         return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
84 }
85
86 // String implements fmt.Stringer
87 func (s *KeepService) String() string {
88         return s.UUID
89 }
90
91 func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
92         url := s.url("mounts")
93         req, err := http.NewRequest("GET", url, nil)
94         if err != nil {
95                 return nil, err
96         }
97         var mounts []KeepMount
98         err = c.DoAndDecode(&mounts, req)
99         if err != nil {
100                 return nil, fmt.Errorf("GET %v: %v", url, err)
101         }
102         return mounts, nil
103 }
104
105 // Index returns an unsorted list of blocks at the given mount point.
106 func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
107         return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
108 }
109
110 // Index returns an unsorted list of blocks that can be retrieved from
111 // this server.
112 func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
113         return s.index(c, s.url("index/"+prefix))
114 }
115
116 func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
117         req, err := http.NewRequest("GET", url, nil)
118         if err != nil {
119                 return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
120         }
121         resp, err := c.Do(req)
122         if err != nil {
123                 return nil, fmt.Errorf("Do(%v): %v", url, err)
124         } else if resp.StatusCode != 200 {
125                 return nil, fmt.Errorf("%v: %d %v", url, resp.StatusCode, resp.Status)
126         }
127         defer resp.Body.Close()
128
129         var entries []KeepServiceIndexEntry
130         scanner := bufio.NewScanner(resp.Body)
131         sawEOF := false
132         for scanner.Scan() {
133                 if scanner.Err() != nil {
134                         // If we encounter a read error (timeout,
135                         // connection failure), stop now and return it
136                         // below, so it doesn't get masked by the
137                         // ensuing "badly formatted response" error.
138                         break
139                 }
140                 if sawEOF {
141                         return nil, fmt.Errorf("Index response contained non-terminal blank line")
142                 }
143                 line := scanner.Text()
144                 if line == "" {
145                         sawEOF = true
146                         continue
147                 }
148                 fields := strings.Split(line, " ")
149                 if len(fields) != 2 {
150                         return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
151                 }
152                 mtime, err := strconv.ParseInt(fields[1], 10, 64)
153                 if err != nil {
154                         return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
155                 }
156                 if mtime < 1e12 {
157                         // An old version of keepstore is giving us
158                         // timestamps in seconds instead of
159                         // nanoseconds. (This threshold correctly
160                         // handles all times between 1970-01-02 and
161                         // 33658-09-27.)
162                         mtime = mtime * 1e9
163                 }
164                 entries = append(entries, KeepServiceIndexEntry{
165                         SizedDigest: SizedDigest(fields[0]),
166                         Mtime:       mtime,
167                 })
168         }
169         if err := scanner.Err(); err != nil {
170                 return nil, fmt.Errorf("Error scanning index response: %v", err)
171         }
172         if !sawEOF {
173                 return nil, fmt.Errorf("Index response had no EOF marker")
174         }
175         return entries, nil
176 }