From b002129afda08bbb4fdbed6e629858a5c298c068 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 16 May 2016 17:09:21 -0400 Subject: [PATCH] 9162: Add keep-balance --- build/package-build-dockerfiles/Makefile | 8 +- .../centos6/Dockerfile | 2 +- .../debian7/Dockerfile | 2 +- .../debian8/Dockerfile | 2 +- .../ubuntu1204/Dockerfile | 2 +- .../ubuntu1404/Dockerfile | 2 +- build/run-build-packages-one-target.sh | 3 +- build/run-build-packages.sh | 2 + build/run-tests.sh | 8 +- sdk/go/arvados/client.go | 169 +++++ sdk/go/arvados/client_test.go | 83 +++ sdk/go/arvados/collection.go | 62 ++ sdk/go/arvados/doc.go | 12 + sdk/go/arvados/duration.go | 31 + sdk/go/arvados/keep_block.go | 15 + sdk/go/arvados/keep_service.go | 123 ++++ sdk/go/arvados/resource_list.go | 25 + sdk/go/arvados/resource_list_test.go | 21 + sdk/go/arvados/user.go | 17 + services/keep-balance/balance.go | 611 ++++++++++++++++++ services/keep-balance/balance_run_test.go | 374 +++++++++++ services/keep-balance/balance_test.go | 255 ++++++++ services/keep-balance/block_state.go | 95 +++ services/keep-balance/change_set.go | 75 +++ services/keep-balance/change_set_test.go | 35 + services/keep-balance/collection.go | 95 +++ services/keep-balance/integration_test.go | 92 +++ services/keep-balance/keep_service.go | 76 +++ services/keep-balance/main.go | 156 +++++ services/keep-balance/main_test.go | 43 ++ services/keep-balance/time_me.go | 14 + services/keep-balance/usage.go | 83 +++ services/keepstore/keepstore.go | 7 +- 33 files changed, 2589 insertions(+), 11 deletions(-) create mode 100644 sdk/go/arvados/client.go create mode 100644 sdk/go/arvados/client_test.go create mode 100644 sdk/go/arvados/collection.go create mode 100644 sdk/go/arvados/doc.go create mode 100644 sdk/go/arvados/duration.go create mode 100644 sdk/go/arvados/keep_block.go create mode 100644 sdk/go/arvados/keep_service.go create mode 100644 sdk/go/arvados/resource_list.go create mode 100644 sdk/go/arvados/resource_list_test.go create mode 100644 sdk/go/arvados/user.go create mode 100644 services/keep-balance/balance.go create mode 100644 services/keep-balance/balance_run_test.go create mode 100644 services/keep-balance/balance_test.go create mode 100644 services/keep-balance/block_state.go create mode 100644 services/keep-balance/change_set.go create mode 100644 services/keep-balance/change_set_test.go create mode 100644 services/keep-balance/collection.go create mode 100644 services/keep-balance/integration_test.go create mode 100644 services/keep-balance/keep_service.go create mode 100644 services/keep-balance/main.go create mode 100644 services/keep-balance/main_test.go create mode 100644 services/keep-balance/time_me.go create mode 100644 services/keep-balance/usage.go diff --git a/build/package-build-dockerfiles/Makefile b/build/package-build-dockerfiles/Makefile index 9216f8264b..3482886112 100644 --- a/build/package-build-dockerfiles/Makefile +++ b/build/package-build-dockerfiles/Makefile @@ -20,10 +20,12 @@ ubuntu1404/generated: common-generated-all test -d ubuntu1404/generated || mkdir ubuntu1404/generated cp -rlt ubuntu1404/generated common-generated/* -common-generated-all: common-generated/golang-amd64.tar.gz +GOTARBALL=go1.6.2.linux-amd64.tar.gz -common-generated/golang-amd64.tar.gz: common-generated - wget -cqO common-generated/golang-amd64.tar.gz http://storage.googleapis.com/golang/go1.4.2.linux-amd64.tar.gz +common-generated-all: common-generated/$(GOTARBALL) + +common-generated/$(GOTARBALL): common-generated + wget -cqO 
common-generated/$(GOTARBALL) http://storage.googleapis.com/golang/$(GOTARBALL) common-generated: mkdir common-generated diff --git a/build/package-build-dockerfiles/centos6/Dockerfile b/build/package-build-dockerfiles/centos6/Dockerfile index c21c091822..570dde162c 100644 --- a/build/package-build-dockerfiles/centos6/Dockerfile +++ b/build/package-build-dockerfiles/centos6/Dockerfile @@ -5,7 +5,7 @@ MAINTAINER Brett Smith RUN yum -q -y install make automake gcc gcc-c++ libyaml-devel patch readline-devel zlib-devel libffi-devel openssl-devel bzip2 libtool bison sqlite-devel rpm-build git perl-ExtUtils-MakeMaker libattr-devel nss-devel libcurl-devel which tar unzip scl-utils centos-release-scl postgresql-devel # Install golang binary -ADD generated/golang-amd64.tar.gz /usr/local/ +ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/ RUN ln -s /usr/local/go/bin/go /usr/local/bin/ # Install RVM diff --git a/build/package-build-dockerfiles/debian7/Dockerfile b/build/package-build-dockerfiles/debian7/Dockerfile index ccc444c314..ddad542604 100644 --- a/build/package-build-dockerfiles/debian7/Dockerfile +++ b/build/package-build-dockerfiles/debian7/Dockerfile @@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \ /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b # Install golang binary -ADD generated/golang-amd64.tar.gz /usr/local/ +ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/ RUN ln -s /usr/local/go/bin/go /usr/local/bin/ ENV WORKSPACE /arvados diff --git a/build/package-build-dockerfiles/debian8/Dockerfile b/build/package-build-dockerfiles/debian8/Dockerfile index e32cfb44a2..80f06a224b 100644 --- a/build/package-build-dockerfiles/debian8/Dockerfile +++ b/build/package-build-dockerfiles/debian8/Dockerfile @@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \ /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b # Install golang binary -ADD generated/golang-amd64.tar.gz /usr/local/ +ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/ RUN ln -s /usr/local/go/bin/go /usr/local/bin/ ENV WORKSPACE /arvados diff --git a/build/package-build-dockerfiles/ubuntu1204/Dockerfile b/build/package-build-dockerfiles/ubuntu1204/Dockerfile index 34bf6985b0..2f628b0d1f 100644 --- a/build/package-build-dockerfiles/ubuntu1204/Dockerfile +++ b/build/package-build-dockerfiles/ubuntu1204/Dockerfile @@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \ /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b # Install golang binary -ADD generated/golang-amd64.tar.gz /usr/local/ +ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/ RUN ln -s /usr/local/go/bin/go /usr/local/bin/ ENV WORKSPACE /arvados diff --git a/build/package-build-dockerfiles/ubuntu1404/Dockerfile b/build/package-build-dockerfiles/ubuntu1404/Dockerfile index 53771640ee..b9c003ac79 100644 --- a/build/package-build-dockerfiles/ubuntu1404/Dockerfile +++ b/build/package-build-dockerfiles/ubuntu1404/Dockerfile @@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \ /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b # Install golang binary -ADD generated/golang-amd64.tar.gz /usr/local/ +ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/ RUN ln -s /usr/local/go/bin/go /usr/local/bin/ ENV WORKSPACE /arvados diff --git a/build/run-build-packages-one-target.sh b/build/run-build-packages-one-target.sh index 322129e057..6fdffd09da 100755 
--- a/build/run-build-packages-one-target.sh +++ b/build/run-build-packages-one-target.sh @@ -128,9 +128,10 @@ if test -z "$packages" ; then arvados-src arvados-workbench crunchstat + keep-balance + keep-block-check keepproxy keep-rsync - keep-block-check keepstore keep-web libarvados-perl" diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh index a51198f9ef..7631718f7e 100755 --- a/build/run-build-packages.sh +++ b/build/run-build-packages.sh @@ -384,6 +384,8 @@ package_go_binary services/keepstore keepstore \ "Keep storage daemon, accessible to clients on the LAN" package_go_binary services/keepproxy keepproxy \ "Make a Keep cluster accessible to clients that are not on the LAN" +package_go_binary services/keep-balance keep-balance \ + "Rebalance and garbage-collect data blocks stored in Arvados Keep" package_go_binary services/keep-web keep-web \ "Static web hosting service for user data stored in Arvados Keep" package_go_binary services/datamanager arvados-data-manager \ diff --git a/build/run-tests.sh b/build/run-tests.sh index d22934199f..30a80f527a 100755 --- a/build/run-tests.sh +++ b/build/run-tests.sh @@ -70,6 +70,7 @@ services/fuse services/keep-web services/keepproxy services/keepstore +services/keep-balance services/login-sync services/nodemanager services/crunch-run @@ -79,6 +80,7 @@ sdk/cli sdk/pam sdk/python sdk/ruby +sdk/go/arvados sdk/go/arvadosclient sdk/go/keepclient sdk/go/httpserver @@ -150,6 +152,8 @@ sanity_checks() { echo -n 'go: ' go version \ || fatal "No go binary. See http://golang.org/doc/install" + [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 6 ]] \ + || fatal "Go >= 1.6 required. See http://golang.org/doc/install" echo -n 'gcc: ' gcc --version | egrep ^gcc \ || fatal "No gcc. Try: apt-get install build-essential" @@ -499,7 +503,7 @@ do_test_once() { then # "go test -check.vv giturl" doesn't work, but this # does: - cd "$WORKSPACE/$1" && go test ${short:+-short} ${coverflags[@]} ${testargs[$1]} + cd "$WORKSPACE/$1" && go test ${short:+-short} ${testargs[$1]} else # The above form gets verbose even when testargs is # empty, so use this form in such cases: @@ -703,6 +707,7 @@ do_install services/api apiserver declare -a gostuff gostuff=( + sdk/go/arvados sdk/go/arvadosclient sdk/go/blockdigest sdk/go/httpserver @@ -714,6 +719,7 @@ gostuff=( services/keep-web services/keepstore sdk/go/keepclient + services/keep-balance services/keepproxy services/datamanager/summary services/datamanager/collection diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go new file mode 100644 index 0000000000..ee830c8c40 --- /dev/null +++ b/sdk/go/arvados/client.go @@ -0,0 +1,169 @@ +package arvados + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "os" +) + +// A Client is an HTTP client with an API endpoint and a set of +// Arvados credentials. +// +// It offers methods for accessing individual Arvados APIs, and +// methods that implement common patterns like fetching multiple pages +// of results using List APIs. +type Client struct { + // HTTP client used to make requests. If nil, + // http.DefaultClient or InsecureHTTPClient will be used. + Client *http.Client + + // Hostname (or host:port) of Arvados API server. + APIHost string + + // User authentication token. + AuthToken string + + // Accept unverified certificates. This works only if the + // Client field is nil: otherwise, it has no effect. 
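+ // (A sketch: a Client with Insecure set and a nil Client field + // falls back to InsecureHTTPClient, defined below; see the + // httpClient method.)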
+ Insecure bool +} + +// The default http.Client used by a Client with Insecure==true and +// Client==nil. +var InsecureHTTPClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true}}} + +// NewClientFromEnv creates a new Client that uses the default HTTP +// client with the API endpoint and credentials given by the +// ARVADOS_API_* environment variables. +func NewClientFromEnv() *Client { + return &Client{ + APIHost: os.Getenv("ARVADOS_API_HOST"), + AuthToken: os.Getenv("ARVADOS_API_TOKEN"), + Insecure: os.Getenv("ARVADOS_API_HOST_INSECURE") != "", + } +} + +// Do adds authentication headers and then calls (*http.Client)Do(). +func (c *Client) Do(req *http.Request) (*http.Response, error) { + if c.AuthToken != "" { + req.Header.Add("Authorization", "OAuth2 "+c.AuthToken) + } + return c.httpClient().Do(req) +} + +// DoAndDecode performs req and unmarshals the response (which must be +// JSON) into dst. Use this instead of RequestAndDecode if you need +// more control of the http.Request object. +func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error { + resp, err := c.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + buf, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != 200 { + return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status) + } + if dst == nil { + return nil + } + return json.Unmarshal(buf, dst) +} + +// RequestAndDecode performs an API request and unmarshals the +// response (which must be JSON) into dst. Method and body arguments +// are the same as for http.NewRequest(). The given path is added to +// the server's scheme/host/port to form the request URL. The given +// params are passed via POST form or query string. +// +// path must not contain a query string. +func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error { + urlString := c.apiURL(path) + var urlValues url.Values + if v, ok := params.(url.Values); ok { + urlValues = v + } else if params != nil { + // Convert an arbitrary struct to url.Values. For + // example, Foo{Bar: []int{1,2,3}, Baz: "waz"} becomes + // url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`} + // + // TODO: Do this more efficiently, possibly using + // json.Decode/Encode, so the whole thing doesn't have + // to get encoded, decoded, and re-encoded. + j, err := json.Marshal(params) + if err != nil { + return err + } + var generic map[string]interface{} + err = json.Unmarshal(j, &generic) + if err != nil { + return err + } + urlValues = url.Values{} + for k, v := range generic { + if v, ok := v.(string); ok { + urlValues.Set(k, v) + continue + } + j, err := json.Marshal(v) + if err != nil { + return err + } + urlValues.Set(k, string(j)) + } + } + if (method == "GET" || body != nil) && urlValues != nil { + // FIXME: what if params don't fit in URL + u, err := url.Parse(urlString) + if err != nil { + return err + } + u.RawQuery = urlValues.Encode() + urlString = u.String() + } + req, err := http.NewRequest(method, urlString, body) + if err != nil { + return err + } + return c.DoAndDecode(dst, req) +} + +func (c *Client) httpClient() *http.Client { + switch { + case c.Client != nil: + return c.Client + case c.Insecure: + return InsecureHTTPClient + default: + return http.DefaultClient + } +} + +func (c *Client) apiURL(path string) string { + return "https://" + c.APIHost + "/" + path +} + +// DiscoveryDocument is the Arvados server's description of itself. 
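+// +// A sketch of the JSON subset this struct captures (the field names +// come from the struct tags below; the values are illustrative +// only, not authoritative): +// +//	{"defaultCollectionReplication":2,"blobSignatureTtl":1209600}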
+type DiscoveryDocument struct { + DefaultCollectionReplication int `json:"defaultCollectionReplication"` + BlobSignatureTTL int64 `json:"blobSignatureTtl"` +} + +// DiscoveryDocument returns a *DiscoveryDocument. The returned object +// should not be modified: the same object may be returned by +// subsequent calls. +func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) { + var dd DiscoveryDocument + return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil) +} diff --git a/sdk/go/arvados/client_test.go b/sdk/go/arvados/client_test.go new file mode 100644 index 0000000000..2db50bfd6c --- /dev/null +++ b/sdk/go/arvados/client_test.go @@ -0,0 +1,83 @@ +package arvados + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "sync" + "testing" +) + +type stubTransport struct { + Responses map[string]string + Requests []http.Request + sync.Mutex +} + +func (stub *stubTransport) RoundTrip(req *http.Request) (*http.Response, error) { + stub.Lock() + stub.Requests = append(stub.Requests, *req) + stub.Unlock() + + resp := &http.Response{ + Status: "200 OK", + StatusCode: 200, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + Request: req, + } + str := stub.Responses[req.URL.Path] + if str == "" { + resp.Status = "404 Not Found" + resp.StatusCode = 404 + str = "{}" + } + buf := bytes.NewBufferString(str) + resp.Body = ioutil.NopCloser(buf) + resp.ContentLength = int64(buf.Len()) + return resp, nil +} + +type errorTransport struct{} + +func (stub *errorTransport) RoundTrip(req *http.Request) (*http.Response, error) { + return nil, fmt.Errorf("something awful happened") +} + +func TestCurrentUser(t *testing.T) { + t.Parallel() + stub := &stubTransport{ + Responses: map[string]string{ + "/arvados/v1/users/current": `{"uuid":"zzzzz-abcde-012340123401234"}`, + }, + } + c := &Client{ + Client: &http.Client{ + Transport: stub, + }, + APIHost: "zzzzz.arvadosapi.com", + AuthToken: "xyzzy", + } + u, err := c.CurrentUser() + if err != nil { + t.Fatal(err) + } + if x := "zzzzz-abcde-012340123401234"; u.UUID != x { + t.Errorf("got uuid %q, expected %q", u.UUID, x) + } + if len(stub.Requests) < 1 { + t.Fatal("empty stub.Requests") + } + hdr := stub.Requests[len(stub.Requests)-1].Header + if hdr.Get("Authorization") != "OAuth2 xyzzy" { + t.Errorf("got headers %+q, expected Authorization header", hdr) + } + + c.Client.Transport = &errorTransport{} + u, err = c.CurrentUser() + if err == nil { + t.Errorf("got nil error, expected something awful") + } +} diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go new file mode 100644 index 0000000000..71f5247615 --- /dev/null +++ b/sdk/go/arvados/collection.go @@ -0,0 +1,62 @@ +package arvados + +import ( + "bufio" + "fmt" + "strings" + "time" + + "git.curoverse.com/arvados.git/sdk/go/manifest" +) + +// Collection is an arvados#collection resource. +type Collection struct { + UUID string `json:"uuid,omitempty"` + ExpiresAt *time.Time `json:"expires_at,omitempty"` + ManifestText string `json:"manifest_text,omitempty"` + CreatedAt *time.Time `json:"created_at,omitempty"` + ModifiedAt *time.Time `json:"modified_at,omitempty"` + PortableDataHash string `json:"portable_data_hash,omitempty"` + ReplicationConfirmed *int `json:"replication_confirmed,omitempty"` + ReplicationConfirmedAt *time.Time `json:"replication_confirmed_at,omitempty"` + ReplicationDesired *int `json:"replication_desired,omitempty"` +} + +// SizedDigests returns the hash+size part of each data block +// referenced by the collection. 
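+// +// For example (using a locator that appears in the test fixtures in +// this patch), a manifest ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n" +// yields []SizedDigest{"acbd18db4cc2f85cedef654fccc4a4d8+3"}.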
+func (c *Collection) SizedDigests() ([]SizedDigest, error) { + if c.ManifestText == "" && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" { + // TODO: Check more subtle forms of corruption, too + return nil, fmt.Errorf("manifest is missing") + } + var sds []SizedDigest + scanner := bufio.NewScanner(strings.NewReader(c.ManifestText)) + scanner.Buffer(make([]byte, 1048576), len(c.ManifestText)) + for scanner.Scan() { + line := scanner.Text() + tokens := strings.Split(line, " ") + if len(tokens) < 3 { + return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line) + } + for _, token := range tokens[1:] { + if !manifest.LocatorPattern.MatchString(token) { + // FIXME: ensure it's a file token + break + } + // FIXME: shouldn't assume 32 char hash + if i := strings.IndexRune(token[33:], '+'); i >= 0 { + token = token[:33+i] + } + sds = append(sds, SizedDigest(token)) + } + } + return sds, scanner.Err() +} + +// CollectionList is an arvados#collectionList resource. +type CollectionList struct { + Items []Collection `json:"items"` + ItemsAvailable int `json:"items_available"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} diff --git a/sdk/go/arvados/doc.go b/sdk/go/arvados/doc.go new file mode 100644 index 0000000000..1e8141e848 --- /dev/null +++ b/sdk/go/arvados/doc.go @@ -0,0 +1,12 @@ +// Package arvados is a client library for Arvados. +// +// The API is not stable: it should be considered experimental +// pre-release. +// +// The intent is to offer model types and API call functions that can +// be generated automatically (or at least mostly automatically) from +// a discovery document. For the time being, there is a manually +// generated subset of those types and API calls with (approximately) +// the right signatures, plus client/authentication support and some +// convenience functions. +package arvados diff --git a/sdk/go/arvados/duration.go b/sdk/go/arvados/duration.go new file mode 100644 index 0000000000..1639c5852a --- /dev/null +++ b/sdk/go/arvados/duration.go @@ -0,0 +1,31 @@ +package arvados + +import ( + "encoding/json" + "fmt" + "time" +) + +// Duration is time.Duration but looks like "12s" in JSON, rather than +// a number of nanoseconds. +type Duration time.Duration + +// UnmarshalJSON implements json.Unmarshaler +func (d *Duration) UnmarshalJSON(data []byte) error { + if data[0] == '"' { + dur, err := time.ParseDuration(string(data[1 : len(data)-1])) + *d = Duration(dur) + return err + } + return fmt.Errorf("duration must be given as a string like \"600s\" or \"1h30m\"") +} + +// MarshalJSON implements json.Marshaler +func (d *Duration) MarshalJSON() ([]byte, error) { + return json.Marshal(d.String()) +} + +// String implements fmt.Stringer +func (d Duration) String() string { + return time.Duration(d).String() +} diff --git a/sdk/go/arvados/keep_block.go b/sdk/go/arvados/keep_block.go new file mode 100644 index 0000000000..c9a7712f57 --- /dev/null +++ b/sdk/go/arvados/keep_block.go @@ -0,0 +1,15 @@ +package arvados + +import ( + "strconv" + "strings" +) + +// SizedDigest is a minimal Keep block locator: hash+size +type SizedDigest string + +// Size returns the size of the data block, in bytes. 
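+// +// For example, SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3").Size() +// returns 3. The size field is taken at face value; it is not +// verified against any stored data.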
+func (sd SizedDigest) Size() int64 { + n, _ := strconv.ParseInt(strings.Split(string(sd), "+")[1], 10, 64) + return n +} diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go new file mode 100644 index 0000000000..4af1b7910f --- /dev/null +++ b/sdk/go/arvados/keep_service.go @@ -0,0 +1,123 @@ +package arvados + +import ( + "bufio" + "fmt" + "net/http" + "strconv" + "strings" +) + +// KeepService is an arvados#keepService record +type KeepService struct { + UUID string `json:"uuid"` + ServiceHost string `json:"service_host"` + ServicePort int `json:"service_port"` + ServiceSSLFlag bool `json:"service_ssl_flag"` + ServiceType string `json:"service_type"` + ReadOnly bool `json:"read_only"` +} + +// KeepServiceList is an arvados#keepServiceList record +type KeepServiceList struct { + Items []KeepService `json:"items"` + ItemsAvailable int `json:"items_available"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} + +// KeepServiceIndexEntry is what a keep service's index response tells +// us about a stored block. +type KeepServiceIndexEntry struct { + SizedDigest + Mtime int64 +} + +// EachKeepService calls f once for every readable +// KeepService. EachKeepService stops if it encounters an +// error, such as f returning a non-nil error. +func (c *Client) EachKeepService(f func(KeepService) error) error { + params := ResourceListParams{} + for { + var page KeepServiceList + err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params) + if err != nil { + return err + } + for _, item := range page.Items { + err = f(item) + if err != nil { + return err + } + } + params.Offset = params.Offset + len(page.Items) + if params.Offset >= page.ItemsAvailable { + return nil + } + } +} + +func (s *KeepService) url(path string) string { + var f string + if s.ServiceSSLFlag { + f = "https://%s:%d/%s" + } else { + f = "http://%s:%d/%s" + } + return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path) +} + +// String implements fmt.Stringer +func (s *KeepService) String() string { + return s.UUID +} + +// Index returns an unsorted list of blocks that can be retrieved from +// this server. 
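+// +// Each line of the response body has the form "{locator} {mtime}", +// and a trailing blank line acts as an EOF marker; Index returns an +// error if the marker is missing (see the parsing loop below).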
+func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) { + url := s.url("index/" + prefix) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, fmt.Errorf("NewRequest(%v): %v", url, err) + } + resp, err := c.Do(req) + if err != nil { + return nil, fmt.Errorf("Do(%v): %v", url, err) + } else if resp.StatusCode != 200 { + return nil, fmt.Errorf("%v: %v", url, resp.Status) + } + defer resp.Body.Close() + + var entries []KeepServiceIndexEntry + scanner := bufio.NewScanner(resp.Body) + sawEOF := false + for scanner.Scan() { + if sawEOF { + return nil, fmt.Errorf("Index response contained non-terminal blank line") + } + line := scanner.Text() + if line == "" { + sawEOF = true + continue + } + fields := strings.Split(line, " ") + if len(fields) != 2 { + return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields)) + } + mtime, err := strconv.ParseInt(fields[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err) + } + entries = append(entries, KeepServiceIndexEntry{ + SizedDigest: SizedDigest(fields[0]), + Mtime: mtime, + }) + } + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("Error scanning index response: %v", err) + } + if !sawEOF { + return nil, fmt.Errorf("Index response had no EOF marker") + } + return entries, nil +} diff --git a/sdk/go/arvados/resource_list.go b/sdk/go/arvados/resource_list.go new file mode 100644 index 0000000000..e9ea268ff2 --- /dev/null +++ b/sdk/go/arvados/resource_list.go @@ -0,0 +1,25 @@ +package arvados + +import "encoding/json" + +// ResourceListParams expresses which results are requested in a +// list/index API. +type ResourceListParams struct { + Select []string `json:"select,omitempty"` + Filters []Filter `json:"filters,omitempty"` + Limit *int `json:"limit,omitempty"` + Offset int `json:"offset,omitempty"` + Order string `json:"order,omitempty"` +} + +// A Filter restricts the set of records returned by a list/index API. +type Filter struct { + Attr string + Operator string + Operand interface{} +} + +// MarshalJSON encodes a Filter in the form expected by the API. +func (f *Filter) MarshalJSON() ([]byte, error) { + return json.Marshal([]interface{}{f.Attr, f.Operator, f.Operand}) +} diff --git a/sdk/go/arvados/resource_list_test.go b/sdk/go/arvados/resource_list_test.go new file mode 100644 index 0000000000..b5e6e7da47 --- /dev/null +++ b/sdk/go/arvados/resource_list_test.go @@ -0,0 +1,21 @@ +package arvados + +import ( + "bytes" + "encoding/json" + "testing" + "time" +) + +func TestMarshalFiltersWithNanoseconds(t *testing.T) { + t0 := time.Now() + t0str := t0.Format(time.RFC3339Nano) + buf, err := json.Marshal([]Filter{ + {Attr: "modified_at", Operator: "=", Operand: t0}}) + if err != nil { + t.Fatal(err) + } + if expect := []byte(`[["modified_at","=","` + t0str + `"]]`); 0 != bytes.Compare(buf, expect) { + t.Errorf("Encoded as %q, expected %q", buf, expect) + } +} diff --git a/sdk/go/arvados/user.go b/sdk/go/arvados/user.go new file mode 100644 index 0000000000..684a3af8aa --- /dev/null +++ b/sdk/go/arvados/user.go @@ -0,0 +1,17 @@ +package arvados + +// User is an arvados#user record +type User struct { + UUID string `json:"uuid,omitempty"` + IsActive bool `json:"is_active"` + IsAdmin bool `json:"is_admin"` + Username string `json:"username,omitempty"` +} + +// CurrentUser calls arvados.v1.users.current, and returns the User +// record corresponding to this client's credentials. 
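+// +// A usage sketch (assuming the ARVADOS_API_* environment variables +// are set): +// +//	c := arvados.NewClientFromEnv() +//	u, err := c.CurrentUser() +//	if err != nil { +//		// handle error +//	} +//	fmt.Println(u.UUID, u.IsAdmin)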
+func (c *Client) CurrentUser() (User, error) { + var u User + err := c.RequestAndDecode(&u, "GET", "arvados/v1/users/current", nil, nil) + return u, err +} diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go new file mode 100644 index 0000000000..7471aa6cb0 --- /dev/null +++ b/services/keep-balance/balance.go @@ -0,0 +1,611 @@ +package main + +import ( + "fmt" + "log" + "os" + "runtime" + "strings" + "sync" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/keepclient" +) + +// CheckConfig returns an error if anything is wrong with the given +// config and runOptions. +func CheckConfig(config Config, runOptions RunOptions) error { + if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil { + return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config") + } + if !runOptions.Once && config.RunPeriod == arvados.Duration(0) { + return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config") + } + return nil +} + +// Balancer compares the contents of keepstore servers with the +// collections stored in Arvados, and issues pull/trash requests +// needed to get (closer to) the optimal data layout. +// +// In the optimal data layout: every data block referenced by a +// collection is replicated at least as many times as desired by the +// collection; there are no unreferenced data blocks older than +// BlobSignatureTTL; and all N existing replicas of a given data block +// are in the N best positions in rendezvous probe order. +type Balancer struct { + *BlockStateMap + KeepServices map[string]*KeepService + DefaultReplication int + Logger *log.Logger + Dumper *log.Logger + MinMtime int64 + + collScanned int + serviceRoots map[string]string + errors []error + mutex sync.Mutex +} + +// Run performs a balance operation using the given config and +// runOptions. It should only be called once on a given Balancer +// object. Typical usage: +// +// err = (&Balancer{}).Run(config, runOptions) +func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) { + bal.Dumper = runOptions.Dumper + bal.Logger = runOptions.Logger + if bal.Logger == nil { + bal.Logger = log.New(os.Stderr, "", log.LstdFlags) + } + + defer timeMe(bal.Logger, "Run")() + + if len(config.KeepServiceList.Items) > 0 { + err = bal.SetKeepServices(config.KeepServiceList) + } else { + err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes) + } + if err != nil { + return + } + + if err = bal.CheckSanityEarly(&config.Client); err != nil { + return + } + if runOptions.CommitTrash { + if err = bal.ClearTrashLists(&config.Client); err != nil { + return + } + } + if err = bal.GetCurrentState(&config.Client); err != nil { + return + } + bal.ComputeChangeSets() + bal.PrintStatistics() + if err = bal.CheckSanityLate(); err != nil { + return + } + if runOptions.CommitPulls { + err = bal.CommitPulls(&config.Client) + if err != nil { + // Skip trash if we can't pull. (Too cautious?) + return + } + } + if runOptions.CommitTrash { + err = bal.CommitTrash(&config.Client) + } + return +} + +// SetKeepServices sets the list of KeepServices to operate on. 
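+// +// A sketch (with hypothetical values) of bypassing discovery with a +// fixed server list: +// +//	bal.SetKeepServices(arvados.KeepServiceList{ +//		Items: []arvados.KeepService{{ +//			UUID:        "zzzzz-bi6l4-000000000000000", +//			ServiceHost: "keep0.zzzzz.arvadosapi.com", +//			ServicePort: 25107, +//			ServiceType: "disk", +//		}}, +//	})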
+func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error { + bal.KeepServices = make(map[string]*KeepService) + for _, srv := range srvList.Items { + bal.KeepServices[srv.UUID] = &KeepService{ + KeepService: srv, + ChangeSet: &ChangeSet{}, + } + } + return nil +} + +// DiscoverKeepServices sets the list of KeepServices by calling the +// API to get a list of all services, and selecting the ones whose +// ServiceType is in okTypes. +func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error { + bal.KeepServices = make(map[string]*KeepService) + ok := make(map[string]bool) + for _, t := range okTypes { + ok[t] = true + } + return c.EachKeepService(func(srv arvados.KeepService) error { + if ok[srv.ServiceType] { + bal.KeepServices[srv.UUID] = &KeepService{ + KeepService: srv, + ChangeSet: &ChangeSet{}, + } + } else { + bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType) + } + return nil + }) +} + +// CheckSanityEarly checks for configuration and runtime errors that +// can be detected before GetCurrentState() and ComputeChangeSets() +// are called. +// +// If it returns an error, it is pointless to run GetCurrentState or +// ComputeChangeSets: after doing so, the statistics would be +// meaningless and it would be dangerous to run any Commit methods. +func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error { + u, err := c.CurrentUser() + if err != nil { + return fmt.Errorf("CurrentUser(): %v", err) + } + if !u.IsActive || !u.IsAdmin { + return fmt.Errorf("current user (%s) is not an active admin user", u.UUID) + } + for _, srv := range bal.KeepServices { + if srv.ServiceType == "proxy" { + return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv) + } + } + return nil +} + +// ClearTrashLists sends an empty trash list to each keep +// service. Calling this before GetCurrentState avoids races. +// +// When a block appears in an index, we assume that replica will still +// exist after we delete other replicas on other servers. However, +// it's possible that a previous rebalancing operation made different +// decisions (e.g., servers were added/removed, and rendezvous order +// changed). In this case, the replica might already be on that +// server's trash list, and it might be deleted before we send a +// replacement trash list. +// +// We avoid this problem if we clear all trash lists before getting +// indexes. (We also assume there is only one rebalancing process +// running at a time.) +func (bal *Balancer) ClearTrashLists(c *arvados.Client) error { + for _, srv := range bal.KeepServices { + srv.ChangeSet = &ChangeSet{} + } + return bal.CommitTrash(c) +} + +// GetCurrentState determines the current replication state, and the +// desired replication level, for every block that is either +// retrievable or referenced. +// +// It determines the current replication state by reading the block index +// from every known Keep service. +// +// It determines the desired replication level by retrieving all +// collection manifests in the database (API server). +// +// It encodes the resulting information in BlockStateMap. 
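+// +// Index retrievals and the collection scan run concurrently; the +// first error reported by any goroutine aborts the whole operation +// (see the errs channel below).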
+func (bal *Balancer) GetCurrentState(c *arvados.Client) error { + defer timeMe(bal.Logger, "GetCurrentState")() + bal.BlockStateMap = NewBlockStateMap() + + dd, err := c.DiscoveryDocument() + if err != nil { + return err + } + bal.DefaultReplication = dd.DefaultCollectionReplication + bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL + + errs := make(chan error, 2+len(bal.KeepServices)) + wg := sync.WaitGroup{} + + // Start one goroutine for each KeepService: retrieve the + // index, and add the returned blocks to BlockStateMap. + for _, srv := range bal.KeepServices { + wg.Add(1) + go func(srv *KeepService) { + defer wg.Done() + bal.logf("%s: retrieve index", srv) + idx, err := srv.Index(c, "") + if err != nil { + errs <- fmt.Errorf("%s: %v", srv, err) + return + } + bal.logf("%s: add %d replicas to map", srv, len(idx)) + bal.BlockStateMap.AddReplicas(srv, idx) + bal.logf("%s: done", srv) + }(srv) + } + + // collQ buffers incoming collections so we can start fetching + // the next page without waiting for the current page to + // finish processing. (1000 happens to match the page size + // used by (*arvados.Client)EachCollection(), but it's OK if + // they don't match.) + collQ := make(chan arvados.Collection, 1000) + + // Start a goroutine to process collections. (We could use a + // worker pool here, but even with a single worker we already + // process collections much faster than we can retrieve them.) + wg.Add(1) + go func() { + defer wg.Done() + for coll := range collQ { + err := bal.addCollection(coll) + if err != nil { + errs <- err + for range collQ { + } + return + } + bal.collScanned++ + } + }() + + // Start a goroutine to retrieve all collections from the + // Arvados database and send them to collQ for processing. + wg.Add(1) + go func() { + defer wg.Done() + err = EachCollection(c, + func(coll arvados.Collection) error { + collQ <- coll + if len(errs) > 0 { + // some other GetCurrentState + // error happened: no point + // getting any more + // collections. + return fmt.Errorf("") + } + return nil + }, func(done, total int) { + bal.logf("collections: %d/%d", done, total) + }) + close(collQ) + if err != nil { + errs <- err + } + }() + + go func() { + // Send a nil error when all goroutines finish. If + // this is the first error sent to errs, then + // everything worked. + wg.Wait() + errs <- nil + }() + return <-errs +} + +func (bal *Balancer) addCollection(coll arvados.Collection) error { + blkids, err := coll.SizedDigests() + if err != nil { + bal.mutex.Lock() + bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err)) + bal.mutex.Unlock() + return nil + } + repl := bal.DefaultReplication + if coll.ReplicationDesired != nil { + repl = *coll.ReplicationDesired + } + debugf("%v: %d block x%d", coll.UUID, len(blkids), repl) + bal.BlockStateMap.IncreaseDesired(repl, blkids) + return nil +} + +// ComputeChangeSets compares, for each known block, the current and +// desired replication states. If it is possible to get closer to the +// desired state by copying or deleting blocks, it adds those changes +// to the relevant KeepServices' ChangeSets. +// +// It does not actually apply any of the computed changes. +func (bal *Balancer) ComputeChangeSets() { + // This just calls balanceBlock() once for each block, using a + // pool of worker goroutines. 
+ defer timeMe(bal.Logger, "ComputeChangeSets")() + bal.setupServiceRoots() + + type balanceTask struct { + blkid arvados.SizedDigest + blk *BlockState + } + nWorkers := 1 + runtime.NumCPU() + todo := make(chan balanceTask, nWorkers) + var wg sync.WaitGroup + for i := 0; i < nWorkers; i++ { + wg.Add(1) + go func() { + for work := range todo { + bal.balanceBlock(work.blkid, work.blk) + } + wg.Done() + }() + } + bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) { + todo <- balanceTask{ + blkid: blkid, + blk: blk, + } + }) + close(todo) + wg.Wait() +} + +func (bal *Balancer) setupServiceRoots() { + bal.serviceRoots = make(map[string]string) + for _, srv := range bal.KeepServices { + bal.serviceRoots[srv.UUID] = srv.UUID + } +} + +const ( + changeStay = iota + changePull + changeTrash + changeNone +) + +var changeName = map[int]string{ + changeStay: "stay", + changePull: "pull", + changeTrash: "trash", + changeNone: "none", +} + +// balanceBlock compares current state to desired state for a single +// block, and makes the appropriate ChangeSet calls. +func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) { + debugf("balanceBlock: %v %+v", blkid, blk) + uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots() + hasRepl := make(map[string]Replica, len(bal.serviceRoots)) + for _, repl := range blk.Replicas { + hasRepl[repl.UUID] = repl + // TODO: when multiple copies are on one server, use + // the oldest one that doesn't have a timestamp + // collision with other replicas. + } + // number of replicas already found in positions better than + // the position we're contemplating now. + reportedBestRepl := 0 + // To be safe we assume two replicas with the same Mtime are + // in fact the same replica being reported more than + // once. len(uniqueBestRepl) is the number of distinct + // replicas in the best rendezvous positions we've considered + // so far. + uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots)) + // pulls is the number of Pull changes we have already + // requested. (For purposes of deciding whether to Pull to + // rendezvous position N, we should assume all pulls we have + // requested on rendezvous positions M<N will be successful.) + pulls := 0 + var changes []string + for _, uuid := range uuids { + change := changeNone + srv := bal.KeepServices[uuid] + repl, ok := hasRepl[srv.UUID] + if ok { + // This service has a replica. We should trash + // it if it is old enough and we have already + // found enough distinct replicas in better + // rendezvous positions. + if repl.Mtime < bal.MinMtime && + reportedBestRepl >= blk.Desired && + !uniqueBestRepl[repl.Mtime] { + srv.AddTrash(Trash{ + SizedDigest: blkid, + Mtime: repl.Mtime, + }) + change = changeTrash + } else { + change = changeStay + } + uniqueBestRepl[repl.Mtime] = true + reportedBestRepl++ + } else if pulls+reportedBestRepl < blk.Desired && + len(blk.Replicas) > 0 && + !srv.ReadOnly { + // This service doesn't have a replica. We + // should pull one to this server if we don't + // already have enough (existing+requested) + // replicas in better rendezvous positions.
+ srv.AddPull(Pull{ + SizedDigest: blkid, + Source: blk.Replicas[0].KeepService, + }) + pulls++ + change = changePull + } + if bal.Dumper != nil { + changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime)) + } + } + if bal.Dumper != nil { + bal.Dumper.Printf("%s have=%d want=%d %s", blkid, len(blk.Replicas), blk.Desired, strings.Join(changes, " ")) + } +} + +type blocksNBytes struct { + replicas int + blocks int + bytes int64 +} + +func (bb blocksNBytes) String() string { + return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes) +} + +type balancerStats struct { + lost, overrep, unref, garbage, underrep, justright blocksNBytes + desired, current blocksNBytes + pulls, trashes int +} + +func (bal *Balancer) getStatistics() (s balancerStats) { + bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) { + surplus := len(blk.Replicas) - blk.Desired + bytes := blkid.Size() + switch { + case len(blk.Replicas) == 0 && blk.Desired > 0: + s.lost.replicas -= surplus + s.lost.blocks++ + s.lost.bytes += bytes * int64(-surplus) + case len(blk.Replicas) < blk.Desired: + s.underrep.replicas -= surplus + s.underrep.blocks++ + s.underrep.bytes += bytes * int64(-surplus) + case len(blk.Replicas) > 0 && blk.Desired == 0: + counter := &s.garbage + for _, r := range blk.Replicas { + if r.Mtime >= bal.MinMtime { + counter = &s.unref + break + } + } + counter.replicas += surplus + counter.blocks++ + counter.bytes += bytes * int64(surplus) + case len(blk.Replicas) > blk.Desired: + s.overrep.replicas += surplus + s.overrep.blocks++ + s.overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired) + default: + s.justright.replicas += blk.Desired + s.justright.blocks++ + s.justright.bytes += bytes * int64(blk.Desired) + } + + if blk.Desired > 0 { + s.desired.replicas += blk.Desired + s.desired.blocks++ + s.desired.bytes += bytes * int64(blk.Desired) + } + if len(blk.Replicas) > 0 { + s.current.replicas += len(blk.Replicas) + s.current.blocks++ + s.current.bytes += bytes * int64(len(blk.Replicas)) + } + }) + for _, srv := range bal.KeepServices { + s.pulls += len(srv.ChangeSet.Pulls) + s.trashes += len(srv.ChangeSet.Trashes) + } + return +} + +// PrintStatistics writes statistics about the computed changes to +// bal.Logger. It should not be called until ComputeChangeSets has +// finished. +func (bal *Balancer) PrintStatistics() { + s := bal.getStatistics() + bal.logf("===") + bal.logf("%s lost (0=have<want)", s.lost) + bal.logf("%s underreplicated (0<have<want)", s.underrep) + bal.logf("%s just right (have=want)", s.justright) + bal.logf("%s overreplicated (have>want>0)", s.overrep) + bal.logf("%s unreferenced (have>want=0, new)", s.unref) + bal.logf("%s garbage (have>want=0, old)", s.garbage) + bal.logf("===") + bal.logf("%s total commitment (excluding unreferenced)", s.desired) + bal.logf("%s total usage", s.current) + bal.logf("===") + for _, srv := range bal.KeepServices { + bal.logf("%s: %v\n", srv, srv.ChangeSet) + } + bal.logf("===") +} + +// CheckSanityLate checks for configuration and runtime errors after +// GetCurrentState() and ComputeChangeSets() have finished. +// +// If it returns an error, it is dangerous to run any Commit methods.
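+// +// The checks performed below: no deferred errors were accumulated, +// at least one collection was scanned, at least one block has +// desired replication greater than zero, and DefaultReplication is +// at least 1.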
+func (bal *Balancer) CheckSanityLate() error { + if bal.errors != nil { + for _, err := range bal.errors { + bal.logf("deferred error: %v", err) + } + return fmt.Errorf("cannot proceed safely after deferred errors") + } + + if bal.collScanned == 0 { + return fmt.Errorf("received zero collections") + } + + anyDesired := false + bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) { + if blk.Desired > 0 { + anyDesired = true + } + }) + if !anyDesired { + return fmt.Errorf("zero blocks have desired replication>0") + } + + if dr := bal.DefaultReplication; dr < 1 { + return fmt.Errorf("Default replication (%d) is less than 1", dr) + } + + // TODO: no two services have identical indexes + // TODO: no collisions (same md5, different size) + return nil +} + +// CommitPulls sends the computed lists of pull requests to the +// keepstore servers. This has the effect of increasing replication of +// existing blocks that are either underreplicated or poorly +// distributed according to rendezvous hashing. +func (bal *Balancer) CommitPulls(c *arvados.Client) error { + return bal.commitAsync(c, "send pull list", + func(srv *KeepService) error { + return srv.CommitPulls(c) + }) +} + +// CommitTrash sends the computed lists of trash requests to the +// keepstore servers. This has the effect of deleting blocks that are +// overreplicated or unreferenced. +func (bal *Balancer) CommitTrash(c *arvados.Client) error { + return bal.commitAsync(c, "send trash list", + func(srv *KeepService) error { + return srv.CommitTrash(c) + }) +} + +func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error { + errs := make(chan error) + for _, srv := range bal.KeepServices { + go func(srv *KeepService) { + var err error + defer func() { errs <- err }() + label := fmt.Sprintf("%s: %v", srv, label) + defer timeMe(bal.Logger, label)() + err = f(srv) + if err != nil { + err = fmt.Errorf("%s: %v", label, err) + } + }(srv) + } + var lastErr error + for _ = range bal.KeepServices { + if err := <-errs; err != nil { + bal.logf("%v", err) + lastErr = err + } + } + close(errs) + return lastErr +} + +func (bal *Balancer) logf(f string, args ...interface{}) { + if bal.Logger != nil { + bal.Logger.Printf(f, args...) + } +} diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go new file mode 100644 index 0000000000..a138d911a3 --- /dev/null +++ b/services/keep-balance/balance_run_test.go @@ -0,0 +1,374 @@ +package main + +import ( + _ "encoding/json" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "net/http/httptest" + "strings" + "sync" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + + check "gopkg.in/check.v1" +) + +var _ = check.Suite(&runSuite{}) + +type reqTracker struct { + reqs []http.Request + sync.Mutex +} + +func (rt *reqTracker) Count() int { + rt.Lock() + defer rt.Unlock() + return len(rt.reqs) +} + +func (rt *reqTracker) Add(req *http.Request) int { + rt.Lock() + defer rt.Unlock() + rt.reqs = append(rt.reqs, *req) + return len(rt.reqs) +} + +// stubServer is an HTTP transport that intercepts and processes all +// requests using its own handlers. +type stubServer struct { + mux *http.ServeMux + srv *httptest.Server + mutex sync.Mutex + Requests reqTracker + logf func(string, ...interface{}) +} + +// Start initializes the stub server and returns an *http.Client that +// uses the stub server to handle all requests. +// +// A stubServer that has been started should eventually be shut down +// with Close(). 
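+// +// Typical use in a test (a sketch; the path and payload here are +// arbitrary): +// +//	var s stubServer +//	client := s.Start() +//	defer s.Close() +//	s.serveStatic("/ping", `{"ok":true}`)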
+func (s *stubServer) Start() *http.Client { + // Set up a config.Client that forwards all requests to s.mux + // via s.srv. Test cases will attach handlers to s.mux to get + // the desired responses. + s.mux = http.NewServeMux() + s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.mutex.Lock() + s.Requests.Add(r) + s.mutex.Unlock() + w.Header().Set("Content-Type", "application/json") + s.mux.ServeHTTP(w, r) + })) + return &http.Client{Transport: s} +} + +func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) { + w := httptest.NewRecorder() + s.mux.ServeHTTP(w, req) + return &http.Response{ + StatusCode: w.Code, + Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)), + Header: w.HeaderMap, + Body: ioutil.NopCloser(w.Body)}, nil +} + +// Close releases resources used by the server. +func (s *stubServer) Close() { + s.srv.Close() +} + +func (s *stubServer) serveStatic(path, data string) *reqTracker { + rt := &reqTracker{} + s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + rt.Add(r) + if r.Body != nil { + ioutil.ReadAll(r.Body) + r.Body.Close() + } + io.WriteString(w, data) + }) + return rt +} + +func (s *stubServer) serveCurrentUserAdmin() *reqTracker { + return s.serveStatic("/arvados/v1/users/current", + `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`) +} + +func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker { + return s.serveStatic("/arvados/v1/users/current", + `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`) +} + +func (s *stubServer) serveDiscoveryDoc() *reqTracker { + return s.serveStatic("/discovery/v1/apis/arvados/v1/rest", + `{"defaultCollectionReplication":2}`) +} + +func (s *stubServer) serveZeroCollections() *reqTracker { + return s.serveStatic("/arvados/v1/collections", + `{"items":[],"items_available":0}`) +} + +func (s *stubServer) serveFooBarFileCollections() *reqTracker { + rt := &reqTracker{} + s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + rt.Add(r) + if strings.Contains(r.Form.Get("filters"), `modified_at`) { + io.WriteString(w, `{"items_available":0,"items":[]}`) + } else { + io.WriteString(w, `{"items_available":2,"items":[ + {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"}, + {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`) + } + }) + return rt +} + +func (s *stubServer) serveCollectionsButSkipOne() *reqTracker { + rt := &reqTracker{} + s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + rt.Add(r) + if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) { + io.WriteString(w, `{"items_available":3,"items":[]}`) + } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e="`) { + io.WriteString(w, `{"items_available":0,"items":[]}`) + } else { + io.WriteString(w, `{"items_available":2,"items":[ + {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 
37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"}, + {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`) + } + }) + return rt +} + +func (s *stubServer) serveZeroKeepServices() *reqTracker { + return s.serveStatic("/arvados/v1/keep_services", + `{"items":[],"items_available":0}`) +} + +func (s *stubServer) serveFourDiskKeepServices() *reqTracker { + return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[ + {"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"}, + {"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"}, + {"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"}, + {"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"}, + {"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`) +} + +func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker { + rt := &reqTracker{} + s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) { + count := rt.Add(r) + if r.Host == "keep0.zzzzz.arvadosapi.com:25107" { + io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n") + } + fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count) + }) + return rt +} + +func (s *stubServer) serveKeepstoreTrash() *reqTracker { + return s.serveStatic("/trash", `{}`) +} + +func (s *stubServer) serveKeepstorePull() *reqTracker { + return s.serveStatic("/pull", `{}`) +} + +type runSuite struct { + stub stubServer + config Config +} + +// make a log.Logger that writes to the current test's c.Log(). 
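+// (It copies data through an io.Pipe to a goroutine that calls +// c.Log, so balancer output is interleaved with the test's own +// messages; the goroutine exits when Read returns an error.)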
+func (s *runSuite) logger(c *check.C) *log.Logger { + r, w := io.Pipe() + go func() { + buf := make([]byte, 10000) + for { + n, err := r.Read(buf) + if n > 0 { + if buf[n-1] == '\n' { + n-- + } + c.Log(string(buf[:n])) + } + if err != nil { + break + } + } + }() + return log.New(w, "", log.LstdFlags) +} + +func (s *runSuite) SetUpTest(c *check.C) { + s.config = Config{ + Client: arvados.Client{ + AuthToken: "xyzzy", + APIHost: "zzzzz.arvadosapi.com", + Client: s.stub.Start()}, + KeepServiceTypes: []string{"disk"}} + s.stub.serveDiscoveryDoc() + s.stub.logf = c.Logf +} + +func (s *runSuite) TearDownTest(c *check.C) { + s.stub.Close() +} + +func (s *runSuite) TestRefuseZeroCollections(c *check.C) { + opts := RunOptions{ + CommitPulls: true, + CommitTrash: true, + Logger: s.logger(c), + } + s.stub.serveCurrentUserAdmin() + s.stub.serveZeroCollections() + s.stub.serveFourDiskKeepServices() + s.stub.serveKeepstoreIndexFoo4Bar1() + trashReqs := s.stub.serveKeepstoreTrash() + pullReqs := s.stub.serveKeepstorePull() + err := (&Balancer{}).Run(s.config, opts) + c.Check(err, check.ErrorMatches, "received zero collections") + c.Check(trashReqs.Count(), check.Equals, 4) + c.Check(pullReqs.Count(), check.Equals, 0) +} + +func (s *runSuite) TestServiceTypes(c *check.C) { + opts := RunOptions{ + CommitPulls: true, + CommitTrash: true, + Logger: s.logger(c), + } + s.config.KeepServiceTypes = []string{"unlisted-type"} + s.stub.serveCurrentUserAdmin() + s.stub.serveFooBarFileCollections() + s.stub.serveFourDiskKeepServices() + indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1() + trashReqs := s.stub.serveKeepstoreTrash() + err := (&Balancer{}).Run(s.config, opts) + c.Check(err, check.IsNil) + c.Check(indexReqs.Count(), check.Equals, 0) + c.Check(trashReqs.Count(), check.Equals, 0) +} + +func (s *runSuite) TestRefuseNonAdmin(c *check.C) { + opts := RunOptions{ + CommitPulls: true, + CommitTrash: true, + Logger: s.logger(c), + } + s.stub.serveCurrentUserNotAdmin() + s.stub.serveZeroCollections() + s.stub.serveFourDiskKeepServices() + trashReqs := s.stub.serveKeepstoreTrash() + pullReqs := s.stub.serveKeepstorePull() + err := (&Balancer{}).Run(s.config, opts) + c.Check(err, check.ErrorMatches, "current user .* is not .* admin user") + c.Check(trashReqs.Count(), check.Equals, 0) + c.Check(pullReqs.Count(), check.Equals, 0) +} + +func (s *runSuite) TestDetectSkippedCollections(c *check.C) { + opts := RunOptions{ + CommitPulls: true, + CommitTrash: true, + Logger: s.logger(c), + } + s.stub.serveCurrentUserAdmin() + s.stub.serveCollectionsButSkipOne() + s.stub.serveFourDiskKeepServices() + s.stub.serveKeepstoreIndexFoo4Bar1() + trashReqs := s.stub.serveKeepstoreTrash() + pullReqs := s.stub.serveKeepstorePull() + err := (&Balancer{}).Run(s.config, opts) + c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`) + c.Check(trashReqs.Count(), check.Equals, 4) + c.Check(pullReqs.Count(), check.Equals, 0) +} + +func (s *runSuite) TestDryRun(c *check.C) { + opts := RunOptions{ + CommitPulls: false, + CommitTrash: false, + Logger: s.logger(c), + } + s.stub.serveCurrentUserAdmin() + s.stub.serveFooBarFileCollections() + s.stub.serveFourDiskKeepServices() + s.stub.serveKeepstoreIndexFoo4Bar1() + trashReqs := s.stub.serveKeepstoreTrash() + pullReqs := s.stub.serveKeepstorePull() + var bal Balancer + err := bal.Run(s.config, opts) + c.Check(err, check.IsNil) + c.Check(trashReqs.Count(), check.Equals, 0) + c.Check(pullReqs.Count(), check.Equals, 0) + stats 
:= bal.getStatistics() + c.Check(stats.pulls, check.Not(check.Equals), 0) + c.Check(stats.underrep.replicas, check.Not(check.Equals), 0) + c.Check(stats.overrep.replicas, check.Not(check.Equals), 0) +} + +func (s *runSuite) TestCommit(c *check.C) { + opts := RunOptions{ + CommitPulls: true, + CommitTrash: true, + Logger: s.logger(c), + Dumper: s.logger(c), + } + s.stub.serveCurrentUserAdmin() + s.stub.serveFooBarFileCollections() + s.stub.serveFourDiskKeepServices() + s.stub.serveKeepstoreIndexFoo4Bar1() + trashReqs := s.stub.serveKeepstoreTrash() + pullReqs := s.stub.serveKeepstorePull() + var bal Balancer + err := bal.Run(s.config, opts) + c.Check(err, check.IsNil) + c.Check(trashReqs.Count(), check.Equals, 8) + c.Check(pullReqs.Count(), check.Equals, 4) + stats := bal.getStatistics() + // "foo" block is overreplicated by 2 + c.Check(stats.trashes, check.Equals, 2) + // "bar" block is underreplicated by 1, and its only copy is + // in a poor rendezvous position + c.Check(stats.pulls, check.Equals, 2) +} + +func (s *runSuite) TestRunForever(c *check.C) { + opts := RunOptions{ + CommitPulls: true, + CommitTrash: true, + Logger: s.logger(c), + Dumper: s.logger(c), + } + s.stub.serveCurrentUserAdmin() + s.stub.serveFooBarFileCollections() + s.stub.serveFourDiskKeepServices() + s.stub.serveKeepstoreIndexFoo4Bar1() + trashReqs := s.stub.serveKeepstoreTrash() + pullReqs := s.stub.serveKeepstorePull() + + stop := make(chan interface{}) + s.config.RunPeriod = arvados.Duration(time.Millisecond) + go RunForever(s.config, opts, stop) + + // Each run should send 4 clear trash lists + 4 pull lists + 4 + // trash lists. We should complete four runs in much less than + // a second. + for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; { + time.Sleep(time.Millisecond) + } + stop <- true + c.Check(pullReqs.Count() >= 16, check.Equals, true) + c.Check(trashReqs.Count(), check.Equals, 2*pullReqs.Count()) +} diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go new file mode 100644 index 0000000000..682a5fb070 --- /dev/null +++ b/services/keep-balance/balance_test.go @@ -0,0 +1,255 @@ +package main + +import ( + "crypto/md5" + "fmt" + "sort" + "strconv" + "testing" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + + check "gopkg.in/check.v1" +) + +// Test with Gocheck +func Test(t *testing.T) { + check.TestingT(t) +} + +var _ = check.Suite(&balancerSuite{}) + +type balancerSuite struct { + Balancer + srvs []*KeepService + blks map[string]tester + knownRendezvous [][]int + signatureTTL int64 +} + +const ( + // index into knownRendezvous + known0 = 0 +) + +type slots []int + +type tester struct { + known int + desired int + current slots + timestamps []int64 + shouldPull slots + shouldTrash slots +} + +func (bal *balancerSuite) SetUpSuite(c *check.C) { + bal.knownRendezvous = nil + for _, str := range []string{ + "3eab2d5fc9681074", + "097dba52e648f1c3", + "c5b4e023f8a7d691", + "9d81c02e76a3bf54", + } { + var slots []int + for _, c := range []byte(str) { + pos, _ := strconv.ParseUint(string(c), 16, 4) + slots = append(slots, int(pos)) + } + bal.knownRendezvous = append(bal.knownRendezvous, slots) + } + + bal.signatureTTL = 3600 +} + +func (bal *balancerSuite) SetUpTest(c *check.C) { + bal.srvs = make([]*KeepService, 16) + bal.KeepServices = make(map[string]*KeepService) + for i := range bal.srvs { + srv := &KeepService{ + KeepService: arvados.KeepService{ + UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i), + }, + } + bal.srvs[i] = srv + 
bal.KeepServices[srv.UUID] = srv + } + + bal.MinMtime = time.Now().Unix() - bal.signatureTTL +} + +func (bal *balancerSuite) TestPerfect(c *check.C) { + bal.try(c, tester{ + desired: 2, + current: slots{0, 1}, + shouldPull: nil, + shouldTrash: nil}) +} + +func (bal *balancerSuite) TestDecreaseRepl(c *check.C) { + bal.try(c, tester{ + desired: 2, + current: slots{0, 2, 1}, + shouldTrash: slots{2}}) +} + +func (bal *balancerSuite) TestDecreaseReplToZero(c *check.C) { + bal.try(c, tester{ + desired: 0, + current: slots{0, 1, 3}, + shouldTrash: slots{0, 1, 3}}) +} + +func (bal *balancerSuite) TestIncreaseRepl(c *check.C) { + bal.try(c, tester{ + desired: 4, + current: slots{0, 1}, + shouldPull: slots{2, 3}}) +} + +func (bal *balancerSuite) TestSkipReadonly(c *check.C) { + bal.srvList(0, slots{3})[0].ReadOnly = true + bal.try(c, tester{ + desired: 4, + current: slots{0, 1}, + shouldPull: slots{2, 4}}) +} + +func (bal *balancerSuite) TestFixUnbalanced(c *check.C) { + bal.try(c, tester{ + desired: 2, + current: slots{2, 0}, + shouldPull: slots{1}}) + bal.try(c, tester{ + desired: 2, + current: slots{2, 7}, + shouldPull: slots{0, 1}}) + // if only one of the pulls succeeds, we'll see this next: + bal.try(c, tester{ + desired: 2, + current: slots{2, 1, 7}, + shouldPull: slots{0}, + shouldTrash: slots{7}}) + // if both pulls succeed, we'll see this next: + bal.try(c, tester{ + desired: 2, + current: slots{2, 0, 1, 7}, + shouldTrash: slots{2, 7}}) + + // unbalanced + excessive replication => pull + trash + bal.try(c, tester{ + desired: 2, + current: slots{2, 5, 7}, + shouldPull: slots{0, 1}, + shouldTrash: slots{7}}) +} + +func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) { + // For purposes of increasing replication, we assume identical + // replicas are distinct. + bal.try(c, tester{ + desired: 4, + current: slots{0, 1}, + timestamps: []int64{12345678, 12345678}, + shouldPull: slots{2, 3}}) +} + +func (bal *balancerSuite) TestDecreaseReplTimestampCollision(c *check.C) { + // For purposes of decreasing replication, we assume identical + // replicas are NOT distinct. + bal.try(c, tester{ + desired: 2, + current: slots{0, 1, 2}, + timestamps: []int64{12345678, 12345678, 12345678}}) + bal.try(c, tester{ + desired: 2, + current: slots{0, 1, 2}, + timestamps: []int64{12345678, 10000000, 10000000}}) +} + +func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) { + oldTime := bal.MinMtime - 3600 + newTime := bal.MinMtime + 3600 + // The excess replica is too new to delete. + bal.try(c, tester{ + desired: 2, + current: slots{0, 1, 2}, + timestamps: []int64{oldTime, newTime, newTime + 1}}) + // The best replicas are too new to delete, but the excess + // replica is old enough. + bal.try(c, tester{ + desired: 2, + current: slots{0, 1, 2}, + timestamps: []int64{newTime, newTime + 1, oldTime}, + shouldTrash: slots{2}}) +} + +// Clear all servers' changesets, balance a single block, and verify +// the appropriate changes for that block have been added to the +// changesets. 
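+//
+// For example, this case (from TestDecreaseRepl above):
+//
+//	bal.try(c, tester{
+//		desired:     2,
+//		current:     slots{0, 2, 1},
+//		shouldTrash: slots{2}})
+//
+// asserts that balancing a block whose replicas occupy rendezvous
+// slots 0, 2, and 1 produces exactly one trash request, directed at
+// the server in slot 2.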
+func (bal *balancerSuite) try(c *check.C, t tester) { + bal.setupServiceRoots() + blk := &BlockState{ + Desired: t.desired, + Replicas: bal.replList(t.known, t.current)} + for i, ts := range t.timestamps { + blk.Replicas[i].Mtime = ts + } + for _, srv := range bal.srvs { + srv.ChangeSet = &ChangeSet{} + } + bal.balanceBlock(knownBlkid(t.known), blk) + + var didPull, didTrash slots + for i, srv := range bal.srvs { + var slot int + for probeOrder, srvNum := range bal.knownRendezvous[t.known] { + if srvNum == i { + slot = probeOrder + } + } + for _, pull := range srv.Pulls { + didPull = append(didPull, slot) + c.Check(pull.SizedDigest, check.Equals, knownBlkid(t.known)) + } + for _, trash := range srv.Trashes { + didTrash = append(didTrash, slot) + c.Check(trash.SizedDigest, check.Equals, knownBlkid(t.known)) + } + } + + for _, list := range []slots{didPull, didTrash, t.shouldPull, t.shouldTrash} { + sort.Sort(sort.IntSlice(list)) + } + c.Check(didPull, check.DeepEquals, t.shouldPull) + c.Check(didTrash, check.DeepEquals, t.shouldTrash) +} + +// srvList returns the KeepServices, sorted in rendezvous order and +// then selected by the given slots. For example, srvList(3, slots{0, 1, 4}) +// returns the first-, second-, and fifth-best servers for storing +// bal.knownBlkid(3). +func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepService) { + for _, i := range order { + srvs = append(srvs, bal.srvs[bal.knownRendezvous[knownBlockID][i]]) + } + return +} + +// replList is like srvList but returns an "existing replicas" slice, +// suitable for a BlockState test fixture. +func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) { + mtime := time.Now().Unix() - bal.signatureTTL - 86400 + for _, srv := range bal.srvList(knownBlockID, order) { + repls = append(repls, Replica{srv, mtime}) + mtime++ + } + return +} + +// knownBlkid generates the same data hashes that are tested in +// sdk/go/keepclient/root_sorter_test.go +func knownBlkid(i int) arvados.SizedDigest { + return arvados.SizedDigest(fmt.Sprintf("%x+64", md5.Sum([]byte(fmt.Sprintf("%064x", i))))) +} diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go new file mode 100644 index 0000000000..d60738613d --- /dev/null +++ b/services/keep-balance/block_state.go @@ -0,0 +1,95 @@ +package main + +import ( + "sync" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +// Replica is a file on disk (or object in an S3 bucket, or blob in an +// Azure storage container, etc.) as reported in a keepstore index +// response. +type Replica struct { + *KeepService + Mtime int64 +} + +// BlockState indicates the number of desired replicas (according to +// the collections we know about) and the replicas actually stored +// (according to the keepstore indexes we know about). +type BlockState struct { + Replicas []Replica + Desired int +} + +func (bs *BlockState) addReplica(r Replica) { + bs.Replicas = append(bs.Replicas, r) +} + +func (bs *BlockState) increaseDesired(n int) { + if bs.Desired < n { + bs.Desired = n + } +} + +// BlockStateMap is a goroutine-safe wrapper around a +// map[arvados.SizedDigest]*BlockState. +type BlockStateMap struct { + entries map[arvados.SizedDigest]*BlockState + mutex sync.Mutex +} + +// NewBlockStateMap returns a newly allocated BlockStateMap. +func NewBlockStateMap() *BlockStateMap { + return &BlockStateMap{ + entries: make(map[arvados.SizedDigest]*BlockState), + } +} + +// get returns a BlockState entry, allocating a new one if needed.
(Private +// method: not goroutine-safe.) +func (bsm *BlockStateMap) get(blkid arvados.SizedDigest) *BlockState { + // TODO? Allocate BlockState structs a slice at a time, + // instead of one at a time. + blk := bsm.entries[blkid] + if blk == nil { + blk = &BlockState{} + bsm.entries[blkid] = blk + } + return blk +} + +// Apply runs f on each entry in the map. +func (bsm *BlockStateMap) Apply(f func(arvados.SizedDigest, *BlockState)) { + bsm.mutex.Lock() + defer bsm.mutex.Unlock() + + for blkid, blk := range bsm.entries { + f(blkid, blk) + } +} + +// AddReplicas updates the map to indicate srv has a replica of each +// block in idx. +func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []arvados.KeepServiceIndexEntry) { + bsm.mutex.Lock() + defer bsm.mutex.Unlock() + + for _, ent := range idx { + bsm.get(ent.SizedDigest).addReplica(Replica{ + KeepService: srv, + Mtime: ent.Mtime, + }) + } +} + +// IncreaseDesired updates the map to indicate the desired replication +// for the given blocks is at least n. +func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []arvados.SizedDigest) { + bsm.mutex.Lock() + defer bsm.mutex.Unlock() + + for _, blkid := range blocks { + bsm.get(blkid).increaseDesired(n) + } +} diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go new file mode 100644 index 0000000000..417ea7ff8b --- /dev/null +++ b/services/keep-balance/change_set.go @@ -0,0 +1,75 @@ +package main + +import ( + "encoding/json" + "fmt" + "sync" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +// Pull is a request to retrieve a block from a remote server, and +// store it locally. +type Pull struct { + arvados.SizedDigest + Source *KeepService +} + +// MarshalJSON formats a pull request the way keepstore wants to see +// it. +func (p Pull) MarshalJSON() ([]byte, error) { + type KeepstorePullRequest struct { + Locator string `json:"locator"` + Servers []string `json:"servers"` + } + return json.Marshal(KeepstorePullRequest{ + Locator: string(p.SizedDigest[:32]), + Servers: []string{p.Source.URLBase()}}) +} + +// Trash is a request to delete a block. +type Trash struct { + arvados.SizedDigest + Mtime int64 +} + +// MarshalJSON formats a trash request the way keepstore wants to see +// it, i.e., as a bare locator with no +size hint. +func (t Trash) MarshalJSON() ([]byte, error) { + type KeepstoreTrashRequest struct { + Locator string `json:"locator"` + BlockMtime int64 `json:"block_mtime"` + } + return json.Marshal(KeepstoreTrashRequest{ + Locator: string(t.SizedDigest[:32]), + BlockMtime: t.Mtime}) +} + +// ChangeSet is a set of change requests that will be sent to a +// keepstore server. +type ChangeSet struct { + Pulls []Pull + Trashes []Trash + mutex sync.Mutex +} + +// AddPull adds a Pull operation. +func (cs *ChangeSet) AddPull(p Pull) { + cs.mutex.Lock() + cs.Pulls = append(cs.Pulls, p) + cs.mutex.Unlock() +} + +// AddTrash adds a Trash operation +func (cs *ChangeSet) AddTrash(t Trash) { + cs.mutex.Lock() + cs.Trashes = append(cs.Trashes, t) + cs.mutex.Unlock() +} + +// String implements fmt.Stringer. 
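+// The result summarizes only the counts, e.g.,
+// "ChangeSet{Pulls:1, Trashes:2}"; it does not list the individual
+// pull and trash requests.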
+func (cs *ChangeSet) String() string { + cs.mutex.Lock() + defer cs.mutex.Unlock() + return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes)) +} diff --git a/services/keep-balance/change_set_test.go b/services/keep-balance/change_set_test.go new file mode 100644 index 0000000000..b5dcb5c003 --- /dev/null +++ b/services/keep-balance/change_set_test.go @@ -0,0 +1,35 @@ +package main + +import ( + "encoding/json" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + + check "gopkg.in/check.v1" +) + +var _ = check.Suite(&changeSetSuite{}) + +type changeSetSuite struct{} + +func (s *changeSetSuite) TestJSONFormat(c *check.C) { + srv := &KeepService{ + KeepService: arvados.KeepService{ + UUID: "zzzzz-bi6l4-000000000000001", + ServiceType: "disk", + ServiceSSLFlag: false, + ServiceHost: "keep1.zzzzz.arvadosapi.com", + ServicePort: 25107}} + + buf, err := json.Marshal([]Pull{{ + SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"), + Source: srv}}) + c.Check(err, check.IsNil) + c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","servers":["http://keep1.zzzzz.arvadosapi.com:25107"]}]`) + + buf, err = json.Marshal([]Trash{{ + SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"), + Mtime: 123456789}}) + c.Check(err, check.IsNil) + c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","block_mtime":123456789}]`) +} diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go new file mode 100644 index 0000000000..e6a1f08cf1 --- /dev/null +++ b/services/keep-balance/collection.go @@ -0,0 +1,95 @@ +package main + +import ( + "fmt" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) { + var page arvados.CollectionList + var zero int + params.Limit = &zero + err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params) + return page.ItemsAvailable, err +} + +// EachCollection calls f once for every readable +// collection. EachCollection stops if it encounters an error, such as +// f returning a non-nil error. +// +// The progress function is called periodically with done (number of +// times f has been called) and total (number of times f is expected +// to be called). 
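+//
+// A minimal caller might look like this (sketch; a nil progress
+// func is allowed, and is handled at the top of the function):
+//
+//	err := EachCollection(client, func(coll arvados.Collection) error {
+//		fmt.Println(coll.UUID)
+//		return nil
+//	}, nil)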
+func EachCollection(c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error { + if progress == nil { + progress = func(_, _ int) {} + } + + expectCount, err := countCollections(c, arvados.ResourceListParams{}) + if err != nil { + return err + } + + limit := 1000 + params := arvados.ResourceListParams{ + Limit: &limit, + Order: "modified_at, uuid", + Select: []string{"uuid", "manifest_text", "modified_at", "portable_data_hash", "replication_desired"}, + } + var last arvados.Collection + var filterTime time.Time + callCount := 0 + for { + progress(callCount, expectCount) + var page arvados.CollectionList + err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params) + if err != nil { + return err + } + for _, coll := range page.Items { + if last.ModifiedAt != nil && *last.ModifiedAt == *coll.ModifiedAt && last.UUID >= coll.UUID { + continue + } + callCount++ + err = f(coll) + if err != nil { + return err + } + last = coll + } + if last.ModifiedAt == nil || *last.ModifiedAt == filterTime { + if page.ItemsAvailable > len(page.Items) { + // TODO: use "mtime=X && UUID>Y" + // filters to get all collections with + // this timestamp, then use "mtime>X" + // to get the next timestamp. + return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime) + } + break + } + filterTime = *last.ModifiedAt + params.Filters = []arvados.Filter{{ + Attr: "modified_at", + Operator: ">=", + Operand: filterTime, + }, { + Attr: "uuid", + Operator: "!=", + Operand: last.UUID, + }} + } + progress(callCount, expectCount) + + if checkCount, err := countCollections(c, arvados.ResourceListParams{Filters: []arvados.Filter{{ + Attr: "modified_at", + Operator: "<=", + Operand: filterTime}}}); err != nil { + return err + } else if callCount < checkCount { + return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount) + } + + return nil +} diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go new file mode 100644 index 0000000000..b090614607 --- /dev/null +++ b/services/keep-balance/integration_test.go @@ -0,0 +1,92 @@ +package main + +import ( + "bytes" + "log" + "net/http" + "os" + "strings" + "testing" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/arvadostest" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + + check "gopkg.in/check.v1" +) + +var _ = check.Suite(&integrationSuite{}) + +type integrationSuite struct { + config Config + keepClient *keepclient.KeepClient +} + +func (s *integrationSuite) SetUpSuite(c *check.C) { + if testing.Short() { + c.Skip("-short") + } + arvadostest.ResetEnv() + arvadostest.StartAPI() + arvadostest.StartKeep(4, true) + + arv, err := arvadosclient.MakeArvadosClient() + arv.ApiToken = arvadostest.DataManagerToken + c.Assert(err, check.IsNil) + s.keepClient = &keepclient.KeepClient{ + Arvados: &arv, + Client: &http.Client{}, + } + c.Assert(s.keepClient.DiscoverKeepServers(), check.IsNil) + s.putReplicas(c, "foo", 4) + s.putReplicas(c, "bar", 1) +} + +func (s *integrationSuite) putReplicas(c *check.C, data string, replicas int) { + s.keepClient.Want_replicas = replicas + _, _, err := s.keepClient.PutB([]byte(data)) + c.Assert(err, check.IsNil) +} + +func (s *integrationSuite) TearDownSuite(c *check.C) { + if 
testing.Short() { + c.Skip("-short") + } + arvadostest.StopKeep(4) + arvadostest.StopAPI() +} + +func (s *integrationSuite) SetUpTest(c *check.C) { + s.config = Config{ + Client: arvados.Client{ + APIHost: os.Getenv("ARVADOS_API_HOST"), + AuthToken: arvadostest.DataManagerToken, + Insecure: true, + }, + KeepServiceTypes: []string{"disk"}, + } +} + +func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) { + var logBuf *bytes.Buffer + for iter := 0; iter < 20; iter++ { + // assign, not ":=": the final check below reads the last iteration's log + logBuf = &bytes.Buffer{} + opts := RunOptions{ + CommitPulls: true, + CommitTrash: true, + Logger: log.New(logBuf, "", log.LstdFlags), + } + err := (&Balancer{}).Run(s.config, opts) + c.Check(err, check.IsNil) + if iter == 0 { + c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`) + c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0].*`) + } else if strings.Contains(logBuf.String(), "ChangeSet{Pulls:0") { + break + } + time.Sleep(200 * time.Millisecond) + } + c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas \(0 blocks, 0 bytes\) underreplicated.*`) +} diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go new file mode 100644 index 0000000000..f65355d0d5 --- /dev/null +++ b/services/keep-balance/keep_service.go @@ -0,0 +1,76 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +// KeepService represents a keepstore server that is being rebalanced. +type KeepService struct { + arvados.KeepService + *ChangeSet +} + +// String implements fmt.Stringer. +func (srv *KeepService) String() string { + return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType) +} + +var ksSchemes = map[bool]string{false: "http", true: "https"} + +// URLBase returns scheme://host:port for this server. +func (srv *KeepService) URLBase() string { + return fmt.Sprintf("%s://%s:%d", ksSchemes[srv.ServiceSSLFlag], srv.ServiceHost, srv.ServicePort) +} + +// CommitPulls sends the current list of pull requests to the storage +// server (even if the list is empty). +func (srv *KeepService) CommitPulls(c *arvados.Client) error { + return srv.put(c, "pull", srv.ChangeSet.Pulls) +} + +// CommitTrash sends the current list of trash requests to the storage +// server (even if the list is empty). +func (srv *KeepService) CommitTrash(c *arvados.Client) error { + return srv.put(c, "trash", srv.ChangeSet.Trashes) +} + +// put performs a PUT request at path, with data (as JSON) in the +// request body. +func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error { + // We'll start a goroutine to do the JSON encoding, so we can + // stream it to the http client through a Pipe, rather than + // keeping the entire encoded version in memory. +	jsonR, jsonW := io.Pipe() + + // errC communicates any encoding errors back to our main + // goroutine. + errC := make(chan error, 1) + + go func() { + enc := json.NewEncoder(jsonW) + errC <- enc.Encode(data) + jsonW.Close() + }() + + url := srv.URLBase() + "/" + path + req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR)) + if err != nil { + return fmt.Errorf("building request for %s: %v", url, err) + } + err = c.DoAndDecode(nil, req) + + // If there was an error encoding the request body, report + // that instead of the response: obviously we won't get a + // useful response if our request wasn't properly encoded.
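+ // (For reference, the encoded body for a pull list has the shape
+ // exercised in change_set_test.go, e.g.
+ // [{"locator":"...","servers":["http://host:port"]}].)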
+ if encErr := <-errC; encErr != nil { + return fmt.Errorf("encoding data for %s: %v", url, encErr) + } + + return err +} diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go new file mode 100644 index 0000000000..42a8d635b1 --- /dev/null +++ b/services/keep-balance/main.go @@ -0,0 +1,156 @@ +package main + +import ( + "encoding/json" + "flag" + "io/ioutil" + "log" + "os" + "os/signal" + "syscall" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +// Config specifies site configuration, like API credentials and the +// choice of which servers are to be balanced. +// +// Config is loaded from a JSON config file (see usage()). +type Config struct { + // Arvados API endpoint and credentials. + Client arvados.Client + + // List of service types (e.g., "disk") to balance. + KeepServiceTypes []string + + KeepServiceList arvados.KeepServiceList + + // How often to check + RunPeriod arvados.Duration +} + +// RunOptions controls runtime behavior. The flags/options that belong +// here are the ones that are useful for interactive use. For example, +// "CommitTrash" is a runtime option rather than a config item because +// it invokes a troubleshooting feature rather than expressing how +// balancing is meant to be done at a given site. +// +// RunOptions fields are controlled by command line flags. +type RunOptions struct { + Once bool + CommitPulls bool + CommitTrash bool + Logger *log.Logger + Dumper *log.Logger +} + +var debugf = func(string, ...interface{}) {} + +func main() { + var config Config + var runOptions RunOptions + + configPath := flag.String("config", "", + "`path` of json configuration file") + serviceListPath := flag.String("config.KeepServiceList", "", + "`path` of json file with list of keep services to balance, as given by \"arv keep_service list\" "+ + "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])") + flag.BoolVar(&runOptions.Once, "once", false, + "balance once and then exit") + flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false, + "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)") + flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false, + "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)") + dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout") + debugFlag := flag.Bool("debug", false, "enable debug messages") + flag.Usage = usage + flag.Parse() + + if *configPath == "" { + log.Fatal("You must specify a config file (see `keep-balance -help`)") + } + mustReadJSON(&config, *configPath) + if *serviceListPath != "" { + mustReadJSON(&config.KeepServiceList, *serviceListPath) + } + + if *debugFlag { + debugf = log.Printf + if j, err := json.Marshal(config); err != nil { + log.Fatal(err) + } else { + log.Printf("config is %s", j) + } + } + if *dumpFlag { + runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags) + } + err := CheckConfig(config, runOptions) + if err != nil { + // (don't run) + } else if runOptions.Once { + err = (&Balancer{}).Run(config, runOptions) + } else { + err = RunForever(config, runOptions, nil) + } + if err != nil { + log.Fatal(err) + } +} + +func mustReadJSON(dst interface{}, path string) { + if buf, err := ioutil.ReadFile(path); err != nil { + log.Fatalf("Reading %q: %v", path, err) + } else if err = json.Unmarshal(buf, dst); err != nil { + log.Fatalf("Decoding %q: %v", path, err) 
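+ // (log.Fatalf calls os.Exit, so mustReadJSON never returns after a read or decode failure.)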
+ } +} + +// RunForever runs forever, or (for testing purposes) until the given +// stop channel is ready to receive. +func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error { + if runOptions.Logger == nil { + runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags) + } + logger := runOptions.Logger + + ticker := time.NewTicker(time.Duration(config.RunPeriod)) + + // The unbuffered channel here means we only hear SIGUSR1 if + // it arrives while we're waiting in select{}. + sigUSR1 := make(chan os.Signal) + signal.Notify(sigUSR1, syscall.SIGUSR1) + + logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod) + + for { + if !runOptions.CommitPulls && !runOptions.CommitTrash { + logger.Print("WARNING: Will scan periodically, but no changes will be committed.") + logger.Print("======= Consider using -commit-pulls and -commit-trash flags.") + } + + err := (&Balancer{}).Run(config, runOptions) + if err != nil { + logger.Print("run failed: ", err) + } else { + logger.Print("run succeeded") + } + + select { + case <-stop: + signal.Stop(sigUSR1) + return nil + case <-ticker.C: + logger.Print("timer went off") + case <-sigUSR1: + logger.Print("received SIGUSR1, resetting timer") + // Reset the timer so we don't start the N+1st + // run too soon after the Nth run is triggered + // by SIGUSR1. + ticker.Stop() + ticker = time.NewTicker(time.Duration(config.RunPeriod)) + } + logger.Print("starting next run") + } +} diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go new file mode 100644 index 0000000000..4a56098b29 --- /dev/null +++ b/services/keep-balance/main_test.go @@ -0,0 +1,43 @@ +package main + +import ( + "encoding/json" + "time" + + check "gopkg.in/check.v1" +) + +var _ = check.Suite(&mainSuite{}) + +type mainSuite struct{} + +func (s *mainSuite) TestExampleJSON(c *check.C) { + var config Config + c.Check(json.Unmarshal(exampleConfigFile, &config), check.IsNil) + c.Check(config.KeepServiceTypes, check.DeepEquals, []string{"disk"}) + c.Check(config.Client.AuthToken, check.Equals, "xyzzy") + c.Check(time.Duration(config.RunPeriod), check.Equals, 600*time.Second) +} + +func (s *mainSuite) TestConfigJSONWithKeepServiceList(c *check.C) { + var config Config + c.Check(json.Unmarshal([]byte(` + { + "Client": { + "APIHost": "zzzzz.arvadosapi.com:443", + "AuthToken": "xyzzy", + "Insecure": false + }, + "KeepServiceList": { + "items": [ + {"uuid":"zzzzz-bi64l-abcdefghijklmno", "service_type":"disk", "service_host":"a.zzzzz.arvadosapi.com", "service_port":12345}, + {"uuid":"zzzzz-bi64l-bcdefghijklmnop", "service_type":"blob", "service_host":"b.zzzzz.arvadosapi.com", "service_port":12345} + ] + }, + "RunPeriod": "600s" + }`), &config), check.IsNil) + c.Assert(len(config.KeepServiceList.Items), check.Equals, 2) + c.Check(config.KeepServiceList.Items[0].UUID, check.Equals, "zzzzz-bi64l-abcdefghijklmno") + c.Check(config.KeepServiceList.Items[0].ServicePort, check.Equals, 12345) + c.Check(config.Client.AuthToken, check.Equals, "xyzzy") +} diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go new file mode 100644 index 0000000000..e5f16b7696 --- /dev/null +++ b/services/keep-balance/time_me.go @@ -0,0 +1,14 @@ +package main + +import ( + "log" + "time" +) + +func timeMe(logger *log.Logger, label string) func() { + t0 := time.Now() + logger.Printf("%s: start", label) + return func() { + logger.Printf("%s: took %v", label, time.Since(t0)) + } +} diff --git a/services/keep-balance/usage.go 
b/services/keep-balance/usage.go new file mode 100644 index 0000000000..eb9990c973 --- /dev/null +++ b/services/keep-balance/usage.go @@ -0,0 +1,83 @@ +package main + +import ( + "flag" + "fmt" + "os" +) + +var exampleConfigFile = []byte(` + { + "Client": { + "APIHost": "zzzzz.arvadosapi.com:443", + "AuthToken": "xyzzy", + "Insecure": false + }, + "KeepServiceTypes": [ + "disk" + ], + "RunPeriod": "600s" + }`) + +func usage() { + fmt.Fprintf(os.Stderr, ` + +keep-balance rebalances a set of keepstore servers. It creates new +copies of underreplicated blocks, deletes excess copies of +overreplicated and unreferenced blocks, and moves blocks to better +positions (according to the rendezvous hash algorithm) so clients find +them faster. + +Usage: keep-balance -config path/to/config.json [options] + +Options: +`) + flag.PrintDefaults() + fmt.Fprintf(os.Stderr, ` +Example config file: +%s + + Client.AuthToken must be recognized by Arvados as an admin token, + and must be recognized by all Keep services as a "data manager + key". + + Client.Insecure should be true if your Arvados API endpoint uses + an unverifiable SSL/TLS certificate. + +Periodic scanning: + + By default, keep-balance operates periodically, i.e.: do a + scan/balance operation, sleep, repeat. + + RunPeriod determines the interval between start times of + successive scan/balance operations. If a scan/balance operation + takes longer than RunPeriod, the next one will follow it + immediately. + + If SIGUSR1 is received during an idle period between operations, + the next operation will start immediately. + +One-time scanning: + + Use the -once flag to do a single operation and then exit. The + exit code will be zero if the operation was successful. + +Committing: + + By default, keep-balance computes and reports changes but does not + implement them by sending pull and trash lists to the Keep + services. + + Use the -commit-pulls and -commit-trash flags to implement the + computed changes. + +Limitations: + + keep-balance does not attempt to discover whether committed pull + and trash requests ever get carried out -- only that they are + accepted by the Keep services. If some services are full, new + copies of underreplicated blocks might never get made, only + repeatedly requested. + +`, exampleConfigFile) +} diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 80d8670105..d7da67c348 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -4,6 +4,7 @@ import ( "bytes" "flag" "fmt" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/httpserver" "git.curoverse.com/arvados.git/sdk/go/keepclient" "io/ioutil" @@ -331,8 +332,12 @@ func main() { } // Initialize Pull queue and worker + arv, err := arvadosclient.MakeArvadosClient() + if err != nil { + log.Fatalf("MakeArvadosClient: %s", err) + } keepClient := &keepclient.KeepClient{ - Arvados: nil, + Arvados: &arv, Want_replicas: 1, Client: &http.Client{}, } -- 2.30.2
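For reference, the timeMe helper in services/keep-balance/time_me.go above is designed to be called with an immediate defer: the call itself logs "<label>: start", and the func it returns logs "<label>: took <elapsed>" when the surrounding function returns. A minimal sketch (scanAll and its logger are hypothetical, for illustration only):

	func scanAll(logger *log.Logger) {
		// Logs "scanAll: start" immediately; the returned func runs
		// via defer on return and logs "scanAll: took <duration>".
		defer timeMe(logger, "scanAll")()
		// ... do the expensive scan here ...
	}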