21773: Replace test-only config key with env var.
[arvados.git] / sdk / go / arvados / keep_service.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bufio"
9         "context"
10         "fmt"
11         "io/ioutil"
12         "net/http"
13         "strconv"
14         "strings"
15         "sync/atomic"
16         "time"
17
18         "git.arvados.org/arvados.git/sdk/go/ctxlog"
19 )
20
21 // KeepService is an arvados#keepService record
22 type KeepService struct {
23         UUID           string    `json:"uuid"`
24         ServiceHost    string    `json:"service_host"`
25         ServicePort    int       `json:"service_port"`
26         ServiceSSLFlag bool      `json:"service_ssl_flag"`
27         ServiceType    string    `json:"service_type"`
28         ReadOnly       bool      `json:"read_only"`
29         CreatedAt      time.Time `json:"created_at"`
30         ModifiedAt     time.Time `json:"modified_at"`
31 }
32
33 type KeepMount struct {
34         UUID           string          `json:"uuid"`
35         DeviceID       string          `json:"device_id"`
36         AllowWrite     bool            `json:"allow_write"`
37         AllowTrash     bool            `json:"allow_trash"`
38         Replication    int             `json:"replication"`
39         StorageClasses map[string]bool `json:"storage_classes"`
40 }
41
42 // KeepServiceList is an arvados#keepServiceList record
43 type KeepServiceList struct {
44         Items          []KeepService `json:"items"`
45         ItemsAvailable int           `json:"items_available"`
46         Offset         int           `json:"offset"`
47         Limit          int           `json:"limit"`
48 }
49
50 // KeepServiceIndexEntry is what a keep service's index response tells
51 // us about a stored block.
52 type KeepServiceIndexEntry struct {
53         SizedDigest
54         // Time of last write, in nanoseconds since Unix epoch
55         Mtime int64
56 }
57
58 // EachKeepService calls f once for every readable
59 // KeepService. EachKeepService stops if it encounters an
60 // error, such as f returning a non-nil error.
61 func (c *Client) EachKeepService(f func(KeepService) error) error {
62         params := ResourceListParams{}
63         for {
64                 var page KeepServiceList
65                 err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
66                 if err != nil {
67                         return err
68                 }
69                 for _, item := range page.Items {
70                         err = f(item)
71                         if err != nil {
72                                 return err
73                         }
74                 }
75                 params.Offset = params.Offset + len(page.Items)
76                 if params.Offset >= page.ItemsAvailable {
77                         return nil
78                 }
79         }
80 }
81
82 func (s *KeepService) url(path string) string {
83         var f string
84         if s.ServiceSSLFlag {
85                 f = "https://%s:%d/%s"
86         } else {
87                 f = "http://%s:%d/%s"
88         }
89         return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
90 }
91
92 // String implements fmt.Stringer
93 func (s *KeepService) String() string {
94         return s.UUID
95 }
96
97 func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
98         url := s.url("mounts")
99         req, err := http.NewRequest("GET", url, nil)
100         if err != nil {
101                 return nil, err
102         }
103         var mounts []KeepMount
104         err = c.DoAndDecode(&mounts, req)
105         if err != nil {
106                 return nil, fmt.Errorf("GET %v: %v", url, err)
107         }
108         return mounts, nil
109 }
110
111 // Touch updates the timestamp on the given block.
112 func (s *KeepService) Touch(ctx context.Context, c *Client, blk string) error {
113         req, err := http.NewRequest("TOUCH", s.url(blk), nil)
114         if err != nil {
115                 return err
116         }
117         resp, err := c.Do(req.WithContext(ctx))
118         if err != nil {
119                 return err
120         }
121         defer resp.Body.Close()
122         if resp.StatusCode != http.StatusOK {
123                 body, _ := ioutil.ReadAll(resp.Body)
124                 return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
125         }
126         return nil
127 }
128
129 // Untrash moves/copies the given block out of trash.
130 func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error {
131         req, err := http.NewRequest("PUT", s.url("untrash/"+blk), nil)
132         if err != nil {
133                 return err
134         }
135         resp, err := c.Do(req.WithContext(ctx))
136         if err != nil {
137                 return err
138         }
139         defer resp.Body.Close()
140         if resp.StatusCode != http.StatusOK {
141                 body, _ := ioutil.ReadAll(resp.Body)
142                 return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
143         }
144         return nil
145 }
146
147 // IndexMount returns an unsorted list of blocks at the given mount point.
148 func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
149         return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
150 }
151
152 // Index returns an unsorted list of blocks that can be retrieved from
153 // this server.
154 func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
155         return s.index(ctx, c, prefix, s.url("index/"+prefix))
156 }
157
158 func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) {
159         req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
160         if err != nil {
161                 return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
162         }
163         resp, err := c.Do(req)
164         if err != nil {
165                 return nil, fmt.Errorf("Do(%v): %v", url, err)
166         } else if resp.StatusCode != 200 {
167                 return nil, fmt.Errorf("%v: %d %v", url, resp.StatusCode, resp.Status)
168         }
169         defer resp.Body.Close()
170
171         var progress int64
172         ctx, cancel := context.WithCancel(ctx)
173         defer cancel()
174         go func() {
175                 log := ctxlog.FromContext(ctx)
176                 logticker := time.NewTicker(5 * time.Minute)
177                 defer logticker.Stop()
178                 for {
179                         select {
180                         case <-logticker.C:
181                                 log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
182                         case <-ctx.Done():
183                                 return
184                         }
185                 }
186         }()
187
188         var entries []KeepServiceIndexEntry
189         scanner := bufio.NewScanner(resp.Body)
190         sawEOF := false
191         for scanner.Scan() {
192                 if ctx.Err() != nil {
193                         return nil, ctx.Err()
194                 }
195                 if scanner.Err() != nil {
196                         // If we encounter a read error (timeout,
197                         // connection failure), stop now and return it
198                         // below, so it doesn't get masked by the
199                         // ensuing "badly formatted response" error.
200                         break
201                 }
202                 if sawEOF {
203                         return nil, fmt.Errorf("Index response contained non-terminal blank line")
204                 }
205                 line := scanner.Text()
206                 if line == "" {
207                         sawEOF = true
208                         continue
209                 }
210                 fields := strings.Split(line, " ")
211                 if len(fields) != 2 {
212                         return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
213                 }
214                 if !strings.HasPrefix(fields[0], prefix) {
215                         return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix)
216                 }
217                 mtime, err := strconv.ParseInt(fields[1], 10, 64)
218                 if err != nil {
219                         return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
220                 }
221                 if mtime < 1e12 {
222                         // An old version of keepstore is giving us
223                         // timestamps in seconds instead of
224                         // nanoseconds. (This threshold correctly
225                         // handles all times between 1970-01-02 and
226                         // 33658-09-27.)
227                         mtime = mtime * 1e9
228                 }
229                 entries = append(entries, KeepServiceIndexEntry{
230                         SizedDigest: SizedDigest(fields[0]),
231                         Mtime:       mtime,
232                 })
233                 atomic.AddInt64(&progress, 1)
234         }
235         if err := scanner.Err(); err != nil {
236                 return nil, fmt.Errorf("Error scanning index response: %v", err)
237         }
238         if !sawEOF {
239                 return nil, fmt.Errorf("Index response had no EOF marker")
240         }
241         return entries, nil
242 }