20259: Add documentation for banner and tooltip features
[arvados.git] / sdk / go / arvados / keep_service.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bufio"
9         "context"
10         "fmt"
11         "io/ioutil"
12         "net/http"
13         "strconv"
14         "strings"
15         "sync/atomic"
16         "time"
17
18         "git.arvados.org/arvados.git/sdk/go/ctxlog"
19 )
20
21 // KeepService is an arvados#keepService record
22 type KeepService struct {
23         UUID           string    `json:"uuid"`
24         ServiceHost    string    `json:"service_host"`
25         ServicePort    int       `json:"service_port"`
26         ServiceSSLFlag bool      `json:"service_ssl_flag"`
27         ServiceType    string    `json:"service_type"`
28         ReadOnly       bool      `json:"read_only"`
29         CreatedAt      time.Time `json:"created_at"`
30         ModifiedAt     time.Time `json:"modified_at"`
31 }
32
33 type KeepMount struct {
34         UUID           string          `json:"uuid"`
35         DeviceID       string          `json:"device_id"`
36         ReadOnly       bool            `json:"read_only"`
37         Replication    int             `json:"replication"`
38         StorageClasses map[string]bool `json:"storage_classes"`
39 }
40
41 // KeepServiceList is an arvados#keepServiceList record
42 type KeepServiceList struct {
43         Items          []KeepService `json:"items"`
44         ItemsAvailable int           `json:"items_available"`
45         Offset         int           `json:"offset"`
46         Limit          int           `json:"limit"`
47 }
48
49 // KeepServiceIndexEntry is what a keep service's index response tells
50 // us about a stored block.
51 type KeepServiceIndexEntry struct {
52         SizedDigest
53         // Time of last write, in nanoseconds since Unix epoch
54         Mtime int64
55 }
56
57 // EachKeepService calls f once for every readable
58 // KeepService. EachKeepService stops if it encounters an
59 // error, such as f returning a non-nil error.
60 func (c *Client) EachKeepService(f func(KeepService) error) error {
61         params := ResourceListParams{}
62         for {
63                 var page KeepServiceList
64                 err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
65                 if err != nil {
66                         return err
67                 }
68                 for _, item := range page.Items {
69                         err = f(item)
70                         if err != nil {
71                                 return err
72                         }
73                 }
74                 params.Offset = params.Offset + len(page.Items)
75                 if params.Offset >= page.ItemsAvailable {
76                         return nil
77                 }
78         }
79 }
80
81 func (s *KeepService) url(path string) string {
82         var f string
83         if s.ServiceSSLFlag {
84                 f = "https://%s:%d/%s"
85         } else {
86                 f = "http://%s:%d/%s"
87         }
88         return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
89 }
90
91 // String implements fmt.Stringer
92 func (s *KeepService) String() string {
93         return s.UUID
94 }
95
96 func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
97         url := s.url("mounts")
98         req, err := http.NewRequest("GET", url, nil)
99         if err != nil {
100                 return nil, err
101         }
102         var mounts []KeepMount
103         err = c.DoAndDecode(&mounts, req)
104         if err != nil {
105                 return nil, fmt.Errorf("GET %v: %v", url, err)
106         }
107         return mounts, nil
108 }
109
110 // Touch updates the timestamp on the given block.
111 func (s *KeepService) Touch(ctx context.Context, c *Client, blk string) error {
112         req, err := http.NewRequest("TOUCH", s.url(blk), nil)
113         if err != nil {
114                 return err
115         }
116         resp, err := c.Do(req.WithContext(ctx))
117         if err != nil {
118                 return err
119         }
120         defer resp.Body.Close()
121         if resp.StatusCode != http.StatusOK {
122                 body, _ := ioutil.ReadAll(resp.Body)
123                 return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
124         }
125         return nil
126 }
127
128 // Untrash moves/copies the given block out of trash.
129 func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error {
130         req, err := http.NewRequest("PUT", s.url("untrash/"+blk), nil)
131         if err != nil {
132                 return err
133         }
134         resp, err := c.Do(req.WithContext(ctx))
135         if err != nil {
136                 return err
137         }
138         defer resp.Body.Close()
139         if resp.StatusCode != http.StatusOK {
140                 body, _ := ioutil.ReadAll(resp.Body)
141                 return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
142         }
143         return nil
144 }
145
146 // IndexMount returns an unsorted list of blocks at the given mount point.
147 func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
148         return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
149 }
150
151 // Index returns an unsorted list of blocks that can be retrieved from
152 // this server.
153 func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
154         return s.index(ctx, c, prefix, s.url("index/"+prefix))
155 }
156
157 func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) {
158         req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
159         if err != nil {
160                 return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
161         }
162         resp, err := c.Do(req)
163         if err != nil {
164                 return nil, fmt.Errorf("Do(%v): %v", url, err)
165         } else if resp.StatusCode != 200 {
166                 return nil, fmt.Errorf("%v: %d %v", url, resp.StatusCode, resp.Status)
167         }
168         defer resp.Body.Close()
169
170         var progress int64
171         ctx, cancel := context.WithCancel(ctx)
172         defer cancel()
173         go func() {
174                 log := ctxlog.FromContext(ctx)
175                 logticker := time.NewTicker(5 * time.Minute)
176                 defer logticker.Stop()
177                 for {
178                         select {
179                         case <-logticker.C:
180                                 log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
181                         case <-ctx.Done():
182                                 return
183                         }
184                 }
185         }()
186
187         var entries []KeepServiceIndexEntry
188         scanner := bufio.NewScanner(resp.Body)
189         sawEOF := false
190         for scanner.Scan() {
191                 if ctx.Err() != nil {
192                         return nil, ctx.Err()
193                 }
194                 if scanner.Err() != nil {
195                         // If we encounter a read error (timeout,
196                         // connection failure), stop now and return it
197                         // below, so it doesn't get masked by the
198                         // ensuing "badly formatted response" error.
199                         break
200                 }
201                 if sawEOF {
202                         return nil, fmt.Errorf("Index response contained non-terminal blank line")
203                 }
204                 line := scanner.Text()
205                 if line == "" {
206                         sawEOF = true
207                         continue
208                 }
209                 fields := strings.Split(line, " ")
210                 if len(fields) != 2 {
211                         return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
212                 }
213                 if !strings.HasPrefix(fields[0], prefix) {
214                         return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix)
215                 }
216                 mtime, err := strconv.ParseInt(fields[1], 10, 64)
217                 if err != nil {
218                         return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
219                 }
220                 if mtime < 1e12 {
221                         // An old version of keepstore is giving us
222                         // timestamps in seconds instead of
223                         // nanoseconds. (This threshold correctly
224                         // handles all times between 1970-01-02 and
225                         // 33658-09-27.)
226                         mtime = mtime * 1e9
227                 }
228                 entries = append(entries, KeepServiceIndexEntry{
229                         SizedDigest: SizedDigest(fields[0]),
230                         Mtime:       mtime,
231                 })
232                 atomic.AddInt64(&progress, 1)
233         }
234         if err := scanner.Err(); err != nil {
235                 return nil, fmt.Errorf("Error scanning index response: %v", err)
236         }
237         if !sawEOF {
238                 return nil, fmt.Errorf("Index response had no EOF marker")
239         }
240         return entries, nil
241 }