11167: Merge branch 'master' into 11167-wb-remove-arvget
[arvados.git] / sdk / go / arvados / keep_service.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bufio"
9         "fmt"
10         "net/http"
11         "strconv"
12         "strings"
13 )
14
15 // KeepService is an arvados#keepService record
16 type KeepService struct {
17         UUID           string `json:"uuid"`
18         ServiceHost    string `json:"service_host"`
19         ServicePort    int    `json:"service_port"`
20         ServiceSSLFlag bool   `json:"service_ssl_flag"`
21         ServiceType    string `json:"service_type"`
22         ReadOnly       bool   `json:"read_only"`
23 }
24
25 // KeepServiceList is an arvados#keepServiceList record
26 type KeepServiceList struct {
27         Items          []KeepService `json:"items"`
28         ItemsAvailable int           `json:"items_available"`
29         Offset         int           `json:"offset"`
30         Limit          int           `json:"limit"`
31 }
32
33 // KeepServiceIndexEntry is what a keep service's index response tells
34 // us about a stored block.
35 type KeepServiceIndexEntry struct {
36         SizedDigest
37         // Time of last write, in nanoseconds since Unix epoch
38         Mtime int64
39 }
40
41 // EachKeepService calls f once for every readable
42 // KeepService. EachKeepService stops if it encounters an
43 // error, such as f returning a non-nil error.
44 func (c *Client) EachKeepService(f func(KeepService) error) error {
45         params := ResourceListParams{}
46         for {
47                 var page KeepServiceList
48                 err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
49                 if err != nil {
50                         return err
51                 }
52                 for _, item := range page.Items {
53                         err = f(item)
54                         if err != nil {
55                                 return err
56                         }
57                 }
58                 params.Offset = params.Offset + len(page.Items)
59                 if params.Offset >= page.ItemsAvailable {
60                         return nil
61                 }
62         }
63 }
64
65 func (s *KeepService) url(path string) string {
66         var f string
67         if s.ServiceSSLFlag {
68                 f = "https://%s:%d/%s"
69         } else {
70                 f = "http://%s:%d/%s"
71         }
72         return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
73 }
74
75 // String implements fmt.Stringer
76 func (s *KeepService) String() string {
77         return s.UUID
78 }
79
80 // Index returns an unsorted list of blocks that can be retrieved from
81 // this server.
82 func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
83         url := s.url("index/" + prefix)
84         req, err := http.NewRequest("GET", url, nil)
85         if err != nil {
86                 return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
87         }
88         resp, err := c.Do(req)
89         if err != nil {
90                 return nil, fmt.Errorf("Do(%v): %v", url, err)
91         } else if resp.StatusCode != 200 {
92                 return nil, fmt.Errorf("%v: %v", url, resp.Status)
93         }
94         defer resp.Body.Close()
95
96         var entries []KeepServiceIndexEntry
97         scanner := bufio.NewScanner(resp.Body)
98         sawEOF := false
99         for scanner.Scan() {
100                 if sawEOF {
101                         return nil, fmt.Errorf("Index response contained non-terminal blank line")
102                 }
103                 line := scanner.Text()
104                 if line == "" {
105                         sawEOF = true
106                         continue
107                 }
108                 fields := strings.Split(line, " ")
109                 if len(fields) != 2 {
110                         return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
111                 }
112                 mtime, err := strconv.ParseInt(fields[1], 10, 64)
113                 if err != nil {
114                         return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
115                 }
116                 if mtime < 1e12 {
117                         // An old version of keepstore is giving us
118                         // timestamps in seconds instead of
119                         // nanoseconds. (This threshold correctly
120                         // handles all times between 1970-01-02 and
121                         // 33658-09-27.)
122                         mtime = mtime * 1e9
123                 }
124                 entries = append(entries, KeepServiceIndexEntry{
125                         SizedDigest: SizedDigest(fields[0]),
126                         Mtime:       mtime,
127                 })
128         }
129         if err := scanner.Err(); err != nil {
130                 return nil, fmt.Errorf("Error scanning index response: %v", err)
131         }
132         if !sawEOF {
133                 return nil, fmt.Errorf("Index response had no EOF marker")
134         }
135         return entries, nil
136 }