9162: Add keep-balance
author Tom Clegg <tom@curoverse.com>
Mon, 16 May 2016 21:09:21 +0000 (17:09 -0400)
committer Tom Clegg <tom@curoverse.com>
Tue, 31 May 2016 20:22:57 +0000 (16:22 -0400)
33 files changed:
build/package-build-dockerfiles/Makefile
build/package-build-dockerfiles/centos6/Dockerfile
build/package-build-dockerfiles/debian7/Dockerfile
build/package-build-dockerfiles/debian8/Dockerfile
build/package-build-dockerfiles/ubuntu1204/Dockerfile
build/package-build-dockerfiles/ubuntu1404/Dockerfile
build/run-build-packages-one-target.sh
build/run-build-packages.sh
build/run-tests.sh
sdk/go/arvados/client.go [new file with mode: 0644]
sdk/go/arvados/client_test.go [new file with mode: 0644]
sdk/go/arvados/collection.go [new file with mode: 0644]
sdk/go/arvados/doc.go [new file with mode: 0644]
sdk/go/arvados/duration.go [new file with mode: 0644]
sdk/go/arvados/keep_block.go [new file with mode: 0644]
sdk/go/arvados/keep_service.go [new file with mode: 0644]
sdk/go/arvados/resource_list.go [new file with mode: 0644]
sdk/go/arvados/resource_list_test.go [new file with mode: 0644]
sdk/go/arvados/user.go [new file with mode: 0644]
services/keep-balance/balance.go [new file with mode: 0644]
services/keep-balance/balance_run_test.go [new file with mode: 0644]
services/keep-balance/balance_test.go [new file with mode: 0644]
services/keep-balance/block_state.go [new file with mode: 0644]
services/keep-balance/change_set.go [new file with mode: 0644]
services/keep-balance/change_set_test.go [new file with mode: 0644]
services/keep-balance/collection.go [new file with mode: 0644]
services/keep-balance/integration_test.go [new file with mode: 0644]
services/keep-balance/keep_service.go [new file with mode: 0644]
services/keep-balance/main.go [new file with mode: 0644]
services/keep-balance/main_test.go [new file with mode: 0644]
services/keep-balance/time_me.go [new file with mode: 0644]
services/keep-balance/usage.go [new file with mode: 0644]
services/keepstore/keepstore.go

index 9216f8264bc7eef940fe6a3a1a3dcf147def239f..3482886112db0651b4bfa972f6e24db9ddd2d76d 100644 (file)
@@ -20,10 +20,12 @@ ubuntu1404/generated: common-generated-all
        test -d ubuntu1404/generated || mkdir ubuntu1404/generated
        cp -rlt ubuntu1404/generated common-generated/*
 
-common-generated-all: common-generated/golang-amd64.tar.gz
+GOTARBALL=go1.6.2.linux-amd64.tar.gz
 
-common-generated/golang-amd64.tar.gz: common-generated
-       wget -cqO common-generated/golang-amd64.tar.gz http://storage.googleapis.com/golang/go1.4.2.linux-amd64.tar.gz
+common-generated-all: common-generated/$(GOTARBALL)
+
+common-generated/$(GOTARBALL): common-generated
+       wget -cqO common-generated/$(GOTARBALL) http://storage.googleapis.com/golang/$(GOTARBALL)
 
 common-generated:
        mkdir common-generated
index c21c0918221f8522a35178279f85bc2666e8ef8f..570dde162c466d3dbdf105316af242e0f00f3710 100644 (file)
@@ -5,7 +5,7 @@ MAINTAINER Brett Smith <brett@curoverse.com>
 RUN yum -q -y install make automake gcc gcc-c++ libyaml-devel patch readline-devel zlib-devel libffi-devel openssl-devel bzip2 libtool bison sqlite-devel rpm-build git perl-ExtUtils-MakeMaker libattr-devel nss-devel libcurl-devel which tar unzip scl-utils centos-release-scl postgresql-devel
 
 # Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
 RUN ln -s /usr/local/go/bin/go /usr/local/bin/
 
 # Install RVM
index ccc444c31481d61b098d7c4ed9bbf597dbf25051..ddad5426046f19de1e1c7821b8eb8031184df7e5 100644 (file)
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
     /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
 
 # Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
 RUN ln -s /usr/local/go/bin/go /usr/local/bin/
 
 ENV WORKSPACE /arvados
index e32cfb44a2f8da0bb4c2e42ba4cd507ea18a3b4e..80f06a224bb4516ce483b39a39829916985014f7 100644 (file)
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
     /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
 
 # Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
 RUN ln -s /usr/local/go/bin/go /usr/local/bin/
 
 ENV WORKSPACE /arvados
index 34bf6985b0b1c3174b0a4df3605b807a6ef96d08..2f628b0d1f91db8a14fa15bc99e6f7cb9eb30756 100644 (file)
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
     /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
 
 # Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
 RUN ln -s /usr/local/go/bin/go /usr/local/bin/
 
 ENV WORKSPACE /arvados
index 53771640ee7cdb1840d8b065c7e0daa7e62d5d0d..b9c003ac796613c631dcb070a6fc84a6257e7228 100644 (file)
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
     /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
 
 # Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
 RUN ln -s /usr/local/go/bin/go /usr/local/bin/
 
 ENV WORKSPACE /arvados
index 322129e057b3238229fa28b77a4631bca11f999f..6fdffd09da49a36ad608dd533174f133fb61424e 100755 (executable)
@@ -128,9 +128,10 @@ if test -z "$packages" ; then
         arvados-src
         arvados-workbench
         crunchstat
+        keep-balance
+        keep-block-check
         keepproxy
         keep-rsync
-        keep-block-check
         keepstore
         keep-web
         libarvados-perl"
index a51198f9ef5c657fbcc8c199097080c1bd875004..7631718f7edc00cdf7e7954e8d1972acbc07b39b 100755 (executable)
@@ -384,6 +384,8 @@ package_go_binary services/keepstore keepstore \
     "Keep storage daemon, accessible to clients on the LAN"
 package_go_binary services/keepproxy keepproxy \
     "Make a Keep cluster accessible to clients that are not on the LAN"
+package_go_binary services/keep-balance keep-balance \
+    "Rebalance and garbage-collect data blocks stored in Arvados Keep"
 package_go_binary services/keep-web keep-web \
     "Static web hosting service for user data stored in Arvados Keep"
 package_go_binary services/datamanager arvados-data-manager \
index d22934199f651840a65ed03255b6e4d03332d181..30a80f527afabcee038350a963a077268b02aaa2 100755 (executable)
@@ -70,6 +70,7 @@ services/fuse
 services/keep-web
 services/keepproxy
 services/keepstore
+services/keep-balance
 services/login-sync
 services/nodemanager
 services/crunch-run
@@ -79,6 +80,7 @@ sdk/cli
 sdk/pam
 sdk/python
 sdk/ruby
+sdk/go/arvados
 sdk/go/arvadosclient
 sdk/go/keepclient
 sdk/go/httpserver
@@ -150,6 +152,8 @@ sanity_checks() {
     echo -n 'go: '
     go version \
         || fatal "No go binary. See http://golang.org/doc/install"
+    [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 6 ]] \
+        || fatal "Go >= 1.6 required. See http://golang.org/doc/install"
     echo -n 'gcc: '
     gcc --version | egrep ^gcc \
         || fatal "No gcc. Try: apt-get install build-essential"
@@ -499,7 +503,7 @@ do_test_once() {
             then
                 # "go test -check.vv giturl" doesn't work, but this
                 # does:
-                cd "$WORKSPACE/$1" && go test ${short:+-short} ${coverflags[@]} ${testargs[$1]}
+                cd "$WORKSPACE/$1" && go test ${short:+-short} ${testargs[$1]}
             else
                 # The above form gets verbose even when testargs is
                 # empty, so use this form in such cases:
@@ -703,6 +707,7 @@ do_install services/api apiserver
 
 declare -a gostuff
 gostuff=(
+    sdk/go/arvados
     sdk/go/arvadosclient
     sdk/go/blockdigest
     sdk/go/httpserver
@@ -714,6 +719,7 @@ gostuff=(
     services/keep-web
     services/keepstore
     sdk/go/keepclient
+    services/keep-balance
     services/keepproxy
     services/datamanager/summary
     services/datamanager/collection
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
new file mode 100644 (file)
index 0000000..ee830c8
--- /dev/null
@@ -0,0 +1,169 @@
+package arvados
+
+import (
+       "crypto/tls"
+       "encoding/json"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "net/http"
+       "net/url"
+       "os"
+       "strings"
+)
+
+// A Client is an HTTP client with an API endpoint and a set of
+// Arvados credentials.
+//
+// It offers methods for accessing individual Arvados APIs, and
+// methods that implement common patterns like fetching multiple pages
+// of results using List APIs.
+//
+// Requests are sent to https://APIHost/..., so APIHost must be set
+// before use (NewClientFromEnv fills it in from the environment).
+type Client struct {
+       // HTTP client used to make requests. If nil,
+       // http.DefaultClient or InsecureHTTPClient will be used,
+       // depending on Insecure.
+       Client *http.Client
+
+       // Hostname (or host:port) of Arvados API server. Requests
+       // always use the https scheme.
+       APIHost string
+
+       // User authentication token, sent as an "OAuth2 <token>"
+       // Authorization header. If empty, no Authorization header
+       // is added.
+       AuthToken string
+
+       // Accept unverified certificates. This works only if the
+       // Client field is nil: otherwise, it has no effect.
+       Insecure bool
+}
+
+// InsecureHTTPClient is the http.Client used by a Client whose
+// Insecure field is true and whose Client field is nil. It skips TLS
+// certificate verification, so it should only be used where server
+// identity does not need to be checked.
+var InsecureHTTPClient = &http.Client{
+       Transport: &http.Transport{
+               TLSClientConfig: &tls.Config{
+                       InsecureSkipVerify: true,
+               },
+       },
+}
+
+// NewClientFromEnv creates a new Client that uses the default HTTP
+// client with the API endpoint and credentials given by the
+// ARVADOS_API_* environment variables.
+func NewClientFromEnv() *Client {
+       return &Client{
+               APIHost:   os.Getenv("ARVADOS_API_HOST"),
+               AuthToken: os.Getenv("ARVADOS_API_TOKEN"),
+               Insecure:  os.Getenv("ARVADOS_API_HOST_INSECURE") != "",
+       }
+}
+
+// Do adds authentication headers and then calls (*http.Client)Do().
+func (c *Client) Do(req *http.Request) (*http.Response, error) {
+       if c.AuthToken != "" {
+               req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
+       }
+       return c.httpClient().Do(req)
+}
+
+// DoAndDecode performs req and unmarshals the response (which must be
+// JSON) into dst. Use this instead of RequestAndDecode if you need
+// more control of the http.Request object.
+func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
+       resp, err := c.Do(req)
+       if err != nil {
+               return err
+       }
+       defer resp.Body.Close()
+       buf, err := ioutil.ReadAll(resp.Body)
+       if err != nil {
+               return err
+       }
+       if resp.StatusCode != 200 {
+               return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status)
+       }
+       if dst == nil {
+               return nil
+       }
+       return json.Unmarshal(buf, dst)
+}
+
+// RequestAndDecode performs an API request and unmarshals the
+// response (which must be JSON) into dst. Method and body arguments
+// are the same as for http.NewRequest(). The given path is added to
+// the server's scheme/host/port to form the request URL.
+//
+// params may be nil, a url.Values, or any value that encodes to a
+// JSON object. If the method is GET, or a body is supplied, params
+// are sent in the query string. Otherwise they are sent as an
+// application/x-www-form-urlencoded request body.
+//
+// path must not contain a query string.
+func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+       urlString := c.apiURL(path)
+       var urlValues url.Values
+       if v, ok := params.(url.Values); ok {
+               urlValues = v
+       } else if params != nil {
+               // Convert an arbitrary struct to url.Values: each
+               // top-level JSON key becomes a parameter; string
+               // values are used as-is and other values are
+               // JSON-encoded. For example,
+               // Foo{Bar: []int{1,2,3}, Baz: "waz"} becomes
+               // url.Values{`Bar`:{`[1,2,3]`},`Baz`:{`waz`}}.
+               //
+               // TODO: Do this more efficiently, possibly using
+               // json.Decode/Encode, so the whole thing doesn't have
+               // to get encoded, decoded, and re-encoded.
+               j, err := json.Marshal(params)
+               if err != nil {
+                       return err
+               }
+               var generic map[string]interface{}
+               err = json.Unmarshal(j, &generic)
+               if err != nil {
+                       return err
+               }
+               urlValues = url.Values{}
+               for k, v := range generic {
+                       if v, ok := v.(string); ok {
+                               urlValues.Set(k, v)
+                               continue
+                       }
+                       j, err := json.Marshal(v)
+                       if err != nil {
+                               return err
+                       }
+                       urlValues.Set(k, string(j))
+               }
+       }
+       sendForm := false
+       if urlValues != nil {
+               if method == "GET" || body != nil {
+                       // FIXME: what if params don't fit in URL
+                       u, err := url.Parse(urlString)
+                       if err != nil {
+                               return err
+                       }
+                       u.RawQuery = urlValues.Encode()
+                       urlString = u.String()
+               } else {
+                       // The caller supplied no body, so the params
+                       // go in a form-encoded body. Previously they
+                       // were silently dropped here. A body also
+                       // avoids URL length limits.
+                       body = strings.NewReader(urlValues.Encode())
+                       sendForm = true
+               }
+       }
+       req, err := http.NewRequest(method, urlString, body)
+       if err != nil {
+               return err
+       }
+       if sendForm {
+               req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+       }
+       return c.DoAndDecode(dst, req)
+}
+
+func (c *Client) httpClient() *http.Client {
+       switch {
+       case c.Client != nil:
+               return c.Client
+       case c.Insecure:
+               return InsecureHTTPClient
+       default:
+               return http.DefaultClient
+       }
+}
+
+func (c *Client) apiURL(path string) string {
+       return "https://" + c.APIHost + "/" + path
+}
+
+// DiscoveryDocument is the Arvados server's description of itself.
+//
+// Only the fields below are decoded; encoding/json ignores any other
+// keys in the server's document.
+type DiscoveryDocument struct {
+       // NOTE(review): presumably the replication level applied to
+       // collections that do not request one -- confirm against the
+       // API server documentation.
+       DefaultCollectionReplication int   `json:"defaultCollectionReplication"`
+       // Lifetime of block signatures. NOTE(review): units are not
+       // established here (presumably seconds) -- confirm before
+       // converting to a time.Duration.
+       BlobSignatureTTL             int64 `json:"blobSignatureTtl"`
+}
+
+// DiscoveryDocument returns a *DiscoveryDocument. The returned object
+// should not be modified: the same object may be returned by
+// subsequent calls.
+func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+       var dd DiscoveryDocument
+       return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+}
diff --git a/sdk/go/arvados/client_test.go b/sdk/go/arvados/client_test.go
new file mode 100644 (file)
index 0000000..2db50bf
--- /dev/null
@@ -0,0 +1,83 @@
+package arvados
+
+import (
+       "bytes"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "sync"
+       "testing"
+)
+
+// stubTransport is an http.RoundTripper for tests. It records every
+// request it receives and answers from a table of canned response
+// bodies keyed by URL path. It embeds a sync.Mutex, so use it only
+// through a pointer.
+type stubTransport struct {
+       // Response bodies keyed by request URL path. A path that is
+       // missing (or maps to "") gets a 404 response with body "{}".
+       Responses map[string]string
+       // Copies of the requests received so far, in order; RoundTrip
+       // appends to it while holding the embedded Mutex.
+       Requests  []http.Request
+       sync.Mutex
+}
+
+func (stub *stubTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+       stub.Lock()
+       stub.Requests = append(stub.Requests, *req)
+       stub.Unlock()
+
+       resp := &http.Response{
+               Status:     "200 OK",
+               StatusCode: 200,
+               Proto:      "HTTP/1.1",
+               ProtoMajor: 1,
+               ProtoMinor: 1,
+               Request:    req,
+       }
+       str := stub.Responses[req.URL.Path]
+       if str == "" {
+               resp.Status = "404 Not Found"
+               resp.StatusCode = 404
+               str = "{}"
+       }
+       buf := bytes.NewBufferString(str)
+       resp.Body = ioutil.NopCloser(buf)
+       resp.ContentLength = int64(buf.Len())
+       return resp, nil
+}
+
+// errorTransport is an http.RoundTripper whose every request fails,
+// for testing how callers propagate transport errors.
+type errorTransport struct{}
+
+// RoundTrip ignores its request and always returns a nil response
+// with a non-nil error.
+func (*errorTransport) RoundTrip(*http.Request) (*http.Response, error) {
+       return nil, fmt.Errorf("something awful happened")
+}
+
+func TestCurrentUser(t *testing.T) {
+       t.Parallel()
+       stub := &stubTransport{
+               Responses: map[string]string{
+                       "/arvados/v1/users/current": `{"uuid":"zzzzz-abcde-012340123401234"}`,
+               },
+       }
+       c := &Client{
+               Client: &http.Client{
+                       Transport: stub,
+               },
+               APIHost:   "zzzzz.arvadosapi.com",
+               AuthToken: "xyzzy",
+       }
+       u, err := c.CurrentUser()
+       if err != nil {
+               t.Fatal(err)
+       }
+       if x := "zzzzz-abcde-012340123401234"; u.UUID != x {
+               t.Errorf("got uuid %q, expected %q", u.UUID, x)
+       }
+       if len(stub.Requests) < 1 {
+               t.Fatal("empty stub.Requests")
+       }
+       hdr := stub.Requests[len(stub.Requests)-1].Header
+       if hdr.Get("Authorization") != "OAuth2 xyzzy" {
+               t.Errorf("got headers %+q, expected Authorization header", hdr)
+       }
+
+       c.Client.Transport = &errorTransport{}
+       u, err = c.CurrentUser()
+       if err == nil {
+               t.Errorf("got nil error, expected something awful")
+       }
+}
diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go
new file mode 100644 (file)
index 0000000..71f5247
--- /dev/null
@@ -0,0 +1,62 @@
+package arvados
+
+import (
+       "bufio"
+       "fmt"
+       "strings"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
+)
+
+// Collection is an arvados#collection resource.
+//
+// Every field is tagged omitempty, so empty strings and nil pointers
+// are left out when a Collection is encoded as JSON. The pointer-typed
+// fields let a decoded record tell a missing or null value apart from
+// a zero value.
+type Collection struct {
+       UUID                   string     `json:"uuid,omitempty"`
+       ExpiresAt              *time.Time `json:"expires_at,omitempty"`
+       ManifestText           string     `json:"manifest_text,omitempty"`
+       CreatedAt              *time.Time `json:"created_at,omitempty"`
+       ModifiedAt             *time.Time `json:"modified_at,omitempty"`
+       PortableDataHash       string     `json:"portable_data_hash,omitempty"`
+       ReplicationConfirmed   *int       `json:"replication_confirmed,omitempty"`
+       ReplicationConfirmedAt *time.Time `json:"replication_confirmed_at,omitempty"`
+       ReplicationDesired     *int       `json:"replication_desired,omitempty"`
+}
+
+// SizedDigests returns the hash+size part of each data block
+// referenced by the collection, in manifest order. Duplicates are not
+// removed.
+//
+// It returns an error if ManifestText is empty and the collection is
+// not the empty collection, if a stream line has fewer than 3 tokens,
+// or if scanning the manifest fails.
+func (c *Collection) SizedDigests() ([]SizedDigest, error) {
+       if c.ManifestText == "" && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+               // TODO: Check more subtle forms of corruption, too
+               return nil, fmt.Errorf("manifest is missing")
+       }
+       var sds []SizedDigest
+       scanner := bufio.NewScanner(strings.NewReader(c.ManifestText))
+       // The whole manifest may be a single line, so allow tokens up
+       // to the full manifest length, +1 so that a final line with no
+       // trailing newline never fills the buffer (which the Scanner
+       // reports as ErrTooLong). Start with a buffer no larger than
+       // needed: a fixed 1 MiB allocation per call adds up when many
+       // collections are scanned.
+       maxLine := len(c.ManifestText) + 1
+       initSize := maxLine
+       if initSize > 1048576 {
+               initSize = 1048576
+       }
+       scanner.Buffer(make([]byte, initSize), maxLine)
+       for scanner.Scan() {
+               line := scanner.Text()
+               tokens := strings.Split(line, " ")
+               if len(tokens) < 3 {
+                       return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+               }
+               // tokens[0] is the stream name; block locators follow
+               // until the first token that is not a locator.
+               for _, token := range tokens[1:] {
+                       if !manifest.LocatorPattern.MatchString(token) {
+                               // FIXME: ensure it's a file token
+                               break
+                       }
+                       // Keep only hash+size, dropping any hints.
+                       // FIXME: shouldn't assume 32 char hash
+                       if i := strings.IndexRune(token[33:], '+'); i >= 0 {
+                               token = token[:33+i]
+                       }
+                       sds = append(sds, SizedDigest(token))
+               }
+       }
+       return sds, scanner.Err()
+}
+
+// CollectionList is an arvados#collectionList resource: one page of
+// results from a collections list/index request.
+type CollectionList struct {
+       // The collections on this page.
+       Items          []Collection `json:"items"`
+       // Presumably the total number of matching collections across
+       // all pages (EachKeepService uses the analogous field that
+       // way) -- confirm.
+       ItemsAvailable int          `json:"items_available"`
+       // Presumably the offset and limit the server applied to this
+       // page -- confirm.
+       Offset         int          `json:"offset"`
+       Limit          int          `json:"limit"`
+}
diff --git a/sdk/go/arvados/doc.go b/sdk/go/arvados/doc.go
new file mode 100644 (file)
index 0000000..1e8141e
--- /dev/null
@@ -0,0 +1,12 @@
+// Package arvados is a client library for Arvados.
+//
+// The API is not stable: it should be considered experimental
+// pre-release.
+//
+// The intent is to offer model types and API call functions that can
+// be generated automatically (or at least mostly automatically) from
+// a discovery document. For the time being, there is a manually
+// generated subset of those types and API calls with (approximately)
+// the right signatures, plus client/authentication support and some
+// convenience functions.
+package arvados
diff --git a/sdk/go/arvados/duration.go b/sdk/go/arvados/duration.go
new file mode 100644 (file)
index 0000000..1639c58
--- /dev/null
@@ -0,0 +1,31 @@
+package arvados
+
+import (
+       "encoding/json"
+       "fmt"
+       "time"
+)
+
+// Duration is time.Duration but looks like "12s" in JSON, rather than
+// a number of nanoseconds.
+type Duration time.Duration
+
+// UnmarshalJSON implements json.Unmarshaler. The input must be a JSON
+// string accepted by time.ParseDuration, such as "600s" or "1h30m".
+// JSON numbers and other non-string values are rejected. If the string
+// cannot be parsed, *d is set to 0 and the parse error is returned.
+func (d *Duration) UnmarshalJSON(data []byte) error {
+       if len(data) == 0 || data[0] != '"' {
+               return fmt.Errorf("duration must be given as a string like \"600s\" or \"1h30m\"")
+       }
+       // Decode the JSON string properly (handling escape sequences)
+       // instead of just slicing off the surrounding quotes.
+       var s string
+       if err := json.Unmarshal(data, &s); err != nil {
+               return err
+       }
+       dur, err := time.ParseDuration(s)
+       *d = Duration(dur)
+       return err
+}
+
+// MarshalJSON implements json.Marshaler, encoding d as a string such
+// as "1h30m0s".
+//
+// It has a value receiver so that Duration fields are encoded as
+// strings even when the enclosing value is not addressable (e.g. a
+// struct passed by value to json.Marshal). With a pointer receiver,
+// such fields were encoded as integer nanoseconds, which UnmarshalJSON
+// rejects. *Duration still implements json.Marshaler.
+func (d Duration) MarshalJSON() ([]byte, error) {
+       return json.Marshal(d.String())
+}
+
+// String implements fmt.Stringer using time.Duration's format.
+func (d Duration) String() string {
+       return time.Duration(d).String()
+}
diff --git a/sdk/go/arvados/keep_block.go b/sdk/go/arvados/keep_block.go
new file mode 100644 (file)
index 0000000..c9a7712
--- /dev/null
@@ -0,0 +1,15 @@
+package arvados
+
+import (
+       "strconv"
+       "strings"
+)
+
+// SizedDigest is a minimal Keep block locator: hash+size
+type SizedDigest string
+
+// Size returns the size of the data block, in bytes: the integer
+// after the first "+". It returns 0 if there is no "+" or the size is
+// not a valid integer, rather than panicking on a malformed locator.
+func (sd SizedDigest) Size() int64 {
+       parts := strings.SplitN(string(sd), "+", 3)
+       if len(parts) < 2 {
+               return 0
+       }
+       // A malformed size yields 0, like a missing one; callers get
+       // no error to distinguish the two cases.
+       n, _ := strconv.ParseInt(parts[1], 10, 64)
+       return n
+}
diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
new file mode 100644 (file)
index 0000000..4af1b79
--- /dev/null
@@ -0,0 +1,123 @@
+package arvados
+
+import (
+       "bufio"
+       "fmt"
+       "net/http"
+       "strconv"
+       "strings"
+)
+
+// KeepService is an arvados#keepService record: the address and
+// properties of one Keep server.
+type KeepService struct {
+       UUID           string `json:"uuid"`
+       ServiceHost    string `json:"service_host"`
+       ServicePort    int    `json:"service_port"`
+       // If true, the service is contacted over https instead of
+       // http (see the url method).
+       ServiceSSLFlag bool   `json:"service_ssl_flag"`
+       ServiceType    string `json:"service_type"`
+       ReadOnly       bool   `json:"read_only"`
+}
+
+// KeepServiceList is an arvados#keepServiceList record: one page of a
+// keep_services list response. EachKeepService treats ItemsAvailable
+// as the total number of records across all pages, and stops paging
+// once Offset plus the items seen so far reaches it.
+type KeepServiceList struct {
+       Items          []KeepService `json:"items"`
+       ItemsAvailable int           `json:"items_available"`
+       Offset         int           `json:"offset"`
+       Limit          int           `json:"limit"`
+}
+
+// KeepServiceIndexEntry is what a keep service's index response tells
+// us about a stored block: the locator from the first field of an
+// index line, and the integer from the second field.
+type KeepServiceIndexEntry struct {
+       SizedDigest
+       // NOTE(review): units are not established here (presumably a
+       // Unix timestamp; seconds vs nanoseconds unconfirmed) -- check
+       // keepstore's index format.
+       Mtime int64
+}
+
+// EachKeepService calls f once for every KeepService listed by the
+// API server, requesting successive pages as needed.
+//
+// It stops and returns an error if a request fails, if f returns a
+// non-nil error, or if the server returns an empty page before
+// items_available records have been seen. Without that last check the
+// loop would re-request the same offset forever.
+func (c *Client) EachKeepService(f func(KeepService) error) error {
+       params := ResourceListParams{}
+       for {
+               var page KeepServiceList
+               err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
+               if err != nil {
+                       return err
+               }
+               for _, item := range page.Items {
+                       err = f(item)
+                       if err != nil {
+                               return err
+                       }
+               }
+               params.Offset = params.Offset + len(page.Items)
+               if params.Offset >= page.ItemsAvailable {
+                       return nil
+               }
+               if len(page.Items) == 0 {
+                       return fmt.Errorf("keep_services list returned no items at offset %d, but items_available=%d", params.Offset, page.ItemsAvailable)
+               }
+       }
+}
+
+func (s *KeepService) url(path string) string {
+       var f string
+       if s.ServiceSSLFlag {
+               f = "https://%s:%d/%s"
+       } else {
+               f = "http://%s:%d/%s"
+       }
+       return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
+}
+
+// String implements fmt.Stringer, identifying the service by its
+// UUID. Because the receiver is a pointer, only *KeepService (not a
+// KeepService value) satisfies fmt.Stringer.
+func (s *KeepService) String() string {
+       return s.UUID
+}
+
+// Index returns an unsorted list of blocks that can be retrieved from
+// this server, using c for credentials. prefix is appended to the
+// "index/" request path; presumably the server then lists only blocks
+// whose hash starts with prefix, and "" lists everything.
+//
+// The response must be "locator mtime" lines followed by one blank
+// line marking the end. A malformed line, a blank line before the end,
+// or a missing end marker (e.g. from a truncated response) is reported
+// as an error.
+func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+       url := s.url("index/" + prefix)
+       req, err := http.NewRequest("GET", url, nil)
+       if err != nil {
+               return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+       }
+       resp, err := c.Do(req)
+       if err != nil {
+               return nil, fmt.Errorf("Do(%v): %v", url, err)
+       }
+       // Close the body on every return path, including the non-200
+       // error below, so the response and its connection don't leak.
+       defer resp.Body.Close()
+       if resp.StatusCode != 200 {
+               return nil, fmt.Errorf("%v: %v", url, resp.Status)
+       }
+
+       var entries []KeepServiceIndexEntry
+       scanner := bufio.NewScanner(resp.Body)
+       sawEOF := false
+       for scanner.Scan() {
+               if sawEOF {
+                       return nil, fmt.Errorf("Index response contained non-terminal blank line")
+               }
+               line := scanner.Text()
+               if line == "" {
+                       // A blank line marks the end of the index.
+                       sawEOF = true
+                       continue
+               }
+               fields := strings.Split(line, " ")
+               if len(fields) != 2 {
+                       return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
+               }
+               mtime, err := strconv.ParseInt(fields[1], 10, 64)
+               if err != nil {
+                       return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
+               }
+               entries = append(entries, KeepServiceIndexEntry{
+                       SizedDigest: SizedDigest(fields[0]),
+                       Mtime:       mtime,
+               })
+       }
+       if err := scanner.Err(); err != nil {
+               return nil, fmt.Errorf("Error scanning index response: %v", err)
+       }
+       if !sawEOF {
+               return nil, fmt.Errorf("Index response had no EOF marker")
+       }
+       return entries, nil
+}
diff --git a/sdk/go/arvados/resource_list.go b/sdk/go/arvados/resource_list.go
new file mode 100644 (file)
index 0000000..e9ea268
--- /dev/null
@@ -0,0 +1,25 @@
+package arvados
+
+import "encoding/json"
+
+// ResourceListParams expresses which results are requested in a
+// list/index API.
+//
+// Every field is omitempty, so unset fields are left out of the
+// request (presumably letting server defaults apply -- confirm).
+// Limit is a pointer so that an explicit limit of 0 can be told apart
+// from no limit. An Offset of 0 is omitted.
+type ResourceListParams struct {
+       Select  []string `json:"select,omitempty"`
+       Filters []Filter `json:"filters,omitempty"`
+       Limit   *int     `json:"limit,omitempty"`
+       Offset  int      `json:"offset,omitempty"`
+       Order   string   `json:"order,omitempty"`
+}
+
+// A Filter restricts the set of records returned by a list/index API.
+//
+// It is encoded in JSON as a 3-element array [Attr, Operator, Operand],
+// e.g. ["uuid","=","zzzzz-..."].
+type Filter struct {
+       Attr     string
+       Operator string
+       Operand  interface{}
+}
+
+// MarshalJSON encodes a Filter in the form expected by the API.
+//
+// It has a value receiver so that the array form is also used for
+// Filter values that are not addressable, such as a Filter passed
+// directly to json.Marshal. With a pointer receiver those were encoded
+// as a JSON object. *Filter still implements json.Marshaler.
+func (f Filter) MarshalJSON() ([]byte, error) {
+       return json.Marshal([]interface{}{f.Attr, f.Operator, f.Operand})
+}
diff --git a/sdk/go/arvados/resource_list_test.go b/sdk/go/arvados/resource_list_test.go
new file mode 100644 (file)
index 0000000..b5e6e7d
--- /dev/null
@@ -0,0 +1,21 @@
+package arvados
+
+import (
+       "bytes"
+       "encoding/json"
+       "testing"
+       "time"
+)
+
+func TestMarshalFiltersWithNanoseconds(t *testing.T) {
+       t0 := time.Now()
+       t0str := t0.Format(time.RFC3339Nano)
+       buf, err := json.Marshal([]Filter{
+               {Attr: "modified_at", Operator: "=", Operand: t0}})
+       if err != nil {
+               t.Fatal(err)
+       }
+       if expect := []byte(`[["modified_at","=","` + t0str + `"]]`); 0 != bytes.Compare(buf, expect) {
+               t.Errorf("Encoded as %q, expected %q", buf, expect)
+       }
+}
diff --git a/sdk/go/arvados/user.go b/sdk/go/arvados/user.go
new file mode 100644 (file)
index 0000000..684a3af
--- /dev/null
@@ -0,0 +1,17 @@
+package arvados
+
+// User is an arvados#user record. When encoded as JSON, UUID and
+// Username are omitted if empty, while IsActive and IsAdmin are always
+// included.
+type User struct {
+       UUID     string `json:"uuid,omitempty"`
+       IsActive bool   `json:"is_active"`
+       IsAdmin  bool   `json:"is_admin"`
+       Username string `json:"username,omitempty"`
+}
+
+// CurrentUser fetches the User record for the credentials this Client
+// uses (arvados.v1.users.current). On error, the returned User holds
+// whatever was decoded before the failure, if anything.
+func (c *Client) CurrentUser() (User, error) {
+       var u User
+       if err := c.RequestAndDecode(&u, "GET", "arvados/v1/users/current", nil, nil); err != nil {
+               return u, err
+       }
+       return u, nil
+}
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
new file mode 100644 (file)
index 0000000..7471aa6
--- /dev/null
@@ -0,0 +1,611 @@
+package main
+
+import (
+       "fmt"
+       "log"
+       "os"
+       "runtime"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+// CheckConfig returns an error if anything is wrong with the given
+// config and runOptions.
+func CheckConfig(config Config, runOptions RunOptions) error {
+       if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
+               return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
+       }
+       if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
+               return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
+       }
+       return nil
+}
+
+// Balancer compares the contents of keepstore servers with the
+// collections stored in Arvados, and issues pull/trash requests
+// needed to get (closer to) the optimal data layout.
+//
+// In the optimal data layout: every data block referenced by a
+// collection is replicated at least as many times as desired by the
+// collection; there are no unreferenced data blocks older than
+// BlobSignatureTTL; and all N existing replicas of a given data block
+// are in the N best positions in rendezvous probe order.
+//
+// Exported fields:
+//   - BlockStateMap (embedded) holds the current replicas and desired
+//     replication of each block; GetCurrentState replaces it.
+//   - KeepServices maps service UUID to the services being balanced.
+//   - DefaultReplication and MinMtime are set by GetCurrentState from
+//     the discovery document; balanceBlock never trashes a replica
+//     whose Mtime is >= MinMtime.
+//   - Logger receives progress and statistics (nil disables logging);
+//     Dumper, if non-nil, receives one line per block from
+//     balanceBlock.
+type Balancer struct {
+       *BlockStateMap
+       KeepServices       map[string]*KeepService
+       DefaultReplication int
+       Logger             *log.Logger
+       Dumper             *log.Logger
+       MinMtime           int64
+
+       // Internal state. errors collects non-fatal problems found
+       // while scanning collections (appended under mutex), and
+       // CheckSanityLate refuses to proceed if any were recorded.
+       // serviceRoots is the UUID->UUID map used for rendezvous
+       // sorting (see setupServiceRoots).
+       collScanned  int
+       serviceRoots map[string]string
+       errors       []error
+       mutex        sync.Mutex
+}
+
+// Run performs a balance operation using the given config and
+// runOptions. It should only be called once on a given Balancer
+// object. Typical usage:
+//
+//   err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+       bal.Dumper = runOptions.Dumper
+       bal.Logger = runOptions.Logger
+       if bal.Logger == nil {
+               bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
+       }
+
+       defer timeMe(bal.Logger, "Run")()
+
+       if len(config.KeepServiceList.Items) > 0 {
+               err = bal.SetKeepServices(config.KeepServiceList)
+       } else {
+               err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
+       }
+       if err != nil {
+               return
+       }
+
+       if err = bal.CheckSanityEarly(&config.Client); err != nil {
+               return
+       }
+       if runOptions.CommitTrash {
+               if err = bal.ClearTrashLists(&config.Client); err != nil {
+                       return
+               }
+       }
+       if err = bal.GetCurrentState(&config.Client); err != nil {
+               return
+       }
+       bal.ComputeChangeSets()
+       bal.PrintStatistics()
+       if err = bal.CheckSanityLate(); err != nil {
+               return
+       }
+       if runOptions.CommitPulls {
+               err = bal.CommitPulls(&config.Client)
+               if err != nil {
+                       // Skip trash if we can't pull. (Too cautious?)
+                       return
+               }
+       }
+       if runOptions.CommitTrash {
+               err = bal.CommitTrash(&config.Client)
+       }
+       return
+}
+
+// SetKeepServices sets the list of KeepServices to operate on.
+func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
+       bal.KeepServices = make(map[string]*KeepService)
+       for _, srv := range srvList.Items {
+               bal.KeepServices[srv.UUID] = &KeepService{
+                       KeepService: srv,
+                       ChangeSet:   &ChangeSet{},
+               }
+       }
+       return nil
+}
+
+// DiscoverKeepServices sets the list of KeepServices by calling the
+// API to get a list of all services, and selecting the ones whose
+// ServiceType is in okTypes.
+func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
+       bal.KeepServices = make(map[string]*KeepService)
+       ok := make(map[string]bool)
+       for _, t := range okTypes {
+               ok[t] = true
+       }
+       return c.EachKeepService(func(srv arvados.KeepService) error {
+               if ok[srv.ServiceType] {
+                       bal.KeepServices[srv.UUID] = &KeepService{
+                               KeepService: srv,
+                               ChangeSet:   &ChangeSet{},
+                       }
+               } else {
+                       bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
+               }
+               return nil
+       })
+}
+
+// CheckSanityEarly checks for configuration and runtime errors that
+// can be detected before GetCurrentState() and ComputeChangeSets()
+// are called.
+//
+// If it returns an error, it is pointless to run GetCurrentState or
+// ComputeChangeSets: after doing so, the statistics would be
+// meaningless and it would be dangerous to run any Commit methods.
+func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
+       u, err := c.CurrentUser()
+       if err != nil {
+               return fmt.Errorf("CurrentUser(): %v", err)
+       }
+       if !u.IsActive || !u.IsAdmin {
+               return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
+       }
+       for _, srv := range bal.KeepServices {
+               if srv.ServiceType == "proxy" {
+                       return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
+               }
+       }
+       return nil
+}
+
+// ClearTrashLists sends an empty trash list to each keep
+// service. Calling this before GetCurrentState avoids races.
+//
+// When a block appears in an index, we assume that replica will still
+// exist after we delete other replicas on other servers. However,
+// it's possible that a previous rebalancing operation made different
+// decisions (e.g., servers were added/removed, and rendezvous order
+// changed). In this case, the replica might already be on that
+// server's trash list, and it might be deleted before we send a
+// replacement trash list.
+//
+// We avoid this problem if we clear all trash lists before getting
+// indexes. (We also assume there is only one rebalancing process
+// running at a time.)
+func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+       for _, srv := range bal.KeepServices {
+               srv.ChangeSet = &ChangeSet{}
+       }
+       return bal.CommitTrash(c)
+}
+
+// GetCurrentState determines the current replication state, and the
+// desired replication level, for every block that is either
+// retrievable or referenced.
+//
+// It determines the current replication state by reading the block index
+// from every known Keep service.
+//
+// It determines the desired replication level by retrieving all
+// collection manifests in the database (API server).
+//
+// It encodes the resulting information in BlockStateMap.
+//
+// It returns the first error reported by any worker goroutine, or nil
+// once all of them have finished. When it returns an error, some
+// workers may still be running, so the Balancer's state is incomplete
+// and must not be used to commit changes.
+func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+       defer timeMe(bal.Logger, "GetCurrentState")()
+       bal.BlockStateMap = NewBlockStateMap()
+
+       dd, err := c.DiscoveryDocument()
+       if err != nil {
+               return err
+       }
+       bal.DefaultReplication = dd.DefaultCollectionReplication
+       bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+
+       // Each index goroutine, the collection processor, and the
+       // collection fetcher send at most one error, and the final
+       // waiter sends one nil. We receive exactly one value, so this
+       // buffer is large enough that no sender blocks forever after
+       // we return early.
+       errs := make(chan error, 2+len(bal.KeepServices))
+       var wg sync.WaitGroup
+
+       // Start one goroutine for each KeepService: retrieve the
+       // index, and add the returned blocks to BlockStateMap.
+       for _, srv := range bal.KeepServices {
+               wg.Add(1)
+               go func(srv *KeepService) {
+                       defer wg.Done()
+                       bal.logf("%s: retrieve index", srv)
+                       idx, err := srv.Index(c, "")
+                       if err != nil {
+                               errs <- fmt.Errorf("%s: %v", srv, err)
+                               return
+                       }
+                       bal.logf("%s: add %d replicas to map", srv, len(idx))
+                       bal.BlockStateMap.AddReplicas(srv, idx)
+                       bal.logf("%s: done", srv)
+               }(srv)
+       }
+
+       // collQ buffers incoming collections so we can start fetching
+       // the next page without waiting for the current page to
+       // finish processing. (1000 happens to match the page size
+       // used by (*arvados.Client)EachCollection(), but it's OK if
+       // they don't match.)
+       collQ := make(chan arvados.Collection, 1000)
+
+       // Start a goroutine to process collections. (We could use a
+       // worker pool here, but even with a single worker we already
+       // process collections much faster than we can retrieve them.)
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               for coll := range collQ {
+                       err := bal.addCollection(coll)
+                       if err != nil {
+                               errs <- err
+                               // Keep draining so the fetcher
+                               // never blocks on a full collQ.
+                               for range collQ {
+                               }
+                               return
+                       }
+                       bal.collScanned++
+               }
+       }()
+
+       // Start a goroutine to retrieve all collections from the
+       // Arvados database and send them to collQ for processing.
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               // Declare a new err rather than assigning the
+               // enclosing function's err: this goroutine can still
+               // be running after GetCurrentState has returned.
+               err := EachCollection(c,
+                       func(coll arvados.Collection) error {
+                               collQ <- coll
+                               if len(errs) > 0 {
+                                       // Some other GetCurrentState
+                                       // error happened: no point
+                                       // getting any more
+                                       // collections.
+                                       return fmt.Errorf("collection scan aborted after another GetCurrentState error")
+                               }
+                               return nil
+                       }, func(done, total int) {
+                               bal.logf("collections: %d/%d", done, total)
+                       })
+               close(collQ)
+               if err != nil {
+                       errs <- err
+               }
+       }()
+
+       go func() {
+               // Send a nil error when all goroutines finish. If
+               // this is the first error sent to errs, then
+               // everything worked.
+               wg.Wait()
+               errs <- nil
+       }()
+       return <-errs
+}
+
+func (bal *Balancer) addCollection(coll arvados.Collection) error {
+       blkids, err := coll.SizedDigests()
+       if err != nil {
+               bal.mutex.Lock()
+               bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
+               bal.mutex.Unlock()
+               return nil
+       }
+       repl := bal.DefaultReplication
+       if coll.ReplicationDesired != nil {
+               repl = *coll.ReplicationDesired
+       }
+       debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+       bal.BlockStateMap.IncreaseDesired(repl, blkids)
+       return nil
+}
+
+// ComputeChangeSets compares, for each known block, the current and
+// desired replication states. Wherever copying or deleting replicas
+// would get closer to the desired state, it queues those changes on
+// the affected KeepServices' ChangeSets.
+//
+// Nothing is sent to any server; see CommitPulls and CommitTrash.
+func (bal *Balancer) ComputeChangeSets() {
+       defer timeMe(bal.Logger, "ComputeChangeSets")()
+       bal.setupServiceRoots()
+
+       // Fan the blocks out to NumCPU+1 workers, each calling
+       // balanceBlock once per block it receives.
+       type balanceTask struct {
+               blkid arvados.SizedDigest
+               blk   *BlockState
+       }
+       workers := runtime.NumCPU() + 1
+       queue := make(chan balanceTask, workers)
+       var wg sync.WaitGroup
+       wg.Add(workers)
+       for w := 0; w < workers; w++ {
+               go func() {
+                       defer wg.Done()
+                       for task := range queue {
+                               bal.balanceBlock(task.blkid, task.blk)
+                       }
+               }()
+       }
+       bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+               queue <- balanceTask{blkid: blkid, blk: blk}
+       })
+       close(queue)
+       wg.Wait()
+}
+
+func (bal *Balancer) setupServiceRoots() {
+       bal.serviceRoots = make(map[string]string)
+       for _, srv := range bal.KeepServices {
+               bal.serviceRoots[srv.UUID] = srv.UUID
+       }
+}
+
+// Per-service outcomes that balanceBlock assigns while walking one
+// block's rendezvous order. In this file they are used only to label
+// the lines written to bal.Dumper (via changeName).
+const (
+       // the service has a replica and keeps it
+       changeStay = iota
+       // the service lacks a replica; a pull was requested
+       changePull
+       // the service has a replica; a trash was requested
+       changeTrash
+       // the service lacks a replica; nothing was requested
+       changeNone
+)
+
+// changeName gives the label printed in Dumper output for each change
+// constant.
+var changeName = map[int]string{
+       changeStay:  "stay",
+       changePull:  "pull",
+       changeTrash: "trash",
+       changeNone:  "none",
+}
+
+// balanceBlock compares current state to desired state for a single
+// block, and makes the appropriate ChangeSet calls.
+//
+// Services are visited in rendezvous order for blkid, computed by
+// keepclient.NewRootSorter over bal.serviceRoots and keyed by the
+// first 32 characters of blkid (presumably the md5 hash part). Walking
+// that order:
+//   - a service that already has a replica keeps it ("stay"), or is
+//     asked to trash it when the service is writable, the replica's
+//     Mtime is older than bal.MinMtime, at least blk.Desired distinct
+//     Mtimes have already been seen in better positions, and this
+//     replica's Mtime is not one of them;
+//   - a writable service without a replica is asked to pull one (from
+//     the service of blk.Replicas[0]) while existing + already
+//     requested replicas in better positions are fewer than
+//     blk.Desired.
+//
+// If bal.Dumper is non-nil, one summary line per block is written to
+// it. ComputeChangeSets calls this from several goroutines at once, so
+// AddTrash/AddPull must be safe for concurrent use. NOTE(review): that
+// guarantee lives in the ChangeSet implementation, which is not
+// visible here; confirm.
+func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
+       debugf("balanceBlock: %v %+v", blkid, blk)
+       uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
+       hasRepl := make(map[string]Replica, len(bal.serviceRoots))
+       for _, repl := range blk.Replicas {
+               hasRepl[repl.UUID] = repl
+               // TODO: when multiple copies are on one server, use
+               // the oldest one that doesn't have a timestamp
+               // collision with other replicas.
+       }
+       // number of replicas already found in positions better than
+       // the position we're contemplating now.
+       reportedBestRepl := 0
+       // To be safe we assume two replicas with the same Mtime are
+       // in fact the same replica being reported more than
+       // once. len(uniqueBestRepl) is the number of distinct
+       // replicas in the best rendezvous positions we've considered
+       // so far.
+       uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
+       // pulls is the number of Pull changes we have already
+       // requested. (For purposes of deciding whether to Pull to
+       // rendezvous position N, we should assume all pulls we have
+       // requested on rendezvous positions M<N will be successful.)
+       pulls := 0
+       var changes []string
+       for _, uuid := range uuids {
+               change := changeNone
+               srv := bal.KeepServices[uuid]
+               // TODO: request a Touch if Mtime is duplicated.
+               repl, ok := hasRepl[srv.UUID]
+               if ok {
+                       // This service has a replica. We should
+                       // delete it if [1] we already have enough
+                       // distinct replicas in better rendezvous
+                       // positions and [2] this replica's Mtime is
+                       // distinct from all of the better replicas'
+                       // Mtimes.
+                       if !srv.ReadOnly &&
+                               repl.Mtime < bal.MinMtime &&
+                               len(uniqueBestRepl) >= blk.Desired &&
+                               !uniqueBestRepl[repl.Mtime] {
+                               srv.AddTrash(Trash{
+                                       SizedDigest: blkid,
+                                       Mtime:       repl.Mtime,
+                               })
+                               change = changeTrash
+                       } else {
+                               change = changeStay
+                       }
+                       uniqueBestRepl[repl.Mtime] = true
+                       reportedBestRepl++
+               } else if pulls+reportedBestRepl < blk.Desired &&
+                       len(blk.Replicas) > 0 &&
+                       !srv.ReadOnly {
+                       // This service doesn't have a replica. We
+                       // should pull one to this server if we don't
+                       // already have enough (existing+requested)
+                       // replicas in better rendezvous positions.
+                       srv.AddPull(Pull{
+                               SizedDigest: blkid,
+                               Source:      blk.Replicas[0].KeepService,
+                       })
+                       pulls++
+                       change = changePull
+               }
+               if bal.Dumper != nil {
+                       // host:port=change,mtime (mtime is 0 when
+                       // this service has no replica).
+                       changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
+               }
+       }
+       if bal.Dumper != nil {
+               bal.Dumper.Printf("%s have=%d want=%d %s", blkid, len(blk.Replicas), blk.Desired, strings.Join(changes, " "))
+       }
+}
+
+// blocksNBytes accumulates a count of replicas, distinct blocks, and
+// total bytes (block size times replica count) for one statistics
+// category.
+type blocksNBytes struct {
+       replicas int
+       blocks   int
+       bytes    int64
+}
+
+// String renders bb as "R replicas (B blocks, N bytes)".
+func (bb blocksNBytes) String() string {
+       const layout = "%d replicas (%d blocks, %d bytes)"
+       return fmt.Sprintf(layout, bb.replicas, bb.blocks, bb.bytes)
+}
+
+// balancerStats is the summary computed by getStatistics. Each
+// blocksNBytes in the first row counts the blocks that fall into one
+// replication category; desired and current are totals over all
+// blocks with want>0 and have>0 respectively; pulls and trashes count
+// the entries queued in all KeepServices' ChangeSets.
+type balancerStats struct {
+       lost, overrep, unref, garbage, underrep, justright blocksNBytes
+       desired, current                                   blocksNBytes
+       pulls, trashes                                     int
+}
+
+func (bal *Balancer) getStatistics() (s balancerStats) {
+       bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+               surplus := len(blk.Replicas) - blk.Desired
+               bytes := blkid.Size()
+               switch {
+               case len(blk.Replicas) == 0 && blk.Desired > 0:
+                       s.lost.replicas -= surplus
+                       s.lost.blocks++
+                       s.lost.bytes += bytes * int64(-surplus)
+               case len(blk.Replicas) < blk.Desired:
+                       s.underrep.replicas -= surplus
+                       s.underrep.blocks++
+                       s.underrep.bytes += bytes * int64(-surplus)
+               case len(blk.Replicas) > 0 && blk.Desired == 0:
+                       counter := &s.garbage
+                       for _, r := range blk.Replicas {
+                               if r.Mtime >= bal.MinMtime {
+                                       counter = &s.unref
+                                       break
+                               }
+                       }
+                       counter.replicas += surplus
+                       counter.blocks++
+                       counter.bytes += bytes * int64(surplus)
+               case len(blk.Replicas) > blk.Desired:
+                       s.overrep.replicas += surplus
+                       s.overrep.blocks++
+                       s.overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
+               default:
+                       s.justright.replicas += blk.Desired
+                       s.justright.blocks++
+                       s.justright.bytes += bytes * int64(blk.Desired)
+               }
+
+               if blk.Desired > 0 {
+                       s.desired.replicas += blk.Desired
+                       s.desired.blocks++
+                       s.desired.bytes += bytes * int64(blk.Desired)
+               }
+               if len(blk.Replicas) > 0 {
+                       s.current.replicas += len(blk.Replicas)
+                       s.current.blocks++
+                       s.current.bytes += bytes * int64(len(blk.Replicas))
+               }
+       })
+       for _, srv := range bal.KeepServices {
+               s.pulls += len(srv.ChangeSet.Pulls)
+               s.trashes += len(srv.ChangeSet.Trashes)
+       }
+       return
+}
+
+// PrintStatistics logs the category breakdown, the overall totals, and
+// each service's ChangeSet to bal.Logger. Call it only after
+// ComputeChangeSets has finished.
+func (bal *Balancer) PrintStatistics() {
+       const sep = "==="
+       s := bal.getStatistics()
+       categories := []struct {
+               format string
+               value  blocksNBytes
+       }{
+               {"%s lost (0=have<want)", s.lost},
+               {"%s underreplicated (0<have<want)", s.underrep},
+               {"%s just right (have=want)", s.justright},
+               {"%s overreplicated (have>want>0)", s.overrep},
+               {"%s unreferenced (have>want=0, new)", s.unref},
+               {"%s garbage (have>want=0, old)", s.garbage},
+       }
+       bal.logf(sep)
+       for _, cat := range categories {
+               bal.logf(cat.format, cat.value)
+       }
+       bal.logf(sep)
+       bal.logf("%s total commitment (excluding unreferenced)", s.desired)
+       bal.logf("%s total usage", s.current)
+       bal.logf(sep)
+       for _, srv := range bal.KeepServices {
+               bal.logf("%s: %v\n", srv, srv.ChangeSet)
+       }
+       bal.logf(sep)
+}
+
+// CheckSanityLate checks for configuration and runtime errors after
+// GetCurrentState() and ComputeChangeSets() have finished.
+//
+// If it returns an error, it is dangerous to run any Commit methods.
+func (bal *Balancer) CheckSanityLate() error {
+       if bal.errors != nil {
+               for _, err := range bal.errors {
+                       bal.logf("deferred error: %v", err)
+               }
+               return fmt.Errorf("cannot proceed safely after deferred errors")
+       }
+
+       if bal.collScanned == 0 {
+               return fmt.Errorf("received zero collections")
+       }
+
+       anyDesired := false
+       bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
+               if blk.Desired > 0 {
+                       anyDesired = true
+               }
+       })
+       if !anyDesired {
+               return fmt.Errorf("zero blocks have desired replication>0")
+       }
+
+       if dr := bal.DefaultReplication; dr < 1 {
+               return fmt.Errorf("Default replication (%d) is less than 1", dr)
+       }
+
+       // TODO: no two services have identical indexes
+       // TODO: no collisions (same md5, different size)
+       return nil
+}
+
+// CommitPulls sends the computed lists of pull requests to the
+// keepstore servers. This has the effect of increasing replication of
+// existing blocks that are either underreplicated or poorly
+// distributed according to rendezvous hashing.
+func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+       return bal.commitAsync(c, "send pull list",
+               func(srv *KeepService) error {
+                       return srv.CommitPulls(c)
+               })
+}
+
+// CommitTrash sends the computed lists of trash requests to the
+// keepstore servers. This has the effect of deleting blocks that are
+// overreplicated or unreferenced.
+func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+       return bal.commitAsync(c, "send trash list",
+               func(srv *KeepService) error {
+                       return srv.CommitTrash(c)
+               })
+}
+
+// commitAsync runs f once per KeepService, all concurrently, timing
+// each call under "<srv>: <label>". It waits for every call, logs each
+// failure, and returns the last error received (nil if all succeeded).
+// The c argument is not used here; callers' closures capture it.
+func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
+       results := make(chan error)
+       for _, srv := range bal.KeepServices {
+               go func(srv *KeepService) {
+                       srvLabel := fmt.Sprintf("%s: %v", srv, label)
+                       var err error
+                       defer func() { results <- err }()
+                       defer timeMe(bal.Logger, srvLabel)()
+                       if err = f(srv); err != nil {
+                               err = fmt.Errorf("%s: %v", srvLabel, err)
+                       }
+               }(srv)
+       }
+       var lastErr error
+       for range bal.KeepServices {
+               if err := <-results; err != nil {
+                       bal.logf("%v", err)
+                       lastErr = err
+               }
+       }
+       close(results)
+       return lastErr
+}
+
+func (bal *Balancer) logf(f string, args ...interface{}) {
+       if bal.Logger != nil {
+               bal.Logger.Printf(f, args...)
+       }
+}
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
new file mode 100644 (file)
index 0000000..a138d91
--- /dev/null
@@ -0,0 +1,374 @@
+package main
+
+import (
+       _ "encoding/json"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "log"
+       "net/http"
+       "net/http/httptest"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+
+       check "gopkg.in/check.v1"
+)
+
+// Register runSuite with gocheck so its Test* methods run when the
+// gocheck runner is invoked (NOTE(review): presumably via a
+// check.TestingT hook in another _test.go file in this package).
+var _ = check.Suite(&runSuite{})
+
+// reqTracker is a mutex-guarded log of the HTTP requests seen by a
+// stub handler.
+type reqTracker struct {
+       reqs []http.Request
+       sync.Mutex
+}
+
+// Count reports how many requests have been recorded so far.
+func (rt *reqTracker) Count() int {
+       rt.Lock()
+       n := len(rt.reqs)
+       rt.Unlock()
+       return n
+}
+
+// Add records a shallow copy of req and returns the new total count.
+func (rt *reqTracker) Add(req *http.Request) int {
+       rt.Lock()
+       defer rt.Unlock()
+       rt.reqs = append(rt.reqs, *req)
+       return len(rt.reqs)
+}
+
+// stubServer is an HTTP transport that intercepts and processes all
+// requests using its own handlers.
+//
+// mux holds the handlers registered by the serve* helpers. srv is a
+// real httptest server (created by Start) whose handler records each
+// request it receives in Requests, under mutex, before dispatching to
+// mux. logf, when set (see runSuite.SetUpTest), is the current test's
+// logging function.
+type stubServer struct {
+       mux      *http.ServeMux
+       srv      *httptest.Server
+       mutex    sync.Mutex
+       Requests reqTracker
+       logf     func(string, ...interface{})
+}
+
+// Start initializes the stub server and returns an *http.Client that
+// uses the stub server to handle all requests.
+//
+// A stubServer that has been started should eventually be shut down
+// with Close().
+func (s *stubServer) Start() *http.Client {
+       // Set up a config.Client that forwards all requests to s.mux
+       // via s.srv. Test cases will attach handlers to s.mux to get
+       // the desired responses.
+       s.mux = http.NewServeMux()
+       s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               s.mutex.Lock()
+               s.Requests.Add(r)
+               s.mutex.Unlock()
+               w.Header().Set("Content-Type", "application/json")
+               s.mux.ServeHTTP(w, r)
+       }))
+       return &http.Client{Transport: s}
+}
+
+func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
+       w := httptest.NewRecorder()
+       s.mux.ServeHTTP(w, req)
+       return &http.Response{
+               StatusCode: w.Code,
+               Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
+               Header:     w.HeaderMap,
+               Body:       ioutil.NopCloser(w.Body)}, nil
+}
+
+// Close releases resources used by the server.
+func (s *stubServer) Close() {
+       s.srv.Close()
+}
+
+func (s *stubServer) serveStatic(path, data string) *reqTracker {
+       rt := &reqTracker{}
+       s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
+               rt.Add(r)
+               if r.Body != nil {
+                       ioutil.ReadAll(r.Body)
+                       r.Body.Close()
+               }
+               io.WriteString(w, data)
+       })
+       return rt
+}
+
+// serveCurrentUserAdmin makes users/current return an active admin.
+func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
+       const user = `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`
+       return s.serveStatic("/arvados/v1/users/current", user)
+}
+
+// serveCurrentUserNotAdmin makes users/current return an active
+// non-admin user.
+func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
+       const user = `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`
+       return s.serveStatic("/arvados/v1/users/current", user)
+}
+
+// serveDiscoveryDoc serves a minimal discovery document specifying
+// defaultCollectionReplication=2.
+func (s *stubServer) serveDiscoveryDoc() *reqTracker {
+       const doc = `{"defaultCollectionReplication":2}`
+       return s.serveStatic("/discovery/v1/apis/arvados/v1/rest", doc)
+}
+
+// serveZeroCollections makes the collections list always empty.
+func (s *stubServer) serveZeroCollections() *reqTracker {
+       const list = `{"items":[],"items_available":0}`
+       return s.serveStatic("/arvados/v1/collections", list)
+}
+
+func (s *stubServer) serveFooBarFileCollections() *reqTracker {
+       rt := &reqTracker{}
+       s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
+               r.ParseForm()
+               rt.Add(r)
+               if strings.Contains(r.Form.Get("filters"), `modified_at`) {
+                       io.WriteString(w, `{"items_available":0,"items":[]}`)
+               } else {
+                       io.WriteString(w, `{"items_available":2,"items":[
+                               {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
+                               {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
+               }
+       })
+       return rt
+}
+
+func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
+       rt := &reqTracker{}
+       s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
+               r.ParseForm()
+               rt.Add(r)
+               if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
+                       io.WriteString(w, `{"items_available":3,"items":[]}`)
+               } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e="`) {
+                       io.WriteString(w, `{"items_available":0,"items":[]}`)
+               } else {
+                       io.WriteString(w, `{"items_available":2,"items":[
+                               {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
+                               {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
+               }
+       })
+       return rt
+}
+
+// serveZeroKeepServices makes the keep_services list always empty.
+func (s *stubServer) serveZeroKeepServices() *reqTracker {
+       const list = `{"items":[],"items_available":0}`
+       return s.serveStatic("/arvados/v1/keep_services", list)
+}
+
+// serveFourDiskKeepServices lists four "disk" keepstore services
+// (keep0-keep3) plus one "proxy" service.
+func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
+       const path = "/arvados/v1/keep_services"
+       return s.serveStatic(path, `{"items_available":5,"items":[
+               {"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+               {"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+               {"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+               {"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+               {"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
+}
+
+func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
+       rt := &reqTracker{}
+       s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
+               count := rt.Add(r)
+               if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
+                       io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
+               }
+               fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
+       })
+       return rt
+}
+
+// serveKeepstoreTrash accepts trash-list requests with an empty JSON
+// object response.
+func (s *stubServer) serveKeepstoreTrash() *reqTracker {
+       const ok = `{}`
+       return s.serveStatic("/trash", ok)
+}
+
+// serveKeepstorePull accepts pull-list requests with an empty JSON
+// object response.
+func (s *stubServer) serveKeepstorePull() *reqTracker {
+       const ok = `{}`
+       return s.serveStatic("/pull", ok)
+}
+
+// runSuite exercises Balancer.Run and RunForever against stub, an
+// HTTP server that stands in for both the API server and the
+// keepstore servers. config is rebuilt by SetUpTest to point at stub.
+type runSuite struct {
+       stub   stubServer
+       config Config
+}
+
+// testLogWriter is an io.Writer that forwards each write to a
+// check.C's test log, minus one trailing newline.
+type testLogWriter struct {
+       c *check.C
+}
+
+// Write implements io.Writer. It always consumes all of p.
+func (w testLogWriter) Write(p []byte) (int, error) {
+       msg := p
+       if len(msg) > 0 && msg[len(msg)-1] == '\n' {
+               msg = msg[:len(msg)-1]
+       }
+       w.c.Log(string(msg))
+       return len(p), nil
+}
+
+// logger returns a log.Logger that writes to the current test's
+// c.Log().
+//
+// Messages are forwarded synchronously. The previous io.Pipe plus
+// reader-goroutine approach left a goroutine blocked forever after
+// each test, because the pipe was never closed. It could also log
+// after the test had finished and split long messages at its
+// buffer size.
+func (s *runSuite) logger(c *check.C) *log.Logger {
+       return log.New(testLogWriter{c: c}, "", log.LstdFlags)
+}
+
+// SetUpTest starts the stub server and points s.config at it (with a
+// dummy token), selecting only "disk" services for balancing. It then
+// serves the API discovery document and routes stub logging to c.
+func (s *runSuite) SetUpTest(c *check.C) {
+       httpClient := s.stub.Start()
+       var cfg Config
+       cfg.Client = arvados.Client{
+               AuthToken: "xyzzy",
+               APIHost:   "zzzzz.arvadosapi.com",
+               Client:    httpClient,
+       }
+       cfg.KeepServiceTypes = []string{"disk"}
+       s.config = cfg
+       s.stub.serveDiscoveryDoc()
+       s.stub.logf = c.Logf
+}
+
+// TearDownTest closes the stub server started by SetUpTest.
+func (s *runSuite) TearDownTest(c *check.C) {
+       s.stub.Close()
+}
+
+// commitOpts returns RunOptions that commit both pull and trash
+// lists, with Logger writing to the current test's log. If dump is
+// true, Dumper writes there too.
+func (s *runSuite) commitOpts(c *check.C, dump bool) RunOptions {
+       opts := RunOptions{
+               CommitPulls: true,
+               CommitTrash: true,
+               Logger:      s.logger(c),
+       }
+       if dump {
+               opts.Dumper = s.logger(c)
+       }
+       return opts
+}
+
+// An API server reporting zero collections is refused. Each of the
+// four disk services still gets a trash request, but nothing is
+// pulled.
+func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
+       opts := s.commitOpts(c, false)
+       s.stub.serveCurrentUserAdmin()
+       s.stub.serveZeroCollections()
+       s.stub.serveFourDiskKeepServices()
+       s.stub.serveKeepstoreIndexFoo4Bar1()
+       trashReqs := s.stub.serveKeepstoreTrash()
+       pullReqs := s.stub.serveKeepstorePull()
+
+       err := (&Balancer{}).Run(s.config, opts)
+       c.Check(err, check.ErrorMatches, "received zero collections")
+       c.Check(trashReqs.Count(), check.Equals, 4)
+       c.Check(pullReqs.Count(), check.Equals, 0)
+}
+
+// When no service's type is listed in KeepServiceTypes, no index is
+// fetched and no trash list is sent.
+func (s *runSuite) TestServiceTypes(c *check.C) {
+       opts := s.commitOpts(c, false)
+       s.config.KeepServiceTypes = []string{"unlisted-type"}
+       s.stub.serveCurrentUserAdmin()
+       s.stub.serveFooBarFileCollections()
+       s.stub.serveFourDiskKeepServices()
+       indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
+       trashReqs := s.stub.serveKeepstoreTrash()
+
+       err := (&Balancer{}).Run(s.config, opts)
+       c.Check(err, check.IsNil)
+       c.Check(indexReqs.Count(), check.Equals, 0)
+       c.Check(trashReqs.Count(), check.Equals, 0)
+}
+
+// Run refuses to proceed unless the current user is an admin, and
+// sends nothing to keepstore.
+func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
+       opts := s.commitOpts(c, false)
+       s.stub.serveCurrentUserNotAdmin()
+       s.stub.serveZeroCollections()
+       s.stub.serveFourDiskKeepServices()
+       trashReqs := s.stub.serveKeepstoreTrash()
+       pullReqs := s.stub.serveKeepstorePull()
+
+       err := (&Balancer{}).Run(s.config, opts)
+       c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
+       c.Check(trashReqs.Count(), check.Equals, 0)
+       c.Check(pullReqs.Count(), check.Equals, 0)
+}
+
+// If the server's collection count grows past what paging returned,
+// Run reports the discrepancy instead of committing pulls.
+func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
+       opts := s.commitOpts(c, false)
+       s.stub.serveCurrentUserAdmin()
+       s.stub.serveCollectionsButSkipOne()
+       s.stub.serveFourDiskKeepServices()
+       s.stub.serveKeepstoreIndexFoo4Bar1()
+       trashReqs := s.stub.serveKeepstoreTrash()
+       pullReqs := s.stub.serveKeepstorePull()
+
+       err := (&Balancer{}).Run(s.config, opts)
+       c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
+       c.Check(trashReqs.Count(), check.Equals, 4)
+       c.Check(pullReqs.Count(), check.Equals, 0)
+}
+
+// With CommitPulls and CommitTrash left false, changes are computed
+// and counted in the statistics but never sent.
+func (s *runSuite) TestDryRun(c *check.C) {
+       opts := RunOptions{Logger: s.logger(c)}
+       s.stub.serveCurrentUserAdmin()
+       s.stub.serveFooBarFileCollections()
+       s.stub.serveFourDiskKeepServices()
+       s.stub.serveKeepstoreIndexFoo4Bar1()
+       trashReqs := s.stub.serveKeepstoreTrash()
+       pullReqs := s.stub.serveKeepstorePull()
+
+       var bal Balancer
+       c.Check(bal.Run(s.config, opts), check.IsNil)
+       c.Check(trashReqs.Count(), check.Equals, 0)
+       c.Check(pullReqs.Count(), check.Equals, 0)
+       stats := bal.getStatistics()
+       c.Check(stats.pulls, check.Not(check.Equals), 0)
+       c.Check(stats.underrep.replicas, check.Not(check.Equals), 0)
+       c.Check(stats.overrep.replicas, check.Not(check.Equals), 0)
+}
+
+// With both commit options on, pull and trash lists are sent.
+func (s *runSuite) TestCommit(c *check.C) {
+       opts := s.commitOpts(c, true)
+       s.stub.serveCurrentUserAdmin()
+       s.stub.serveFooBarFileCollections()
+       s.stub.serveFourDiskKeepServices()
+       s.stub.serveKeepstoreIndexFoo4Bar1()
+       trashReqs := s.stub.serveKeepstoreTrash()
+       pullReqs := s.stub.serveKeepstorePull()
+
+       var bal Balancer
+       c.Check(bal.Run(s.config, opts), check.IsNil)
+       c.Check(trashReqs.Count(), check.Equals, 8)
+       c.Check(pullReqs.Count(), check.Equals, 4)
+       stats := bal.getStatistics()
+       // "foo" block is overreplicated by 2
+       c.Check(stats.trashes, check.Equals, 2)
+       // "bar" block is underreplicated by 1, and its only copy is
+       // in a poor rendezvous position
+       c.Check(stats.pulls, check.Equals, 2)
+}
+
+// RunForever keeps balancing every RunPeriod until told to stop.
+func (s *runSuite) TestRunForever(c *check.C) {
+       opts := s.commitOpts(c, true)
+       s.stub.serveCurrentUserAdmin()
+       s.stub.serveFooBarFileCollections()
+       s.stub.serveFourDiskKeepServices()
+       s.stub.serveKeepstoreIndexFoo4Bar1()
+       trashReqs := s.stub.serveKeepstoreTrash()
+       pullReqs := s.stub.serveKeepstorePull()
+
+       stop := make(chan interface{})
+       s.config.RunPeriod = arvados.Duration(time.Millisecond)
+       go RunForever(s.config, opts, stop)
+
+       // Each run sends 4 trash-list clears, 4 pull lists, and 4
+       // trash lists. Four runs should finish well within a
+       // second; give up after ten.
+       const wantPulls = 16
+       deadline := time.Now().Add(10 * time.Second)
+       for pullReqs.Count() < wantPulls && time.Now().Before(deadline) {
+               time.Sleep(time.Millisecond)
+       }
+       stop <- true
+       c.Check(pullReqs.Count() >= wantPulls, check.Equals, true)
+       c.Check(trashReqs.Count(), check.Equals, 2*pullReqs.Count())
+}
diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
new file mode 100644 (file)
index 0000000..682a5fb
--- /dev/null
@@ -0,0 +1,255 @@
+package main
+
+import (
+       "crypto/md5"
+       "fmt"
+       "sort"
+       "strconv"
+       "testing"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+
+       check "gopkg.in/check.v1"
+)
+
+// Test hooks gocheck into "go test": check.TestingT runs every suite
+// registered with check.Suite in this package.
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&balancerSuite{})
+
+// balancerSuite unit-tests the embedded Balancer's balanceBlock, one
+// block at a time, against 16 fake keep services (see SetUpTest).
+//
+// srvs holds the fake services, indexed by server number.
+// knownRendezvous[k] lists server numbers in rendezvous probe order
+// (best first) for knownBlkid(k). signatureTTL, in seconds, is used
+// by SetUpTest to compute MinMtime. NOTE(review): blks is not used
+// in this file; confirm whether it is still needed.
+type balancerSuite struct {
+       Balancer
+       srvs            []*KeepService
+       blks            map[string]tester
+       knownRendezvous [][]int
+       signatureTTL    int64
+}
+
+const (
+       // index into knownRendezvous
+       known0 = 0
+)
+
+// slots lists positions in a block's rendezvous probe order; 0 is the
+// best-ranked server.
+type slots []int
+
+// tester describes one balanceBlock scenario for try:
+//   - known: which block (an index into knownRendezvous).
+//   - desired: the desired replication.
+//   - current: the slots currently holding replicas.
+//   - timestamps: optional mtimes for those replicas, in order.
+//   - shouldPull, shouldTrash: the slots expected to receive pull
+//     and trash requests.
+type tester struct {
+       known       int
+       desired     int
+       current     slots
+       timestamps  []int64
+       shouldPull  slots
+       shouldTrash slots
+}
+
+// SetUpSuite loads the expected rendezvous orders for knownBlkid(0..3).
+// Each hex digit is a server number, best-ranked first.
+func (bal *balancerSuite) SetUpSuite(c *check.C) {
+       orders := []string{
+               "3eab2d5fc9681074",
+               "097dba52e648f1c3",
+               "c5b4e023f8a7d691",
+               "9d81c02e76a3bf54",
+       }
+       bal.knownRendezvous = nil
+       for _, order := range orders {
+               var probe []int
+               for i := 0; i < len(order); i++ {
+                       digit, _ := strconv.ParseUint(order[i:i+1], 16, 4)
+                       probe = append(probe, int(digit))
+               }
+               bal.knownRendezvous = append(bal.knownRendezvous, probe)
+       }
+       bal.signatureTTL = 3600
+}
+
+// SetUpTest creates 16 fresh KeepServices with UUIDs
+// zzzzz-bi6l4-000000000000000 through ...00f and registers them with
+// the embedded Balancer. It also sets MinMtime to signatureTTL seconds
+// before now.
+func (bal *balancerSuite) SetUpTest(c *check.C) {
+       const numServers = 16
+       bal.srvs = make([]*KeepService, numServers)
+       bal.KeepServices = make(map[string]*KeepService)
+       for i := 0; i < numServers; i++ {
+               uuid := fmt.Sprintf("zzzzz-bi6l4-%015x", i)
+               srv := &KeepService{KeepService: arvados.KeepService{UUID: uuid}}
+               bal.srvs[i] = srv
+               bal.KeepServices[uuid] = srv
+       }
+       bal.MinMtime = time.Now().Unix() - bal.signatureTTL
+}
+
+func (bal *balancerSuite) TestPerfect(c *check.C) {
+       bal.try(c, tester{
+               desired:     2,
+               current:     slots{0, 1},
+               shouldPull:  nil,
+               shouldTrash: nil})
+}
+
+func (bal *balancerSuite) TestDecreaseRepl(c *check.C) {
+       bal.try(c, tester{
+               desired:     2,
+               current:     slots{0, 2, 1},
+               shouldTrash: slots{2}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplToZero(c *check.C) {
+       bal.try(c, tester{
+               desired:     0,
+               current:     slots{0, 1, 3},
+               shouldTrash: slots{0, 1, 3}})
+}
+
+func (bal *balancerSuite) TestIncreaseRepl(c *check.C) {
+       bal.try(c, tester{
+               desired:    4,
+               current:    slots{0, 1},
+               shouldPull: slots{2, 3}})
+}
+
+func (bal *balancerSuite) TestSkipReadonly(c *check.C) {
+       bal.srvList(0, slots{3})[0].ReadOnly = true
+       bal.try(c, tester{
+               desired:    4,
+               current:    slots{0, 1},
+               shouldPull: slots{2, 4}})
+}
+
+func (bal *balancerSuite) TestFixUnbalanced(c *check.C) {
+       bal.try(c, tester{
+               desired:    2,
+               current:    slots{2, 0},
+               shouldPull: slots{1}})
+       bal.try(c, tester{
+               desired:    2,
+               current:    slots{2, 7},
+               shouldPull: slots{0, 1}})
+       // if only one of the pulls succeeds, we'll see this next:
+       bal.try(c, tester{
+               desired:     2,
+               current:     slots{2, 1, 7},
+               shouldPull:  slots{0},
+               shouldTrash: slots{7}})
+       // if both pulls succeed, we'll see this next:
+       bal.try(c, tester{
+               desired:     2,
+               current:     slots{2, 0, 1, 7},
+               shouldTrash: slots{2, 7}})
+
+       // unbalanced + excessive replication => pull + trash
+       bal.try(c, tester{
+               desired:     2,
+               current:     slots{2, 5, 7},
+               shouldPull:  slots{0, 1},
+               shouldTrash: slots{7}})
+}
+
+func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) {
+       // For purposes of increasing replication, we assume identical
+       // replicas are distinct.
+       bal.try(c, tester{
+               desired:    4,
+               current:    slots{0, 1},
+               timestamps: []int64{12345678, 12345678},
+               shouldPull: slots{2, 3}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplTimestampCollision(c *check.C) {
+       // For purposes of decreasing replication, we assume identical
+       // replicas are NOT distinct.
+       bal.try(c, tester{
+               desired:    2,
+               current:    slots{0, 1, 2},
+               timestamps: []int64{12345678, 12345678, 12345678}})
+       bal.try(c, tester{
+               desired:    2,
+               current:    slots{0, 1, 2},
+               timestamps: []int64{12345678, 10000000, 10000000}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
+       oldTime := bal.MinMtime - 3600
+       newTime := bal.MinMtime + 3600
+       // The excess replica is too new to delete.
+       bal.try(c, tester{
+               desired:    2,
+               current:    slots{0, 1, 2},
+               timestamps: []int64{oldTime, newTime, newTime + 1}})
+       // The best replicas are too new to delete, but the excess
+       // replica is old enough.
+       bal.try(c, tester{
+               desired:     2,
+               current:     slots{0, 1, 2},
+               timestamps:  []int64{newTime, newTime + 1, oldTime},
+               shouldTrash: slots{2}})
+}
+
+// try resets every server's ChangeSet, balances the single block
+// described by t, and checks that exactly the expected pulls and
+// trashes were queued for it. Servers are identified by their slot
+// (position in the block's rendezvous probe order).
+func (bal *balancerSuite) try(c *check.C, t tester) {
+       bal.setupServiceRoots()
+       blkid := knownBlkid(t.known)
+       blk := &BlockState{
+               Desired:  t.desired,
+               Replicas: bal.replList(t.known, t.current),
+       }
+       // Optional per-replica mtimes override replList's defaults.
+       for i, mtime := range t.timestamps {
+               blk.Replicas[i].Mtime = mtime
+       }
+       for _, srv := range bal.srvs {
+               srv.ChangeSet = &ChangeSet{}
+       }
+       bal.balanceBlock(blkid, blk)
+
+       // slotOf maps a server number to its probe position.
+       slotOf := make(map[int]int)
+       for probeOrder, srvNum := range bal.knownRendezvous[t.known] {
+               slotOf[srvNum] = probeOrder
+       }
+
+       var didPull, didTrash slots
+       for srvNum, srv := range bal.srvs {
+               slot := slotOf[srvNum]
+               for _, pull := range srv.Pulls {
+                       didPull = append(didPull, slot)
+                       c.Check(pull.SizedDigest, check.Equals, blkid)
+               }
+               for _, trash := range srv.Trashes {
+                       didTrash = append(didTrash, slot)
+                       c.Check(trash.SizedDigest, check.Equals, blkid)
+               }
+       }
+
+       sort.Ints(didPull)
+       sort.Ints(didTrash)
+       sort.Ints(t.shouldPull)
+       sort.Ints(t.shouldTrash)
+       c.Check(didPull, check.DeepEquals, t.shouldPull)
+       c.Check(didTrash, check.DeepEquals, t.shouldTrash)
+}
+
+// srvList returns the KeepServices, sorted in rendezvous order and
+// then selected by idx. For example, srvList(3, 0, 1, 4) returns the
+// the first-, second-, and fifth-best servers for storing
+// bal.knownBlkid(3).
+func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepService) {
+       for _, i := range order {
+               srvs = append(srvs, bal.srvs[bal.knownRendezvous[knownBlockID][i]])
+       }
+       return
+}
+
+// replList is like srvList but returns an "existing replicas" slice,
+// suitable for a BlockState test fixture.
+func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
+       mtime := time.Now().Unix() - bal.signatureTTL - 86400
+       for _, srv := range bal.srvList(knownBlockID, order) {
+               repls = append(repls, Replica{srv, mtime})
+               mtime++
+       }
+       return
+}
+
+// generate the same data hashes that are tested in
+// sdk/go/keepclient/root_sorter_test.go
+func knownBlkid(i int) arvados.SizedDigest {
+       return arvados.SizedDigest(fmt.Sprintf("%x+64", md5.Sum([]byte(fmt.Sprintf("%064x", i)))))
+}
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
new file mode 100644 (file)
index 0000000..d607386
--- /dev/null
@@ -0,0 +1,95 @@
+package main
+
+import (
+       "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Replica is a file on disk (or object in an S3 bucket, or blob in an
+// Azure storage container, etc.) as reported in a keepstore index
+// response.
+//
+// KeepService is the server whose index reported the replica. Mtime
+// is the timestamp from that index entry (presumably Unix seconds:
+// the tests compare it with values derived from time.Now().Unix()).
+// Field order matters: some callers use unkeyed Replica{srv, mtime}
+// literals.
+type Replica struct {
+       *KeepService
+       Mtime int64
+}
+
+// BlockState indicates the number of desired replicas (according to
+// the collections we know about) and the replicas actually stored
+// (according to the keepstore indexes we know about).
+//
+// Desired only ever grows: increaseDesired keeps the largest value it
+// has been given.
+type BlockState struct {
+       Replicas []Replica
+       Desired  int
+}
+
+func (bs *BlockState) addReplica(r Replica) {
+       bs.Replicas = append(bs.Replicas, r)
+}
+
+func (bs *BlockState) increaseDesired(n int) {
+       if bs.Desired < n {
+               bs.Desired = n
+       }
+}
+
+// BlockStateMap is a goroutine-safe wrapper around a
+// map[arvados.SizedDigest]*BlockState.
+//
+// Create one with NewBlockStateMap. The zero value has a nil entries
+// map, and adding a block to it would panic. entries is only
+// accessed while holding mutex.
+type BlockStateMap struct {
+       entries map[arvados.SizedDigest]*BlockState
+       mutex   sync.Mutex
+}
+
+// NewBlockStateMap returns a newly allocated BlockStateMap.
+func NewBlockStateMap() *BlockStateMap {
+       return &BlockStateMap{
+               entries: make(map[arvados.SizedDigest]*BlockState),
+       }
+}
+
+// return a BlockState entry, allocating a new one if needed. (Private
+// method: not goroutine-safe.)
+func (bsm *BlockStateMap) get(blkid arvados.SizedDigest) *BlockState {
+       // TODO? Allocate BlockState structs a slice at a time,
+       // instead of one at a time.
+       blk := bsm.entries[blkid]
+       if blk == nil {
+               blk = &BlockState{}
+               bsm.entries[blkid] = blk
+       }
+       return blk
+}
+
+// Apply runs f on each entry in the map.
+func (bsm *BlockStateMap) Apply(f func(arvados.SizedDigest, *BlockState)) {
+       bsm.mutex.Lock()
+       defer bsm.mutex.Unlock()
+
+       for blkid, blk := range bsm.entries {
+               f(blkid, blk)
+       }
+}
+
+// AddReplicas updates the map to indicate srv has a replica of each
+// block in idx.
+func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []arvados.KeepServiceIndexEntry) {
+       bsm.mutex.Lock()
+       defer bsm.mutex.Unlock()
+
+       for _, ent := range idx {
+               bsm.get(ent.SizedDigest).addReplica(Replica{
+                       KeepService: srv,
+                       Mtime:       ent.Mtime,
+               })
+       }
+}
+
+// IncreaseDesired updates the map to indicate the desired replication
+// for the given blocks is at least n.
+func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []arvados.SizedDigest) {
+       bsm.mutex.Lock()
+       defer bsm.mutex.Unlock()
+
+       for _, blkid := range blocks {
+               bsm.get(blkid).increaseDesired(n)
+       }
+}
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
new file mode 100644 (file)
index 0000000..417ea7f
--- /dev/null
@@ -0,0 +1,75 @@
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Pull is a request to retrieve a block from a remote server, and
+// store it locally.
+type Pull struct {
+       arvados.SizedDigest
+       Source *KeepService
+}
+
+// MarshalJSON formats a pull request the way keepstore wants to see
+// it. "locator" is the bare hash (the first 32 characters of the
+// digest, without the +size hint), and "servers" holds Source's base
+// URL.
+func (p Pull) MarshalJSON() ([]byte, error) {
+       type KeepstorePullRequest struct {
+               Locator string   `json:"locator"`
+               Servers []string `json:"servers"`
+       }
+       var req KeepstorePullRequest
+       req.Locator = string(p.SizedDigest[:32])
+       req.Servers = []string{p.Source.URLBase()}
+       return json.Marshal(req)
+}
+
+// Trash is a request to delete a block.
+type Trash struct {
+       arvados.SizedDigest
+       Mtime int64
+}
+
+// MarshalJSON formats a trash request the way keepstore wants to see
+// it: the bare locator with no +size hint, and t.Mtime as
+// "block_mtime".
+func (t Trash) MarshalJSON() ([]byte, error) {
+       type KeepstoreTrashRequest struct {
+               Locator    string `json:"locator"`
+               BlockMtime int64  `json:"block_mtime"`
+       }
+       var req KeepstoreTrashRequest
+       req.Locator = string(t.SizedDigest[:32])
+       req.BlockMtime = t.Mtime
+       return json.Marshal(req)
+}
+
+// ChangeSet is a set of change requests that will be sent to a
+// keepstore server. AddPull, AddTrash, and String take mutex, so they
+// may be called from multiple goroutines.
+type ChangeSet struct {
+       Pulls   []Pull
+       Trashes []Trash
+       mutex   sync.Mutex
+}
+
+// AddPull appends p to the pull list.
+func (cs *ChangeSet) AddPull(p Pull) {
+       cs.mutex.Lock()
+       defer cs.mutex.Unlock()
+       cs.Pulls = append(cs.Pulls, p)
+}
+
+// AddTrash appends t to the trash list.
+func (cs *ChangeSet) AddTrash(t Trash) {
+       cs.mutex.Lock()
+       defer cs.mutex.Unlock()
+       cs.Trashes = append(cs.Trashes, t)
+}
+
+// String implements fmt.Stringer, summarizing the list sizes, e.g.
+// "ChangeSet{Pulls:1, Trashes:0}".
+func (cs *ChangeSet) String() string {
+       cs.mutex.Lock()
+       pulls, trashes := len(cs.Pulls), len(cs.Trashes)
+       cs.mutex.Unlock()
+       return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", pulls, trashes)
+}
diff --git a/services/keep-balance/change_set_test.go b/services/keep-balance/change_set_test.go
new file mode 100644 (file)
index 0000000..b5dcb5c
--- /dev/null
@@ -0,0 +1,35 @@
+package main
+
+import (
+       "encoding/json"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&changeSetSuite{})
+
+// changeSetSuite checks the JSON wire format of pull and trash
+// requests.
+type changeSetSuite struct{}
+
+// TestJSONFormat checks that Pull and Trash lists marshal to the JSON
+// keepstore expects: bare locators (no +size hint), the pull source's
+// base URL, and the block mtime.
+func (s *changeSetSuite) TestJSONFormat(c *check.C) {
+       const locator = "acbd18db4cc2f85cedef654fccc4a4d8+3"
+       srv := &KeepService{
+               KeepService: arvados.KeepService{
+                       UUID:           "zzzzz-bi6l4-000000000000001",
+                       ServiceType:    "disk",
+                       ServiceSSLFlag: false,
+                       ServiceHost:    "keep1.zzzzz.arvadosapi.com",
+                       ServicePort:    25107,
+               },
+       }
+
+       for _, tc := range []struct {
+               value interface{}
+               want  string
+       }{
+               {
+                       []Pull{{SizedDigest: arvados.SizedDigest(locator), Source: srv}},
+                       `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","servers":["http://keep1.zzzzz.arvadosapi.com:25107"]}]`,
+               },
+               {
+                       []Trash{{SizedDigest: arvados.SizedDigest(locator), Mtime: 123456789}},
+                       `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","block_mtime":123456789}]`,
+               },
+       } {
+               buf, err := json.Marshal(tc.value)
+               c.Check(err, check.IsNil)
+               c.Check(string(buf), check.Equals, tc.want)
+       }
+}
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
new file mode 100644 (file)
index 0000000..e6a1f08
--- /dev/null
@@ -0,0 +1,95 @@
+package main
+
+import (
+       "fmt"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// countCollections returns the number of collections matching params,
+// as reported in items_available. It overrides params.Limit to zero
+// (on its own copy of params), so the caller's value is not changed.
+func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
+       zero := 0
+       params.Limit = &zero
+       var page arvados.CollectionList
+       if err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params); err != nil {
+               return page.ItemsAvailable, err
+       }
+       return page.ItemsAvailable, nil
+}
+
+// EachCollection calls f once for every readable
+// collection. EachCollection stops if it encounters an error, such as
+// f returning a non-nil error.
+//
+// The progress function is called periodically with done (number of
+// times f has been called) and total (number of times f is expected
+// to be called).
+//
+// Collections are fetched in pages ordered by (modified_at, uuid).
+// After paging finishes, EachCollection counts the collections with
+// modified_at <= the last timestamp seen. If that count exceeds the
+// number visited (i.e., some were missed), it returns an error.
+func EachCollection(c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
+       if progress == nil {
+               progress = func(_, _ int) {}
+       }
+
+       expectCount, err := countCollections(c, arvados.ResourceListParams{})
+       if err != nil {
+               return err
+       }
+
+       limit := 1000
+       params := arvados.ResourceListParams{
+               Limit:  &limit,
+               Order:  "modified_at, uuid",
+               Select: []string{"uuid", "manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
+       }
+       var last arvados.Collection
+       var filterTime time.Time
+       callCount := 0
+       for {
+               progress(callCount, expectCount)
+               var page arvados.CollectionList
+               err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+               if err != nil {
+                       return err
+               }
+               for _, coll := range page.Items {
+                       if coll.ModifiedAt == nil {
+                               // Paging relies on modified_at. Without it we
+                               // would panic below, or silently stop early.
+                               return fmt.Errorf("collection %s has no modified_at timestamp", coll.UUID)
+                       }
+                       // The ">=" filter re-fetches collections sharing
+                       // last's timestamp; skip those ordered at or
+                       // before last. Use Equal rather than ==, which
+                       // also compares Locations, and those can differ
+                       // between decoded values for the same instant.
+                       if last.ModifiedAt != nil && last.ModifiedAt.Equal(*coll.ModifiedAt) && last.UUID >= coll.UUID {
+                               continue
+                       }
+                       callCount++
+                       err = f(coll)
+                       if err != nil {
+                               return err
+                       }
+                       last = coll
+               }
+               if last.ModifiedAt == nil || last.ModifiedAt.Equal(filterTime) {
+                       // The timestamp did not advance, so another
+                       // query would return the same results.
+                       if page.ItemsAvailable > len(page.Items) {
+                               // TODO: use "mtime=X && UUID>Y"
+                               // filters to get all collections with
+                               // this timestamp, then use "mtime>X"
+                               // to get the next timestamp.
+                               return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
+                       }
+                       break
+               }
+               filterTime = *last.ModifiedAt
+               params.Filters = []arvados.Filter{{
+                       Attr:     "modified_at",
+                       Operator: ">=",
+                       Operand:  filterTime,
+               }, {
+                       Attr:     "uuid",
+                       Operator: "!=",
+                       Operand:  last.UUID,
+               }}
+       }
+       progress(callCount, expectCount)
+
+       checkCount, err := countCollections(c, arvados.ResourceListParams{
+               Filters: []arvados.Filter{{
+                       Attr:     "modified_at",
+                       Operator: "<=",
+                       Operand:  filterTime,
+               }},
+       })
+       if err != nil {
+               return err
+       }
+       if callCount < checkCount {
+               return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+       }
+
+       return nil
+}
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
new file mode 100644 (file)
index 0000000..b090614
--- /dev/null
@@ -0,0 +1,92 @@
+package main
+
+import (
+       "bytes"
+       "log"
+       "net/http"
+       "os"
+       "strings"
+       "testing"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&integrationSuite{})
+
+// integrationSuite runs keep-balance against an API server and
+// keepstore servers started via arvadostest.
+//
+// config is the Balancer configuration rebuilt by SetUpTest.
+// keepClient is used by SetUpSuite to store test data.
+type integrationSuite struct {
+       config     Config
+       keepClient *keepclient.KeepClient
+}
+
+func (s *integrationSuite) SetUpSuite(c *check.C) {
+       if testing.Short() {
+               c.Skip("-short")
+       }
+       arvadostest.ResetEnv()
+       arvadostest.StartAPI()
+       arvadostest.StartKeep(4, true)
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       arv.ApiToken = arvadostest.DataManagerToken
+       c.Assert(err, check.IsNil)
+       s.keepClient = &keepclient.KeepClient{
+               Arvados: &arv,
+               Client:  &http.Client{},
+       }
+       c.Assert(s.keepClient.DiscoverKeepServers(), check.IsNil)
+       s.putReplicas(c, "foo", 4)
+       s.putReplicas(c, "bar", 1)
+}
+
+func (s *integrationSuite) putReplicas(c *check.C, data string, replicas int) {
+       s.keepClient.Want_replicas = replicas
+       _, _, err := s.keepClient.PutB([]byte(data))
+       c.Assert(err, check.IsNil)
+}
+
+func (s *integrationSuite) TearDownSuite(c *check.C) {
+       if testing.Short() {
+               c.Skip("-short")
+       }
+       arvadostest.StopKeep(4)
+       arvadostest.StopAPI()
+}
+
+func (s *integrationSuite) SetUpTest(c *check.C) {
+       s.config = Config{
+               Client: arvados.Client{
+                       APIHost:   os.Getenv("ARVADOS_API_HOST"),
+                       AuthToken: arvadostest.DataManagerToken,
+                       Insecure:  true,
+               },
+               KeepServiceTypes: []string{"disk"},
+       }
+}
+
+// TestBalanceAPIFixtures runs the balancer repeatedly against the test
+// cluster. The first run must schedule one pull (presumably for "bar",
+// stored with a single replica in SetUpSuite) and no trash. Later
+// runs repeat, up to 20 times, until no logged ChangeSet has pending
+// pulls.
+func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
+       var logBuf *bytes.Buffer
+       for iter := 0; iter < 20; iter++ {
+               // Assign rather than redeclare, so the check after the
+               // loop sees the last run's log instead of a nil buffer.
+               logBuf = &bytes.Buffer{}
+               opts := RunOptions{
+                       CommitPulls: true,
+                       CommitTrash: true,
+                       Logger:      log.New(logBuf, "", log.LstdFlags),
+               }
+               err := (&Balancer{}).Run(s.config, opts)
+               c.Check(err, check.IsNil)
+               logged := logBuf.String()
+               if iter == 0 {
+                       c.Check(logged, check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
+                       c.Check(logged, check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
+               } else if strings.Count(logged, "ChangeSet{Pulls:") == strings.Count(logged, "ChangeSet{Pulls:0,") {
+                       // Every server's ChangeSet has zero pulls. (The
+                       // old check stopped as soon as any one server
+                       // reported zero, even if others still had pulls.)
+                       break
+               }
+               time.Sleep(200 * time.Millisecond)
+       }
+       // NOTE(review): the parentheses in this pattern are regexp
+       // groups, not literal characters, so it cannot match a line like
+       // "0 replicas (0 blocks, 0 bytes) underreplicated". Confirm the
+       // intended assertion before escaping them.
+       c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas (0 blocks, 0 bytes) underreplicated.*`)
+}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
new file mode 100644 (file)
index 0000000..f65355d
--- /dev/null
@@ -0,0 +1,76 @@
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "net/http"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// KeepService represents a keepstore server that is being rebalanced:
+// its API record (arvados.KeepService) plus the ChangeSet of pull and
+// trash requests queued for it.
+type KeepService struct {
+       arvados.KeepService
+       *ChangeSet
+}
+
+// String implements fmt.Stringer, producing e.g.
+// "zzzzz-bi6l4-000000000000001 (keep1.zzzzz.arvadosapi.com:25107, disk)".
+func (srv *KeepService) String() string {
+       return fmt.Sprintf("%s (%s:%d, %s)",
+               srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
+}
+
+// ksSchemes maps a service's SSL flag to its URL scheme.
+var ksSchemes = map[bool]string{false: "http", true: "https"}
+
+// URLBase returns "scheme://host:port" for this server, using https
+// when ServiceSSLFlag is set and http otherwise.
+func (srv *KeepService) URLBase() string {
+       scheme := ksSchemes[srv.ServiceSSLFlag]
+       return fmt.Sprintf("%s://%s:%d", scheme, srv.ServiceHost, srv.ServicePort)
+}
+
+// CommitPulls sends the current list of pull requests to the storage
+// server (even if the list is empty).
+func (srv *KeepService) CommitPulls(c *arvados.Client) error {
+       return srv.put(c, "pull", srv.ChangeSet.Pulls)
+}
+
+// CommitTrash sends the current list of trash requests to the storage
+// server (even if the list is empty).
+func (srv *KeepService) CommitTrash(c *arvados.Client) error {
+       return srv.put(c, "trash", srv.ChangeSet.Trashes)
+}
+
+// Perform a PUT request at path, with data (as JSON) in the request
+// body.
+func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
+       // We'll start a goroutine to do the JSON encoding, so we can
+       // stream it to the http client through a Pipe, rather than
+       // keeping the entire encoded version in memory.
+       jsonR, jsonW := io.Pipe()
+
+       // errC communicates any encoding errors back to our main
+       // goroutine.
+       errC := make(chan error, 1)
+
+       go func() {
+               enc := json.NewEncoder(jsonW)
+               errC <- enc.Encode(data)
+               jsonW.Close()
+       }()
+
+       url := srv.URLBase() + "/" + path
+       req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
+       if err != nil {
+               return fmt.Errorf("building request for %s: %v", url, err)
+       }
+       err = c.DoAndDecode(nil, req)
+
+       // If there was an error encoding the request body, report
+       // that instead of the response: obviously we won't get a
+       // useful response if our request wasn't properly encoded.
+       if encErr := <-errC; encErr != nil {
+               return fmt.Errorf("encoding data for %s: %v", url, encErr)
+       }
+
+       return err
+}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
new file mode 100644 (file)
index 0000000..42a8d63
--- /dev/null
@@ -0,0 +1,156 @@
+package main
+
+import (
+       "encoding/json"
+       "flag"
+       "io/ioutil"
+       "log"
+       "os"
+       "os/signal"
+       "syscall"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Config specifies site configuration, like API credentials and the
+// choice of which servers are to be balanced.
+//
+// Config is loaded from a JSON config file (see usage()). The fields
+// have no struct tags, so their Go names are the JSON keys (e.g.
+// "KeepServiceTypes", "RunPeriod").
+type Config struct {
+       // Arvados API endpoint and credentials.
+       Client arvados.Client
+
+       // List of service types (e.g., "disk") to balance.
+       KeepServiceTypes []string
+
+       // Explicit list of keep services to balance, in the format
+       // produced by "arv keep_service list". When the
+       // -config.KeepServiceList flag is given, main() decodes that
+       // file into this field. Per that flag's help text, if no list is
+       // given, all available services are fetched and filtered by
+       // KeepServiceTypes.
+       KeepServiceList arvados.KeepServiceList
+
+       // Interval between the start times of successive scan/balance
+       // operations when not running with -once. RunForever uses it
+       // as its ticker period.
+       RunPeriod arvados.Duration
+}
+
+// RunOptions controls runtime behavior. The flags/options that belong
+// here are the ones that are useful for interactive use. For example,
+// "CommitTrash" is a runtime option rather than a config item because
+// it invokes a troubleshooting feature rather than expressing how
+// balancing is meant to be done at a given site.
+//
+// Once, CommitPulls, CommitTrash and Dumper are set from command line
+// flags in main(). Logger is not set by a flag.
+type RunOptions struct {
+       // Balance once and then exit (-once).
+       Once        bool
+       // Send pull requests to the keep services (-commit-pulls).
+       CommitPulls bool
+       // Send trash requests to the keep services (-commit-trash).
+       CommitTrash bool
+       // Destination for progress messages. RunForever substitutes
+       // a stderr logger if this is nil.
+       Logger      *log.Logger
+       // If non-nil, receives per-block details. main() sets it to a
+       // stdout logger when -dump is given.
+       Dumper      *log.Logger
+}
+
+// debugf logs a debug message. It is a no-op by default; main()
+// replaces it with log.Printf when the -debug flag is given.
+var debugf = func(string, ...interface{}) {}
+
+// main parses command-line flags and loads the JSON config file. If
+// -config.KeepServiceList is given, it also loads that keep service
+// list file. It checks the config with CheckConfig, then balances
+// either once (-once) or periodically via RunForever. Any error is
+// fatal.
+func main() {
+       var config Config
+       var runOptions RunOptions
+
+       configPath := flag.String("config", "",
+               "`path` of json configuration file")
+       serviceListPath := flag.String("config.KeepServiceList", "",
+               "`path` of json file with list of keep services to balance, as given by \"arv keep_service list\" "+
+                       "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
+       flag.BoolVar(&runOptions.Once, "once", false,
+               "balance once and then exit")
+       flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
+               "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
+       flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
+               "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+       dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
+       debugFlag := flag.Bool("debug", false, "enable debug messages")
+       flag.Usage = usage
+       flag.Parse()
+
+       if *configPath == "" {
+               log.Fatal("You must specify a config file (see `keep-balance -help`)")
+       }
+       mustReadJSON(&config, *configPath)
+       if *serviceListPath != "" {
+               mustReadJSON(&config.KeepServiceList, *serviceListPath)
+       }
+
+       if *debugFlag {
+               debugf = log.Printf
+               // Log a copy of the config with the API token masked.
+               // Turning on -debug should not write credentials into
+               // the log.
+               logged := config
+               if logged.Client.AuthToken != "" {
+                       logged.Client.AuthToken = "xxxxx"
+               }
+               j, err := json.Marshal(logged)
+               if err != nil {
+                       log.Fatal(err)
+               }
+               log.Printf("config is %s", j)
+       }
+       if *dumpFlag {
+               runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
+       }
+       if err := CheckConfig(config, runOptions); err != nil {
+               log.Fatal(err)
+       }
+
+       var err error
+       if runOptions.Once {
+               err = (&Balancer{}).Run(config, runOptions)
+       } else {
+               err = RunForever(config, runOptions, nil)
+       }
+       if err != nil {
+               log.Fatal(err)
+       }
+}
+
+func mustReadJSON(dst interface{}, path string) {
+       if buf, err := ioutil.ReadFile(path); err != nil {
+               log.Fatalf("Reading %q: %v", path, err)
+       } else if err = json.Unmarshal(buf, dst); err != nil {
+               log.Fatalf("Decoding %q: %v", path, err)
+       }
+}
+
+// RunForever runs forever, or (for testing purposes) until the given
+// stop channel is ready to receive.
+func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
+       if runOptions.Logger == nil {
+               runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
+       }
+       logger := runOptions.Logger
+
+       ticker := time.NewTicker(time.Duration(config.RunPeriod))
+
+       // The unbuffered channel here means we only hear SIGUSR1 if
+       // it arrives while we're waiting in select{}.
+       sigUSR1 := make(chan os.Signal)
+       signal.Notify(sigUSR1, syscall.SIGUSR1)
+
+       logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
+
+       for {
+               if !runOptions.CommitPulls && !runOptions.CommitTrash {
+                       logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
+                       logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
+               }
+
+               err := (&Balancer{}).Run(config, runOptions)
+               if err != nil {
+                       logger.Print("run failed: ", err)
+               } else {
+                       logger.Print("run succeeded")
+               }
+
+               select {
+               case <-stop:
+                       signal.Stop(sigUSR1)
+                       return nil
+               case <-ticker.C:
+                       logger.Print("timer went off")
+               case <-sigUSR1:
+                       logger.Print("received SIGUSR1, resetting timer")
+                       // Reset the timer so we don't start the N+1st
+                       // run too soon after the Nth run is triggered
+                       // by SIGUSR1.
+                       ticker.Stop()
+                       ticker = time.NewTicker(time.Duration(config.RunPeriod))
+               }
+               logger.Print("starting next run")
+       }
+}
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
new file mode 100644 (file)
index 0000000..4a56098
--- /dev/null
@@ -0,0 +1,43 @@
+package main
+
+import (
+       "encoding/json"
+       "time"
+
+       check "gopkg.in/check.v1"
+)
+
+// mainSuite groups the tests for config loading in main.go.
+type mainSuite struct{}
+
+// Register mainSuite with gocheck (gopkg.in/check.v1).
+var _ = check.Suite(&mainSuite{})
+
+// TestExampleJSON checks that the example config printed by usage()
+// decodes into a Config with the expected values.
+func (s *mainSuite) TestExampleJSON(c *check.C) {
+       var cfg Config
+       err := json.Unmarshal(exampleConfigFile, &cfg)
+       c.Check(err, check.IsNil)
+       c.Check(cfg.KeepServiceTypes, check.DeepEquals, []string{"disk"})
+       c.Check(cfg.Client.AuthToken, check.Equals, "xyzzy")
+       wantPeriod := 600 * time.Second
+       c.Check(time.Duration(cfg.RunPeriod), check.Equals, wantPeriod)
+}
+
+// TestConfigJSONWithKeepServiceList checks that a config file can
+// embed an explicit KeepServiceList alongside the client settings.
+func (s *mainSuite) TestConfigJSONWithKeepServiceList(c *check.C) {
+       input := []byte(`
+               {
+                   "Client": {
+                       "APIHost": "zzzzz.arvadosapi.com:443",
+                       "AuthToken": "xyzzy",
+                       "Insecure": false
+                   },
+                   "KeepServiceList": {
+                       "items": [
+                           {"uuid":"zzzzz-bi64l-abcdefghijklmno", "service_type":"disk", "service_host":"a.zzzzz.arvadosapi.com", "service_port":12345},
+                           {"uuid":"zzzzz-bi64l-bcdefghijklmnop", "service_type":"blob", "service_host":"b.zzzzz.arvadosapi.com", "service_port":12345}
+                       ]
+                   },
+                   "RunPeriod": "600s"
+               }`)
+       var config Config
+       c.Check(json.Unmarshal(input, &config), check.IsNil)
+       items := config.KeepServiceList.Items
+       c.Assert(len(items), check.Equals, 2)
+       c.Check(items[0].UUID, check.Equals, "zzzzz-bi64l-abcdefghijklmno")
+       c.Check(items[0].ServicePort, check.Equals, 12345)
+       c.Check(config.Client.AuthToken, check.Equals, "xyzzy")
+}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
new file mode 100644 (file)
index 0000000..e5f16b7
--- /dev/null
@@ -0,0 +1,14 @@
+package main
+
+import (
+       "log"
+       "time"
+)
+
+func timeMe(logger *log.Logger, label string) func() {
+       t0 := time.Now()
+       logger.Printf("%s: start", label)
+       return func() {
+               logger.Printf("%s: took %v", label, time.Since(t0))
+       }
+}
diff --git a/services/keep-balance/usage.go b/services/keep-balance/usage.go
new file mode 100644 (file)
index 0000000..eb9990c
--- /dev/null
@@ -0,0 +1,83 @@
+package main
+
+import (
+       "flag"
+       "fmt"
+       "os"
+)
+
+// exampleConfigFile is a sample JSON config file. usage() prints it
+// in the help text, and main_test.go decodes it to check that it
+// matches the Config struct.
+var exampleConfigFile = []byte(`
+    {
+       "Client": {
+           "APIHost": "zzzzz.arvadosapi.com:443",
+           "AuthToken": "xyzzy",
+           "Insecure": false
+       },
+       "KeepServiceTypes": [
+           "disk"
+       ],
+       "RunPeriod": "600s"
+    }`)
+
+func usage() {
+       fmt.Fprintf(os.Stderr, `
+
+keep-balance rebalances a set of keepstore servers. It creates new
+copies of underreplicated blocks, deletes excess copies of
+overreplicated and unreferenced blocks, and moves blocks to better
+positions (according to the rendezvous hash algorithm) so clients find
+them faster.
+
+Usage: keep-balance -config path/to/config.json [options]
+
+Options:
+`)
+       flag.PrintDefaults()
+       fmt.Fprintf(os.Stderr, `
+Example config file:
+%s
+
+    Client.AuthToken must be recognized by Arvados as an admin token,
+    and must be recognized by all Keep services as a "data manager
+    key".
+
+    Client.Insecure should be true if your Arvados API endpoint uses
+    an unverifiable SSL/TLS certificate.
+
+Periodic scanning:
+
+    By default, keep-balance operates periodically, i.e.: do a
+    scan/balance operation, sleep, repeat.
+
+    RunPeriod determines the interval between start times of
+    successive scan/balance operations. If a scan/balance operation
+    takes longer than RunPeriod, the next one will follow it
+    immediately.
+
+    If SIGUSR1 is received during an idle period between operations,
+    the next operation will start immediately.
+
+One-time scanning:
+
+    Use the -once flag to do a single operation and then exit. The
+    exit code will be zero if the operation was successful.
+
+Committing:
+
+    By default, keep-service computes and reports changes but does not
+    implement them by sending pull and trash lists to the Keep
+    services.
+
+    Use the -commit-pull and -commit-trash flags to implement the
+    computed changes.
+
+Limitations:
+
+    keep-balance does not attempt to discover whether committed pull
+    and trash requests ever get carried out -- only that they are
+    accepted by the Keep services. If some services are full, new
+    copies of underreplicated blocks might never get made, only
+    repeatedly requested.
+
+`, exampleConfigFile)
+}
index 80d867010568090592283e14a8bff28a35b4dda4..d7da67c348d8b7f4e505ecda5010fd5ca802440d 100644 (file)
@@ -4,6 +4,7 @@ import (
        "bytes"
        "flag"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "io/ioutil"
@@ -331,8 +332,12 @@ func main() {
        }
 
        // Initialize Pull queue and worker
+       arv, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               log.Fatalf("MakeArvadosClient: %s", err)
+       }
        keepClient := &keepclient.KeepClient{
-               Arvados:       nil,
+               Arvados:       &arv,
                Want_replicas: 1,
                Client:        &http.Client{},
        }