1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
18 "git.arvados.org/arvados.git/sdk/go/ctxlog"
21 // KeepService is an arvados#keepService record
22 type KeepService struct {
23 UUID string `json:"uuid"`
24 ServiceHost string `json:"service_host"`
25 ServicePort int `json:"service_port"`
26 ServiceSSLFlag bool `json:"service_ssl_flag"`
27 ServiceType string `json:"service_type"`
28 ReadOnly bool `json:"read_only"`
29 CreatedAt time.Time `json:"created_at"`
30 ModifiedAt time.Time `json:"modified_at"`
33 type KeepMount struct {
34 UUID string `json:"uuid"`
35 DeviceID string `json:"device_id"`
36 ReadOnly bool `json:"read_only"`
37 Replication int `json:"replication"`
38 StorageClasses map[string]bool `json:"storage_classes"`
41 // KeepServiceList is an arvados#keepServiceList record
42 type KeepServiceList struct {
43 Items []KeepService `json:"items"`
44 ItemsAvailable int `json:"items_available"`
45 Offset int `json:"offset"`
46 Limit int `json:"limit"`
49 // KeepServiceIndexEntry is what a keep service's index response tells
50 // us about a stored block.
51 type KeepServiceIndexEntry struct {
53 // Time of last write, in nanoseconds since Unix epoch
57 // EachKeepService calls f once for every readable
58 // KeepService. EachKeepService stops if it encounters an
59 // error, such as f returning a non-nil error.
60 func (c *Client) EachKeepService(f func(KeepService) error) error {
61 params := ResourceListParams{}
63 var page KeepServiceList
64 err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
68 for _, item := range page.Items {
74 params.Offset = params.Offset + len(page.Items)
75 if params.Offset >= page.ItemsAvailable {
81 func (s *KeepService) url(path string) string {
84 f = "https://%s:%d/%s"
88 return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
91 // String implements fmt.Stringer
92 func (s *KeepService) String() string {
96 func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
97 url := s.url("mounts")
98 req, err := http.NewRequest("GET", url, nil)
102 var mounts []KeepMount
103 err = c.DoAndDecode(&mounts, req)
105 return nil, fmt.Errorf("GET %v: %v", url, err)
110 // Touch updates the timestamp on the given block.
111 func (s *KeepService) Touch(ctx context.Context, c *Client, blk string) error {
112 req, err := http.NewRequest("TOUCH", s.url(blk), nil)
116 resp, err := c.Do(req.WithContext(ctx))
120 defer resp.Body.Close()
121 if resp.StatusCode != http.StatusOK {
122 body, _ := ioutil.ReadAll(resp.Body)
123 return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
128 // Untrash moves/copies the given block out of trash.
129 func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error {
130 req, err := http.NewRequest("PUT", s.url("untrash/"+blk), nil)
134 resp, err := c.Do(req.WithContext(ctx))
138 defer resp.Body.Close()
139 if resp.StatusCode != http.StatusOK {
140 body, _ := ioutil.ReadAll(resp.Body)
141 return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
146 // IndexMount returns an unsorted list of blocks at the given mount point.
147 func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
148 return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
151 // Index returns an unsorted list of blocks that can be retrieved from
153 func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
154 return s.index(ctx, c, prefix, s.url("index/"+prefix))
157 func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) {
158 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
160 return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
162 resp, err := c.Do(req)
164 return nil, fmt.Errorf("Do(%v): %v", url, err)
165 } else if resp.StatusCode != 200 {
166 return nil, fmt.Errorf("%v: %d %v", url, resp.StatusCode, resp.Status)
168 defer resp.Body.Close()
171 ctx, cancel := context.WithCancel(ctx)
174 log := ctxlog.FromContext(ctx)
175 logticker := time.NewTicker(5 * time.Minute)
176 defer logticker.Stop()
180 log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
187 var entries []KeepServiceIndexEntry
188 scanner := bufio.NewScanner(resp.Body)
191 if ctx.Err() != nil {
192 return nil, ctx.Err()
194 if scanner.Err() != nil {
195 // If we encounter a read error (timeout,
196 // connection failure), stop now and return it
197 // below, so it doesn't get masked by the
198 // ensuing "badly formatted response" error.
202 return nil, fmt.Errorf("Index response contained non-terminal blank line")
204 line := scanner.Text()
209 fields := strings.Split(line, " ")
210 if len(fields) != 2 {
211 return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
213 if !strings.HasPrefix(fields[0], prefix) {
214 return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix)
216 mtime, err := strconv.ParseInt(fields[1], 10, 64)
218 return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
221 // An old version of keepstore is giving us
222 // timestamps in seconds instead of
223 // nanoseconds. (This threshold correctly
224 // handles all times between 1970-01-02 and
228 entries = append(entries, KeepServiceIndexEntry{
229 SizedDigest: SizedDigest(fields[0]),
232 atomic.AddInt64(&progress, 1)
234 if err := scanner.Err(); err != nil {
235 return nil, fmt.Errorf("Error scanning index response: %v", err)
238 return nil, fmt.Errorf("Index response had no EOF marker")