10813: Merge branch 'master' into 10813-arv-put-six-threads
[arvados.git] / sdk / go / arvados / keep_service.go
1 package arvados
2
3 import (
4         "bufio"
5         "fmt"
6         "net/http"
7         "strconv"
8         "strings"
9 )
10
11 // KeepService is an arvados#keepService record
12 type KeepService struct {
13         UUID           string `json:"uuid"`
14         ServiceHost    string `json:"service_host"`
15         ServicePort    int    `json:"service_port"`
16         ServiceSSLFlag bool   `json:"service_ssl_flag"`
17         ServiceType    string `json:"service_type"`
18         ReadOnly       bool   `json:"read_only"`
19 }
20
21 // KeepServiceList is an arvados#keepServiceList record
22 type KeepServiceList struct {
23         Items          []KeepService `json:"items"`
24         ItemsAvailable int           `json:"items_available"`
25         Offset         int           `json:"offset"`
26         Limit          int           `json:"limit"`
27 }
28
29 // KeepServiceIndexEntry is what a keep service's index response tells
30 // us about a stored block.
31 type KeepServiceIndexEntry struct {
32         SizedDigest
33         // Time of last write, in nanoseconds since Unix epoch
34         Mtime int64
35 }
36
37 // EachKeepService calls f once for every readable
38 // KeepService. EachKeepService stops if it encounters an
39 // error, such as f returning a non-nil error.
40 func (c *Client) EachKeepService(f func(KeepService) error) error {
41         params := ResourceListParams{}
42         for {
43                 var page KeepServiceList
44                 err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
45                 if err != nil {
46                         return err
47                 }
48                 for _, item := range page.Items {
49                         err = f(item)
50                         if err != nil {
51                                 return err
52                         }
53                 }
54                 params.Offset = params.Offset + len(page.Items)
55                 if params.Offset >= page.ItemsAvailable {
56                         return nil
57                 }
58         }
59 }
60
61 func (s *KeepService) url(path string) string {
62         var f string
63         if s.ServiceSSLFlag {
64                 f = "https://%s:%d/%s"
65         } else {
66                 f = "http://%s:%d/%s"
67         }
68         return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
69 }
70
71 // String implements fmt.Stringer
72 func (s *KeepService) String() string {
73         return s.UUID
74 }
75
76 // Index returns an unsorted list of blocks that can be retrieved from
77 // this server.
78 func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
79         url := s.url("index/" + prefix)
80         req, err := http.NewRequest("GET", url, nil)
81         if err != nil {
82                 return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
83         }
84         resp, err := c.Do(req)
85         if err != nil {
86                 return nil, fmt.Errorf("Do(%v): %v", url, err)
87         } else if resp.StatusCode != 200 {
88                 return nil, fmt.Errorf("%v: %v", url, resp.Status)
89         }
90         defer resp.Body.Close()
91
92         var entries []KeepServiceIndexEntry
93         scanner := bufio.NewScanner(resp.Body)
94         sawEOF := false
95         for scanner.Scan() {
96                 if sawEOF {
97                         return nil, fmt.Errorf("Index response contained non-terminal blank line")
98                 }
99                 line := scanner.Text()
100                 if line == "" {
101                         sawEOF = true
102                         continue
103                 }
104                 fields := strings.Split(line, " ")
105                 if len(fields) != 2 {
106                         return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
107                 }
108                 mtime, err := strconv.ParseInt(fields[1], 10, 64)
109                 if err != nil {
110                         return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
111                 }
112                 if mtime < 1e12 {
113                         // An old version of keepstore is giving us
114                         // timestamps in seconds instead of
115                         // nanoseconds. (This threshold correctly
116                         // handles all times between 1970-01-02 and
117                         // 33658-09-27.)
118                         mtime = mtime * 1e9
119                 }
120                 entries = append(entries, KeepServiceIndexEntry{
121                         SizedDigest: SizedDigest(fields[0]),
122                         Mtime:       mtime,
123                 })
124         }
125         if err := scanner.Err(); err != nil {
126                 return nil, fmt.Errorf("Error scanning index response: %v", err)
127         }
128         if !sawEOF {
129                 return nil, fmt.Errorf("Index response had no EOF marker")
130         }
131         return entries, nil
132 }