1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
18 "git.arvados.org/arvados.git/sdk/go/ctxlog"
21 // KeepService is an arvados#keepService record
22 type KeepService struct {
23 UUID string `json:"uuid"`
24 ServiceHost string `json:"service_host"`
25 ServicePort int `json:"service_port"`
26 ServiceSSLFlag bool `json:"service_ssl_flag"`
27 ServiceType string `json:"service_type"`
28 ReadOnly bool `json:"read_only"`
29 CreatedAt time.Time `json:"created_at"`
30 ModifiedAt time.Time `json:"modified_at"`
33 type KeepMount struct {
34 UUID string `json:"uuid"`
35 DeviceID string `json:"device_id"`
36 AllowWrite bool `json:"allow_write"`
37 AllowTrash bool `json:"allow_trash"`
38 Replication int `json:"replication"`
39 StorageClasses map[string]bool `json:"storage_classes"`
42 // KeepServiceList is an arvados#keepServiceList record
43 type KeepServiceList struct {
44 Items []KeepService `json:"items"`
45 ItemsAvailable int `json:"items_available"`
46 Offset int `json:"offset"`
47 Limit int `json:"limit"`
50 // KeepServiceIndexEntry is what a keep service's index response tells
51 // us about a stored block.
52 type KeepServiceIndexEntry struct {
54 // Time of last write, in nanoseconds since Unix epoch
58 // EachKeepService calls f once for every readable
59 // KeepService. EachKeepService stops if it encounters an
60 // error, such as f returning a non-nil error.
61 func (c *Client) EachKeepService(f func(KeepService) error) error {
62 params := ResourceListParams{}
64 var page KeepServiceList
65 err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
69 for _, item := range page.Items {
75 params.Offset = params.Offset + len(page.Items)
76 if params.Offset >= page.ItemsAvailable {
82 func (s *KeepService) url(path string) string {
85 f = "https://%s:%d/%s"
89 return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
92 // String implements fmt.Stringer
93 func (s *KeepService) String() string {
97 func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
98 url := s.url("mounts")
99 req, err := http.NewRequest("GET", url, nil)
103 var mounts []KeepMount
104 err = c.DoAndDecode(&mounts, req)
106 return nil, fmt.Errorf("GET %v: %v", url, err)
111 // Touch updates the timestamp on the given block.
112 func (s *KeepService) Touch(ctx context.Context, c *Client, blk string) error {
113 req, err := http.NewRequest("TOUCH", s.url(blk), nil)
117 resp, err := c.Do(req.WithContext(ctx))
121 defer resp.Body.Close()
122 if resp.StatusCode != http.StatusOK {
123 body, _ := ioutil.ReadAll(resp.Body)
124 return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
129 // Untrash moves/copies the given block out of trash.
130 func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error {
131 req, err := http.NewRequest("PUT", s.url("untrash/"+blk), nil)
135 resp, err := c.Do(req.WithContext(ctx))
139 defer resp.Body.Close()
140 if resp.StatusCode != http.StatusOK {
141 body, _ := ioutil.ReadAll(resp.Body)
142 return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
147 // IndexMount returns an unsorted list of blocks at the given mount point.
148 func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
149 return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
152 // Index returns an unsorted list of blocks that can be retrieved from
154 func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
155 return s.index(ctx, c, prefix, s.url("index/"+prefix))
158 func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) {
159 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
161 return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
163 resp, err := c.Do(req)
165 return nil, fmt.Errorf("Do(%v): %v", url, err)
166 } else if resp.StatusCode != 200 {
167 return nil, fmt.Errorf("%v: %d %v", url, resp.StatusCode, resp.Status)
169 defer resp.Body.Close()
172 ctx, cancel := context.WithCancel(ctx)
175 log := ctxlog.FromContext(ctx)
176 logticker := time.NewTicker(5 * time.Minute)
177 defer logticker.Stop()
181 log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
188 var entries []KeepServiceIndexEntry
189 scanner := bufio.NewScanner(resp.Body)
192 if ctx.Err() != nil {
193 return nil, ctx.Err()
195 if scanner.Err() != nil {
196 // If we encounter a read error (timeout,
197 // connection failure), stop now and return it
198 // below, so it doesn't get masked by the
199 // ensuing "badly formatted response" error.
203 return nil, fmt.Errorf("Index response contained non-terminal blank line")
205 line := scanner.Text()
210 fields := strings.Split(line, " ")
211 if len(fields) != 2 {
212 return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
214 if !strings.HasPrefix(fields[0], prefix) {
215 return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix)
217 mtime, err := strconv.ParseInt(fields[1], 10, 64)
219 return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
222 // An old version of keepstore is giving us
223 // timestamps in seconds instead of
224 // nanoseconds. (This threshold correctly
225 // handles all times between 1970-01-02 and
229 entries = append(entries, KeepServiceIndexEntry{
230 SizedDigest: SizedDigest(fields[0]),
233 atomic.AddInt64(&progress, 1)
235 if err := scanner.Err(); err != nil {
236 return nil, fmt.Errorf("Error scanning index response: %v", err)
239 return nil, fmt.Errorf("Index response had no EOF marker")