17394: Use BlockWrite interface in crunch-run.
[arvados.git] / services / keep-balance / keep_service.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "net/http"
14
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16 )
17
18 // KeepService represents a keepstore server that is being rebalanced.
19 type KeepService struct {
20         arvados.KeepService
21         mounts []*KeepMount
22         *ChangeSet
23 }
24
25 // String implements fmt.Stringer.
26 func (srv *KeepService) String() string {
27         return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
28 }
29
30 var ksSchemes = map[bool]string{false: "http", true: "https"}
31
32 // URLBase returns scheme://host:port for this server.
33 func (srv *KeepService) URLBase() string {
34         return fmt.Sprintf("%s://%s:%d", ksSchemes[srv.ServiceSSLFlag], srv.ServiceHost, srv.ServicePort)
35 }
36
37 // CommitPulls sends the current list of pull requests to the storage
38 // server (even if the list is empty).
39 func (srv *KeepService) CommitPulls(ctx context.Context, c *arvados.Client) error {
40         return srv.put(ctx, c, "pull", srv.ChangeSet.Pulls)
41 }
42
43 // CommitTrash sends the current list of trash requests to the storage
44 // server (even if the list is empty).
45 func (srv *KeepService) CommitTrash(ctx context.Context, c *arvados.Client) error {
46         return srv.put(ctx, c, "trash", srv.ChangeSet.Trashes)
47 }
48
49 // Perform a PUT request at path, with data (as JSON) in the request
50 // body.
51 func (srv *KeepService) put(ctx context.Context, c *arvados.Client, path string, data interface{}) error {
52         // We'll start a goroutine to do the JSON encoding, so we can
53         // stream it to the http client through a Pipe, rather than
54         // keeping the entire encoded version in memory.
55         jsonR, jsonW := io.Pipe()
56
57         // errC communicates any encoding errors back to our main
58         // goroutine.
59         errC := make(chan error, 1)
60
61         go func() {
62                 enc := json.NewEncoder(jsonW)
63                 errC <- enc.Encode(data)
64                 jsonW.Close()
65         }()
66
67         url := srv.URLBase() + "/" + path
68         req, err := http.NewRequestWithContext(ctx, "PUT", url, ioutil.NopCloser(jsonR))
69         if err != nil {
70                 return fmt.Errorf("building request for %s: %v", url, err)
71         }
72         err = c.DoAndDecode(nil, req)
73
74         // If there was an error encoding the request body, report
75         // that instead of the response: obviously we won't get a
76         // useful response if our request wasn't properly encoded.
77         if encErr := <-errC; encErr != nil {
78                 return fmt.Errorf("encoding data for %s: %v", url, encErr)
79         }
80
81         return err
82 }
83
84 func (srv *KeepService) discoverMounts(c *arvados.Client) error {
85         mounts, err := srv.Mounts(c)
86         if err != nil {
87                 return fmt.Errorf("%s: error retrieving mounts: %v", srv, err)
88         }
89         srv.mounts = nil
90         for _, m := range mounts {
91                 srv.mounts = append(srv.mounts, &KeepMount{
92                         KeepMount:   m,
93                         KeepService: srv,
94                 })
95         }
96         return nil
97 }
98
99 type KeepMount struct {
100         arvados.KeepMount
101         KeepService *KeepService
102 }
103
104 // String implements fmt.Stringer.
105 func (mnt *KeepMount) String() string {
106         return fmt.Sprintf("%s (%s) on %s", mnt.UUID, mnt.DeviceID, mnt.KeepService)
107 }