Merge branch '15928-fs-deadlock'
[arvados.git] / services / keep-balance / keep_service.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "encoding/json"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15 )
16
17 // KeepService represents a keepstore server that is being rebalanced.
18 type KeepService struct {
19         arvados.KeepService
20         mounts []*KeepMount
21         *ChangeSet
22 }
23
24 // String implements fmt.Stringer.
25 func (srv *KeepService) String() string {
26         return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
27 }
28
29 var ksSchemes = map[bool]string{false: "http", true: "https"}
30
31 // URLBase returns scheme://host:port for this server.
32 func (srv *KeepService) URLBase() string {
33         return fmt.Sprintf("%s://%s:%d", ksSchemes[srv.ServiceSSLFlag], srv.ServiceHost, srv.ServicePort)
34 }
35
36 // CommitPulls sends the current list of pull requests to the storage
37 // server (even if the list is empty).
38 func (srv *KeepService) CommitPulls(c *arvados.Client) error {
39         return srv.put(c, "pull", srv.ChangeSet.Pulls)
40 }
41
42 // CommitTrash sends the current list of trash requests to the storage
43 // server (even if the list is empty).
44 func (srv *KeepService) CommitTrash(c *arvados.Client) error {
45         return srv.put(c, "trash", srv.ChangeSet.Trashes)
46 }
47
48 // Perform a PUT request at path, with data (as JSON) in the request
49 // body.
50 func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
51         // We'll start a goroutine to do the JSON encoding, so we can
52         // stream it to the http client through a Pipe, rather than
53         // keeping the entire encoded version in memory.
54         jsonR, jsonW := io.Pipe()
55
56         // errC communicates any encoding errors back to our main
57         // goroutine.
58         errC := make(chan error, 1)
59
60         go func() {
61                 enc := json.NewEncoder(jsonW)
62                 errC <- enc.Encode(data)
63                 jsonW.Close()
64         }()
65
66         url := srv.URLBase() + "/" + path
67         req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
68         if err != nil {
69                 return fmt.Errorf("building request for %s: %v", url, err)
70         }
71         err = c.DoAndDecode(nil, req)
72
73         // If there was an error encoding the request body, report
74         // that instead of the response: obviously we won't get a
75         // useful response if our request wasn't properly encoded.
76         if encErr := <-errC; encErr != nil {
77                 return fmt.Errorf("encoding data for %s: %v", url, encErr)
78         }
79
80         return err
81 }
82
83 func (srv *KeepService) discoverMounts(c *arvados.Client) error {
84         mounts, err := srv.Mounts(c)
85         if err != nil {
86                 return fmt.Errorf("%s: error retrieving mounts: %v", srv, err)
87         }
88         srv.mounts = nil
89         for _, m := range mounts {
90                 srv.mounts = append(srv.mounts, &KeepMount{
91                         KeepMount:   m,
92                         KeepService: srv,
93                 })
94         }
95         return nil
96 }
97
98 type KeepMount struct {
99         arvados.KeepMount
100         KeepService *KeepService
101 }
102
103 // String implements fmt.Stringer.
104 func (mnt *KeepMount) String() string {
105         return fmt.Sprintf("%s (%s) on %s", mnt.UUID, mnt.DeviceID, mnt.KeepService)
106 }