17853: Fix map write when only RLock held.
[arvados.git] / sdk / go / arvados / keep_service.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bufio"
9         "context"
10         "fmt"
11         "io/ioutil"
12         "net/http"
13         "strconv"
14         "strings"
15         "time"
16 )
17
18 // KeepService is an arvados#keepService record
19 type KeepService struct {
20         UUID           string    `json:"uuid"`
21         ServiceHost    string    `json:"service_host"`
22         ServicePort    int       `json:"service_port"`
23         ServiceSSLFlag bool      `json:"service_ssl_flag"`
24         ServiceType    string    `json:"service_type"`
25         ReadOnly       bool      `json:"read_only"`
26         CreatedAt      time.Time `json:"created_at"`
27         ModifiedAt     time.Time `json:"modified_at"`
28 }
29
30 type KeepMount struct {
31         UUID           string          `json:"uuid"`
32         DeviceID       string          `json:"device_id"`
33         ReadOnly       bool            `json:"read_only"`
34         Replication    int             `json:"replication"`
35         StorageClasses map[string]bool `json:"storage_classes"`
36 }
37
38 // KeepServiceList is an arvados#keepServiceList record
39 type KeepServiceList struct {
40         Items          []KeepService `json:"items"`
41         ItemsAvailable int           `json:"items_available"`
42         Offset         int           `json:"offset"`
43         Limit          int           `json:"limit"`
44 }
45
46 // KeepServiceIndexEntry is what a keep service's index response tells
47 // us about a stored block.
48 type KeepServiceIndexEntry struct {
49         SizedDigest
50         // Time of last write, in nanoseconds since Unix epoch
51         Mtime int64
52 }
53
54 // EachKeepService calls f once for every readable
55 // KeepService. EachKeepService stops if it encounters an
56 // error, such as f returning a non-nil error.
57 func (c *Client) EachKeepService(f func(KeepService) error) error {
58         params := ResourceListParams{}
59         for {
60                 var page KeepServiceList
61                 err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
62                 if err != nil {
63                         return err
64                 }
65                 for _, item := range page.Items {
66                         err = f(item)
67                         if err != nil {
68                                 return err
69                         }
70                 }
71                 params.Offset = params.Offset + len(page.Items)
72                 if params.Offset >= page.ItemsAvailable {
73                         return nil
74                 }
75         }
76 }
77
78 func (s *KeepService) url(path string) string {
79         var f string
80         if s.ServiceSSLFlag {
81                 f = "https://%s:%d/%s"
82         } else {
83                 f = "http://%s:%d/%s"
84         }
85         return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
86 }
87
88 // String implements fmt.Stringer
89 func (s *KeepService) String() string {
90         return s.UUID
91 }
92
93 func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
94         url := s.url("mounts")
95         req, err := http.NewRequest("GET", url, nil)
96         if err != nil {
97                 return nil, err
98         }
99         var mounts []KeepMount
100         err = c.DoAndDecode(&mounts, req)
101         if err != nil {
102                 return nil, fmt.Errorf("GET %v: %v", url, err)
103         }
104         return mounts, nil
105 }
106
107 // Touch updates the timestamp on the given block.
108 func (s *KeepService) Touch(ctx context.Context, c *Client, blk string) error {
109         req, err := http.NewRequest("TOUCH", s.url(blk), nil)
110         if err != nil {
111                 return err
112         }
113         resp, err := c.Do(req.WithContext(ctx))
114         if err != nil {
115                 return err
116         }
117         defer resp.Body.Close()
118         if resp.StatusCode != http.StatusOK {
119                 body, _ := ioutil.ReadAll(resp.Body)
120                 return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
121         }
122         return nil
123 }
124
125 // Untrash moves/copies the given block out of trash.
126 func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error {
127         req, err := http.NewRequest("PUT", s.url("untrash/"+blk), nil)
128         if err != nil {
129                 return err
130         }
131         resp, err := c.Do(req.WithContext(ctx))
132         if err != nil {
133                 return err
134         }
135         defer resp.Body.Close()
136         if resp.StatusCode != http.StatusOK {
137                 body, _ := ioutil.ReadAll(resp.Body)
138                 return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
139         }
140         return nil
141 }
142
143 // IndexMount returns an unsorted list of blocks at the given mount point.
144 func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
145         return s.index(ctx, c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
146 }
147
148 // Index returns an unsorted list of blocks that can be retrieved from
149 // this server.
150 func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
151         return s.index(ctx, c, s.url("index/"+prefix))
152 }
153
154 func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepServiceIndexEntry, error) {
155         req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
156         if err != nil {
157                 return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
158         }
159         resp, err := c.Do(req)
160         if err != nil {
161                 return nil, fmt.Errorf("Do(%v): %v", url, err)
162         } else if resp.StatusCode != 200 {
163                 return nil, fmt.Errorf("%v: %d %v", url, resp.StatusCode, resp.Status)
164         }
165         defer resp.Body.Close()
166
167         var entries []KeepServiceIndexEntry
168         scanner := bufio.NewScanner(resp.Body)
169         sawEOF := false
170         for scanner.Scan() {
171                 if scanner.Err() != nil {
172                         // If we encounter a read error (timeout,
173                         // connection failure), stop now and return it
174                         // below, so it doesn't get masked by the
175                         // ensuing "badly formatted response" error.
176                         break
177                 }
178                 if sawEOF {
179                         return nil, fmt.Errorf("Index response contained non-terminal blank line")
180                 }
181                 line := scanner.Text()
182                 if line == "" {
183                         sawEOF = true
184                         continue
185                 }
186                 fields := strings.Split(line, " ")
187                 if len(fields) != 2 {
188                         return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
189                 }
190                 mtime, err := strconv.ParseInt(fields[1], 10, 64)
191                 if err != nil {
192                         return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
193                 }
194                 if mtime < 1e12 {
195                         // An old version of keepstore is giving us
196                         // timestamps in seconds instead of
197                         // nanoseconds. (This threshold correctly
198                         // handles all times between 1970-01-02 and
199                         // 33658-09-27.)
200                         mtime = mtime * 1e9
201                 }
202                 entries = append(entries, KeepServiceIndexEntry{
203                         SizedDigest: SizedDigest(fields[0]),
204                         Mtime:       mtime,
205                 })
206         }
207         if err := scanner.Err(); err != nil {
208                 return nil, fmt.Errorf("Error scanning index response: %v", err)
209         }
210         if !sawEOF {
211                 return nil, fmt.Errorf("Index response had no EOF marker")
212         }
213         return entries, nil
214 }