10276: Remove data manager (superseded by keep-balance).
author Tom Clegg <tom@curoverse.com>
Fri, 14 Oct 2016 15:04:23 +0000 (11:04 -0400)
committer Tom Clegg <tom@curoverse.com>
Fri, 9 Dec 2016 21:24:37 +0000 (16:24 -0500)
24 files changed:
build/run-build-packages-one-target.sh
build/run-build-packages.sh
build/run-tests.sh
sdk/go/logger/logger.go [deleted file]
sdk/go/logger/util.go [deleted file]
sdk/go/util/util.go [deleted file]
services/datamanager/collection/collection.go [deleted file]
services/datamanager/collection/collection_test.go [deleted file]
services/datamanager/collection/testing.go [deleted file]
services/datamanager/datamanager.go [deleted file]
services/datamanager/datamanager_test.go [deleted file]
services/datamanager/experimental/datamanager.py [deleted file]
services/datamanager/experimental/datamanager_test.py [deleted file]
services/datamanager/keep/keep.go [deleted file]
services/datamanager/keep/keep_test.go [deleted file]
services/datamanager/loggerutil/loggerutil.go [deleted file]
services/datamanager/summary/canonical_string.go [deleted file]
services/datamanager/summary/file.go [deleted file]
services/datamanager/summary/pull_list.go [deleted file]
services/datamanager/summary/pull_list_test.go [deleted file]
services/datamanager/summary/summary.go [deleted file]
services/datamanager/summary/summary_test.go [deleted file]
services/datamanager/summary/trash_list.go [deleted file]
services/datamanager/summary/trash_list_test.go [deleted file]

index 60250c9d50b4b07befa49903645ae00cd0b8a4d6..16c7129d9b8efc0747da64f394366680f1ce4fc6 100755 (executable)
@@ -127,7 +127,6 @@ popd
 
 if test -z "$packages" ; then
     packages="arvados-api-server
-        arvados-data-manager
         arvados-docker-cleaner
         arvados-git-httpd
         arvados-node-manager
index 26300c0fc2f2e23843b7ca9858cc4da410ddcfb4..0f10d26a9de37049c5a4ef7f1e1aed99e41db037 100755 (executable)
@@ -424,8 +424,6 @@ package_go_binary services/crunch-run crunch-run \
     "Supervise a single Crunch container"
 package_go_binary services/crunchstat crunchstat \
     "Gather cpu/memory/network statistics of running Crunch jobs"
-package_go_binary services/datamanager arvados-data-manager \
-    "Ensure block replication levels, report disk usage, and determine which blocks should be deleted when space is needed"
 package_go_binary services/keep-balance keep-balance \
     "Rebalance and garbage-collect data blocks stored in Arvados Keep"
 package_go_binary services/keepproxy keepproxy \
index d08568cde58b97bdede5b430eda4f0b788f4e3ce..560a6933e8afe63e48f963dfa8c990a2da2b9897 100755 (executable)
@@ -764,10 +764,6 @@ gostuff=(
     sdk/go/keepclient
     services/keep-balance
     services/keepproxy
-    services/datamanager/summary
-    services/datamanager/collection
-    services/datamanager/keep
-    services/datamanager
     services/crunch-dispatch-local
     services/crunch-dispatch-slurm
     services/crunch-run
diff --git a/sdk/go/logger/logger.go b/sdk/go/logger/logger.go
deleted file mode 100644 (file)
index 6dd7fb3..0000000
+++ /dev/null
@@ -1,204 +0,0 @@
-// Logger periodically writes a log to the Arvados SDK.
-//
-// This package is useful for maintaining a log object that is updated
-// over time. This log object will be periodically written to the log,
-// as specified by WriteInterval in the Params.
-//
-// This package is safe for concurrent use as long as:
-// The maps passed to a LogMutator are not accessed outside of the
-// LogMutator
-//
-// Usage:
-// arvLogger := logger.NewLogger(params)
-// arvLogger.Update(func(properties map[string]interface{},
-//     entry map[string]interface{}) {
-//   // Modify properties and entry however you want
-//   // properties is a shortcut for entry["properties"].(map[string]interface{})
-//   // properties can take any (valid) values you want to give it,
-//   // entry will only take the fields listed at
-//   // http://doc.arvados.org/api/schema/Log.html
-//   // Valid values for properties are anything that can be json
-//   // encoded (i.e. will not error if you call json.Marshal() on it).
-// })
-package logger
-
-import (
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "log"
-       "time"
-)
-
// Suffixes appended to LoggerParams.EventTypePrefix to build the
// event_type of each written entry: the first write uses startSuffix,
// later periodic writes use partialSuffix, and the write made by
// FinalUpdate uses finalSuffix.
const (
	startSuffix   = "-start"
	partialSuffix = "-partial"
	finalSuffix   = "-final"
)

// numberNoMoreWorkMessages is how many values are sent on
// Logger.noMoreWork after the final write: one lets FinalUpdate return
// and one stops the worker loop.
const numberNoMoreWorkMessages = 2
-
-type LoggerParams struct {
-       Client          *arvadosclient.ArvadosClient // The client we use to write log entries
-       EventTypePrefix string                       // The prefix we use for the event type in the log entry
-       WriteInterval   time.Duration                // Wait at least this long between log writes
-}
-
// LogMutator modifies a log entry in place.
//
// The first argument is the entry's properties map, a shortcut for
// entry["properties"].(map[string]interface{}); it may hold any values.
// The second is the entry itself, which should only hold the fields
// listed at http://doc.arvados.org/api/schema/Log.html.
//
// Both maps may only be touched inside the mutator. Never store them
// elsewhere, or they risk being accessed concurrently.
type LogMutator func(properties, entry map[string]interface{})
-
-// A Logger is used to build up a log entry over time and write every
-// version of it.
-type Logger struct {
-       // The data we write
-       data       map[string]interface{} // The entire map that we give to the api
-       entry      map[string]interface{} // Convenience shortcut into data
-       properties map[string]interface{} // Convenience shortcut into data
-
-       params LoggerParams // Parameters we were given
-
-       // Variables to coordinate updating and writing.
-       modified    bool            // Has this data been modified since the last write?
-       workToDo    chan LogMutator // Work to do in the worker thread.
-       writeTicker *time.Ticker    // On each tick we write the log data to arvados, if it has been modified.
-       hasWritten  bool            // Whether we've written at all yet.
-       noMoreWork  chan bool       // Signals that we're done writing.
-
-       writeHooks []LogMutator // Mutators we call before each write.
-}
-
-// Create a new logger based on the specified parameters.
-func NewLogger(params LoggerParams) (l *Logger, err error) {
-       // sanity check parameters
-       if &params.Client == nil {
-               err = fmt.Errorf("Nil arvados client in LoggerParams passed in to NewLogger()")
-               return
-       }
-       if params.EventTypePrefix == "" {
-               err = fmt.Errorf("Empty event type prefix in LoggerParams passed in to NewLogger()")
-               return
-       }
-
-       l = &Logger{
-               data:        make(map[string]interface{}),
-               entry:       make(map[string]interface{}),
-               properties:  make(map[string]interface{}),
-               params:      params,
-               workToDo:    make(chan LogMutator, 10),
-               writeTicker: time.NewTicker(params.WriteInterval),
-               noMoreWork:  make(chan bool, numberNoMoreWorkMessages)}
-
-       l.data["log"] = l.entry
-       l.entry["properties"] = l.properties
-
-       // Start the worker goroutine.
-       go l.work()
-
-       return l, nil
-}
-
-// Exported functions will be called from other goroutines, therefore
-// all they are allowed to do is enqueue work to be done in the worker
-// goroutine.
-
-// Enqueues an update. This will happen in another goroutine after
-// this method returns.
-func (l *Logger) Update(mutator LogMutator) {
-       l.workToDo <- mutator
-}
-
-// Similar to Update(), but writes the log entry as soon as possible
-// (ignoring MinimumWriteInterval) and blocks until the entry has been
-// written. This is useful if you know that you're about to quit
-// (e.g. if you discovered a fatal error, or you're finished), since
-// go will not wait for timers (including the pending write timer) to
-// go off before exiting.
-func (l *Logger) FinalUpdate(mutator LogMutator) {
-       // TODO(misha): Consider not accepting any future updates somehow,
-       // since they won't get written if they come in after this.
-
-       // Stop the periodic write ticker. We'll perform the final write
-       // before returning from this function.
-       l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
-               l.writeTicker.Stop()
-       }
-
-       // Apply the final update
-       l.workToDo <- mutator
-
-       // Perform the final write and signal that we can return.
-       l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
-               l.write(true)
-               for i := 0; i < numberNoMoreWorkMessages; {
-                       l.noMoreWork <- true
-               }
-       }
-
-       // Wait until we've performed the write.
-       <-l.noMoreWork
-}
-
-// Adds a hook which will be called every time this logger writes an entry.
-func (l *Logger) AddWriteHook(hook LogMutator) {
-       // We do the work in a LogMutator so that it happens in the worker
-       // goroutine.
-       l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
-               l.writeHooks = append(l.writeHooks, hook)
-       }
-}
-
-// The worker loop
-func (l *Logger) work() {
-       for {
-               select {
-               case <-l.writeTicker.C:
-                       if l.modified {
-                               l.write(false)
-                               l.modified = false
-                       }
-               case mutator := <-l.workToDo:
-                       mutator(l.properties, l.entry)
-                       l.modified = true
-               case <-l.noMoreWork:
-                       return
-               }
-       }
-}
-
-// Actually writes the log entry.
-func (l *Logger) write(isFinal bool) {
-
-       // Run all our hooks
-       for _, hook := range l.writeHooks {
-               hook(l.properties, l.entry)
-       }
-
-       // Update the event type.
-       if isFinal {
-               l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix
-       } else if l.hasWritten {
-               l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix
-       } else {
-               l.entry["event_type"] = l.params.EventTypePrefix + startSuffix
-       }
-       l.hasWritten = true
-
-       // Write the log entry.
-       // This is a network write and will take a while, which is bad
-       // because we're blocking all the other work on this goroutine.
-       //
-       // TODO(misha): Consider rewriting this so that we can encode l.data
-       // into a string, and then perform the actual write in another
-       // routine. This will be tricky and will require support in the
-       // client.
-       err := l.params.Client.Create("logs", l.data, nil)
-       if err != nil {
-               log.Printf("Received error writing %v: %v", l.data, err)
-       }
-}
diff --git a/sdk/go/logger/util.go b/sdk/go/logger/util.go
deleted file mode 100644 (file)
index 6425aca..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-// Helper methods for interacting with Logger.
-package logger
-
// GetOrCreateMap returns the map stored at parent[key]. If key is absent,
// it first stores a new empty map there. This is handy when building
// nested log properties whose maps may or may not exist yet.
//
// It panics if parent[key] is present but is not a
// map[string]interface{}.
func GetOrCreateMap(
	parent map[string]interface{},
	key string) (child map[string]interface{}) {
	if existing, ok := parent[key]; ok {
		return existing.(map[string]interface{})
	}
	child = make(map[string]interface{})
	parent[key] = child
	return child
}
diff --git a/sdk/go/util/util.go b/sdk/go/util/util.go
deleted file mode 100644 (file)
index ac510de..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-/* Helper methods for dealing with responses from API Server. */
-
-package util
-
-import (
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-)
-
-func UserIsAdmin(arv *arvadosclient.ArvadosClient) (is_admin bool, err error) {
-       type user struct {
-               IsAdmin bool `json:"is_admin"`
-       }
-       var u user
-       err = arv.Call("GET", "users", "", "current", nil, &u)
-       return u.IsAdmin, err
-}
-
-// Returns the total count of a particular type of resource
-//
-//   resource - the arvados resource to count
-// return
-//   count - the number of items of type resource the api server reports, if no error
-//   err - error accessing the resource, or nil if no error
-func NumberItemsAvailable(client *arvadosclient.ArvadosClient, resource string) (count int, err error) {
-       var response struct {
-               ItemsAvailable int `json:"items_available"`
-       }
-       sdkParams := arvadosclient.Dict{"limit": 0}
-       err = client.List(resource, sdkParams, &response)
-       if err == nil {
-               count = response.ItemsAvailable
-       }
-       return
-}
diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
deleted file mode 100644 (file)
index 05e7a5f..0000000
+++ /dev/null
@@ -1,408 +0,0 @@
-// Deals with parsing Collection responses from API Server.
-
-package collection
-
-import (
-       "flag"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/sdk/go/logger"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
-       "git.curoverse.com/arvados.git/sdk/go/util"
-       "log"
-       "os"
-       "runtime/pprof"
-       "time"
-)
-
// HeapProfileFilename is the file WriteHeapProfile writes to. The
// -heap-profile flag sets it; when it is empty, no profile is written.
var HeapProfileFilename string
-
-// Collection representation
-type Collection struct {
-       UUID              string
-       OwnerUUID         string
-       ReplicationLevel  int
-       BlockDigestToSize map[blockdigest.BlockDigest]int
-       TotalSize         int
-}
-
-// ReadCollections holds information about collections from API server
-type ReadCollections struct {
-       ReadAllCollections        bool
-       UUIDToCollection          map[string]Collection
-       OwnerToCollectionSize     map[string]int
-       BlockToDesiredReplication map[blockdigest.DigestWithSize]int
-       CollectionUUIDToIndex     map[string]int
-       CollectionIndexToUUID     []string
-       BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
-}
-
-// GetCollectionsParams params
-type GetCollectionsParams struct {
-       Client    *arvadosclient.ArvadosClient
-       Logger    *logger.Logger
-       BatchSize int
-}
-
// SdkCollectionInfo is one collection record decoded from a collections
// list response.
type SdkCollectionInfo struct {
	UUID               string    `json:"uuid"`
	OwnerUUID          string    `json:"owner_uuid"`
	ReplicationDesired int       `json:"replication_desired"`
	ModifiedAt         time.Time `json:"modified_at"`
	ManifestText       string    `json:"manifest_text"`
}

// SdkCollectionList is one page of a collections list response:
// the collections returned, plus the server's count of all matching
// items.
type SdkCollectionList struct {
	ItemsAvailable int                 `json:"items_available"`
	Items          []SdkCollectionInfo `json:"items"`
}
-
-func init() {
-       flag.StringVar(&HeapProfileFilename,
-               "heap-profile",
-               "",
-               "File to write the heap profiles to. Leave blank to skip profiling.")
-}
-
-// WriteHeapProfile writes the heap profile to a file for later review.
-// Since a file is expected to only contain a single heap profile this
-// function overwrites the previously written profile, so it is safe
-// to call multiple times in a single run.
-// Otherwise we would see cumulative numbers as explained here:
-// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
-func WriteHeapProfile() error {
-       if HeapProfileFilename != "" {
-               heapProfile, err := os.Create(HeapProfileFilename)
-               if err != nil {
-                       return err
-               }
-
-               defer heapProfile.Close()
-
-               err = pprof.WriteHeapProfile(heapProfile)
-               return err
-       }
-
-       return nil
-}
-
-// GetCollectionsAndSummarize gets collections from api and summarizes
-func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
-       results, err = GetCollections(params)
-       if err != nil {
-               return
-       }
-
-       results.Summarize(params.Logger)
-
-       log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
-       log.Printf("Read and processed %d collections",
-               len(results.UUIDToCollection))
-
-       // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
-       // lots of behaviors can become warnings (and obviously we can't
-       // write anything).
-       // if !readCollections.ReadAllCollections {
-       //      log.Fatalf("Did not read all collections")
-       // }
-
-       return
-}
-
-// GetCollections gets collections from api
-func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
-       if &params.Client == nil {
-               err = fmt.Errorf("params.Client passed to GetCollections() should " +
-                       "contain a valid ArvadosClient, but instead it is nil.")
-               return
-       }
-
-       fieldsWanted := []string{"manifest_text",
-               "owner_uuid",
-               "uuid",
-               "replication_desired",
-               "modified_at"}
-
-       sdkParams := arvadosclient.Dict{
-               "select":  fieldsWanted,
-               "order":   []string{"modified_at ASC", "uuid ASC"},
-               "filters": [][]string{{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
-               "offset":  0}
-
-       if params.BatchSize > 0 {
-               sdkParams["limit"] = params.BatchSize
-       }
-
-       var defaultReplicationLevel int
-       {
-               var value interface{}
-               value, err = params.Client.Discovery("defaultCollectionReplication")
-               if err != nil {
-                       return
-               }
-
-               defaultReplicationLevel = int(value.(float64))
-               if defaultReplicationLevel <= 0 {
-                       err = fmt.Errorf("Default collection replication returned by arvados SDK "+
-                               "should be a positive integer but instead it was %d.",
-                               defaultReplicationLevel)
-                       return
-               }
-       }
-
-       initialNumberOfCollectionsAvailable, err :=
-               util.NumberItemsAvailable(params.Client, "collections")
-       if err != nil {
-               return
-       }
-       // Include a 1% margin for collections added while we're reading so
-       // that we don't have to grow the map in most cases.
-       maxExpectedCollections := int(
-               float64(initialNumberOfCollectionsAvailable) * 1.01)
-       results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
-
-       if params.Logger != nil {
-               params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       collectionInfo := logger.GetOrCreateMap(p, "collection_info")
-                       collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
-                       collectionInfo["batch_size"] = params.BatchSize
-                       collectionInfo["default_replication_level"] = defaultReplicationLevel
-               })
-       }
-
-       // These values are just for getting the loop to run the first time,
-       // afterwards they'll be set to real values.
-       remainingCollections := 1
-       var totalCollections int
-       var previousTotalCollections int
-       for remainingCollections > 0 {
-               // We're still finding new collections
-
-               // Write the heap profile for examining memory usage
-               err = WriteHeapProfile()
-               if err != nil {
-                       return
-               }
-
-               // Get next batch of collections.
-               var collections SdkCollectionList
-               err = params.Client.List("collections", sdkParams, &collections)
-               if err != nil {
-                       return
-               }
-               batchCollections := len(collections.Items)
-
-               // We must always have at least one collection in the batch
-               if batchCollections < 1 {
-                       err = fmt.Errorf("API query returned no collections for %+v", sdkParams)
-                       return
-               }
-
-               // Update count of remaining collections
-               remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections
-
-               // Process collection and update our date filter.
-               latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
-                       collections.Items,
-                       defaultReplicationLevel,
-                       results.UUIDToCollection)
-               if err != nil {
-                       return results, err
-               }
-               if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
-                       sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
-                       sdkParams["offset"] = 0
-               } else {
-                       sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections
-               }
-
-               // update counts
-               previousTotalCollections = totalCollections
-               totalCollections = len(results.UUIDToCollection)
-
-               log.Printf("%d collections read, %d (%d new) in last batch, "+
-                       "%d remaining, "+
-                       "%s latest modified date, %.0f %d %d avg,max,total manifest size",
-                       totalCollections,
-                       batchCollections,
-                       totalCollections-previousTotalCollections,
-                       remainingCollections,
-                       sdkParams["filters"].([][]string)[0][2],
-                       float32(totalManifestSize)/float32(totalCollections),
-                       maxManifestSize, totalManifestSize)
-
-               if params.Logger != nil {
-                       params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                               collectionInfo := logger.GetOrCreateMap(p, "collection_info")
-                               collectionInfo["collections_read"] = totalCollections
-                               collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
-                               collectionInfo["total_manifest_size"] = totalManifestSize
-                               collectionInfo["max_manifest_size"] = maxManifestSize
-                       })
-               }
-       }
-
-       // Make one final API request to verify that we have processed all collections available up to the latest modification date
-       var collections SdkCollectionList
-       sdkParams["filters"].([][]string)[0][1] = "<="
-       sdkParams["limit"] = 0
-       err = params.Client.List("collections", sdkParams, &collections)
-       if err != nil {
-               return
-       }
-       finalNumberOfCollectionsAvailable, err :=
-               util.NumberItemsAvailable(params.Client, "collections")
-       if err != nil {
-               return
-       }
-       if totalCollections < finalNumberOfCollectionsAvailable {
-               err = fmt.Errorf("API server indicates a total of %d collections "+
-                       "available up to %v, but we only retrieved %d. "+
-                       "Refusing to continue as this could indicate an "+
-                       "otherwise undetected failure.",
-                       finalNumberOfCollectionsAvailable,
-                       sdkParams["filters"].([][]string)[0][2],
-                       totalCollections)
-               return
-       }
-
-       // Write the heap profile for examining memory usage
-       err = WriteHeapProfile()
-
-       return
-}
-
// StrCopy returns a copy of s in freshly allocated memory. Keeping the
// copy of a short substring does not pin the (possibly much larger)
// string it was sliced from, so the garbage collector can reclaim it.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
-
-// ProcessCollections read from api server
-func ProcessCollections(arvLogger *logger.Logger,
-       receivedCollections []SdkCollectionInfo,
-       defaultReplicationLevel int,
-       UUIDToCollection map[string]Collection,
-) (
-       latestModificationDate time.Time,
-       maxManifestSize, totalManifestSize uint64,
-       err error,
-) {
-       for _, sdkCollection := range receivedCollections {
-               collection := Collection{UUID: StrCopy(sdkCollection.UUID),
-                       OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
-                       ReplicationLevel:  sdkCollection.ReplicationDesired,
-                       BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
-
-               if sdkCollection.ModifiedAt.IsZero() {
-                       err = fmt.Errorf(
-                               "Arvados SDK collection returned with unexpected zero "+
-                                       "modification date. This probably means that either we failed to "+
-                                       "parse the modification date or the API server has changed how "+
-                                       "it returns modification dates: %+v",
-                               collection)
-                       return
-               }
-
-               if sdkCollection.ModifiedAt.After(latestModificationDate) {
-                       latestModificationDate = sdkCollection.ModifiedAt
-               }
-
-               if collection.ReplicationLevel == 0 {
-                       collection.ReplicationLevel = defaultReplicationLevel
-               }
-
-               manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
-               manifestSize := uint64(len(sdkCollection.ManifestText))
-
-               if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
-                       totalManifestSize += manifestSize
-               }
-               if manifestSize > maxManifestSize {
-                       maxManifestSize = manifestSize
-               }
-
-               blockChannel := manifest.BlockIterWithDuplicates()
-               for block := range blockChannel {
-                       if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
-                               log.Printf(
-                                       "Collection %s contains multiple sizes (%d and %d) for block %s",
-                                       collection.UUID,
-                                       storedSize,
-                                       block.Size,
-                                       block.Digest)
-                       }
-                       collection.BlockDigestToSize[block.Digest] = block.Size
-               }
-               if manifest.Err != nil {
-                       err = manifest.Err
-                       return
-               }
-
-               collection.TotalSize = 0
-               for _, size := range collection.BlockDigestToSize {
-                       collection.TotalSize += size
-               }
-               UUIDToCollection[collection.UUID] = collection
-
-               // Clear out all the manifest strings that we don't need anymore.
-               // These hopefully form the bulk of our memory usage.
-               manifest.Text = ""
-               sdkCollection.ManifestText = ""
-       }
-
-       return
-}
-
-// Summarize the collections read
-func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
-       readCollections.OwnerToCollectionSize = make(map[string]int)
-       readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
-       numCollections := len(readCollections.UUIDToCollection)
-       readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
-       readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
-       readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
-
-       for _, coll := range readCollections.UUIDToCollection {
-               collectionIndex := len(readCollections.CollectionIndexToUUID)
-               readCollections.CollectionIndexToUUID =
-                       append(readCollections.CollectionIndexToUUID, coll.UUID)
-               readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
-
-               readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
-                       readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
-
-               for block, size := range coll.BlockDigestToSize {
-                       locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
-                       readCollections.BlockToCollectionIndices[locator] =
-                               append(readCollections.BlockToCollectionIndices[locator],
-                                       collectionIndex)
-                       storedReplication := readCollections.BlockToDesiredReplication[locator]
-                       if coll.ReplicationLevel > storedReplication {
-                               readCollections.BlockToDesiredReplication[locator] =
-                                       coll.ReplicationLevel
-                       }
-               }
-       }
-
-       if arvLogger != nil {
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       collectionInfo := logger.GetOrCreateMap(p, "collection_info")
-                       // Since maps are shallow copied, we run a risk of concurrent
-                       // updates here. By copying results.OwnerToCollectionSize into
-                       // the log, we're assuming that it won't be updated.
-                       collectionInfo["owner_to_collection_size"] =
-                               readCollections.OwnerToCollectionSize
-                       collectionInfo["distinct_blocks_named"] =
-                               len(readCollections.BlockToDesiredReplication)
-               })
-       }
-
-       return
-}
diff --git a/services/datamanager/collection/collection_test.go b/services/datamanager/collection/collection_test.go
deleted file mode 100644 (file)
index 1bf6a89..0000000
+++ /dev/null
@@ -1,202 +0,0 @@
-package collection
-
-import (
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       . "gopkg.in/check.v1"
-       "net/http"
-       "net/http/httptest"
-       "testing"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) {
-       TestingT(t)
-}
-
-type MySuite struct{}
-
-var _ = Suite(&MySuite{})
-
-// This captures the result we expect from
-// ReadCollections.Summarize().  Because CollectionUUIDToIndex is
-// indeterminate, we replace BlockToCollectionIndices with
-// BlockToCollectionUuids.
-type ExpectedSummary struct {
-       OwnerToCollectionSize     map[string]int
-       BlockToDesiredReplication map[blockdigest.DigestWithSize]int
-       BlockToCollectionUuids    map[blockdigest.DigestWithSize][]string
-}
-
-func CompareSummarizedReadCollections(c *C,
-       summarized ReadCollections,
-       expected ExpectedSummary) {
-
-       c.Assert(summarized.OwnerToCollectionSize, DeepEquals,
-               expected.OwnerToCollectionSize)
-
-       c.Assert(summarized.BlockToDesiredReplication, DeepEquals,
-               expected.BlockToDesiredReplication)
-
-       summarizedBlockToCollectionUuids :=
-               make(map[blockdigest.DigestWithSize]map[string]struct{})
-       for digest, indices := range summarized.BlockToCollectionIndices {
-               uuidSet := make(map[string]struct{})
-               summarizedBlockToCollectionUuids[digest] = uuidSet
-               for _, index := range indices {
-                       uuidSet[summarized.CollectionIndexToUUID[index]] = struct{}{}
-               }
-       }
-
-       expectedBlockToCollectionUuids :=
-               make(map[blockdigest.DigestWithSize]map[string]struct{})
-       for digest, uuidSlice := range expected.BlockToCollectionUuids {
-               uuidSet := make(map[string]struct{})
-               expectedBlockToCollectionUuids[digest] = uuidSet
-               for _, uuid := range uuidSlice {
-                       uuidSet[uuid] = struct{}{}
-               }
-       }
-
-       c.Assert(summarizedBlockToCollectionUuids, DeepEquals,
-               expectedBlockToCollectionUuids)
-}
-
-func (s *MySuite) TestSummarizeSimple(checker *C) {
-       rc := MakeTestReadCollections([]TestCollectionSpec{{
-               ReplicationLevel: 5,
-               Blocks:           []int{1, 2},
-       }})
-
-       rc.Summarize(nil)
-
-       c := rc.UUIDToCollection["col0"]
-
-       blockDigest1 := blockdigest.MakeTestDigestWithSize(1)
-       blockDigest2 := blockdigest.MakeTestDigestWithSize(2)
-
-       expected := ExpectedSummary{
-               OwnerToCollectionSize:     map[string]int{c.OwnerUUID: c.TotalSize},
-               BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{blockDigest1: 5, blockDigest2: 5},
-               BlockToCollectionUuids:    map[blockdigest.DigestWithSize][]string{blockDigest1: {c.UUID}, blockDigest2: {c.UUID}},
-       }
-
-       CompareSummarizedReadCollections(checker, rc, expected)
-}
-
-func (s *MySuite) TestSummarizeOverlapping(checker *C) {
-       rc := MakeTestReadCollections([]TestCollectionSpec{
-               {
-                       ReplicationLevel: 5,
-                       Blocks:           []int{1, 2},
-               },
-               {
-                       ReplicationLevel: 8,
-                       Blocks:           []int{2, 3},
-               },
-       })
-
-       rc.Summarize(nil)
-
-       c0 := rc.UUIDToCollection["col0"]
-       c1 := rc.UUIDToCollection["col1"]
-
-       blockDigest1 := blockdigest.MakeTestDigestWithSize(1)
-       blockDigest2 := blockdigest.MakeTestDigestWithSize(2)
-       blockDigest3 := blockdigest.MakeTestDigestWithSize(3)
-
-       expected := ExpectedSummary{
-               OwnerToCollectionSize: map[string]int{
-                       c0.OwnerUUID: c0.TotalSize,
-                       c1.OwnerUUID: c1.TotalSize,
-               },
-               BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{
-                       blockDigest1: 5,
-                       blockDigest2: 8,
-                       blockDigest3: 8,
-               },
-               BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{
-                       blockDigest1: {c0.UUID},
-                       blockDigest2: {c0.UUID, c1.UUID},
-                       blockDigest3: {c1.UUID},
-               },
-       }
-
-       CompareSummarizedReadCollections(checker, rc, expected)
-}
-
-type APITestData struct {
-       // path and response map
-       responses map[string]arvadostest.StubResponse
-
-       // expected error, if any
-       expectedError string
-}
-
-func (s *MySuite) TestGetCollectionsAndSummarize_DiscoveryError(c *C) {
-       testGetCollectionsAndSummarize(c,
-               APITestData{
-                       responses:     make(map[string]arvadostest.StubResponse),
-                       expectedError: "arvados API server error: 500.*",
-               })
-}
-
-func (s *MySuite) TestGetCollectionsAndSummarize_ApiErrorGetCollections(c *C) {
-       respMap := make(map[string]arvadostest.StubResponse)
-       respMap["/discovery/v1/apis/arvados/v1/rest"] = arvadostest.StubResponse{200, `{"defaultCollectionReplication":2}`}
-       respMap["/arvados/v1/collections"] = arvadostest.StubResponse{-1, ``}
-
-       testGetCollectionsAndSummarize(c,
-               APITestData{
-                       responses:     respMap,
-                       expectedError: "arvados API server error: 302.*",
-               })
-}
-
-func (s *MySuite) TestGetCollectionsAndSummarize_GetCollectionsBadStreamName(c *C) {
-       respMap := make(map[string]arvadostest.StubResponse)
-       respMap["/discovery/v1/apis/arvados/v1/rest"] = arvadostest.StubResponse{200, `{"defaultCollectionReplication":2}`}
-       respMap["/arvados/v1/collections"] = arvadostest.StubResponse{200, `{"items_available":1,"items":[{"modified_at":"2015-11-24T15:04:05Z","manifest_text":"badstreamname"}]}`}
-
-       testGetCollectionsAndSummarize(c,
-               APITestData{
-                       responses:     respMap,
-                       expectedError: "Invalid stream name: badstreamname",
-               })
-}
-
-func (s *MySuite) TestGetCollectionsAndSummarize_GetCollectionsBadFileToken(c *C) {
-       respMap := make(map[string]arvadostest.StubResponse)
-       respMap["/discovery/v1/apis/arvados/v1/rest"] = arvadostest.StubResponse{200, `{"defaultCollectionReplication":2}`}
-       respMap["/arvados/v1/collections"] = arvadostest.StubResponse{200, `{"items_available":1,"items":[{"modified_at":"2015-11-24T15:04:05Z","manifest_text":"./goodstream acbd18db4cc2f85cedef654fccc4a4d8+3 0:1:file1.txt file2.txt"}]}`}
-
-       testGetCollectionsAndSummarize(c,
-               APITestData{
-                       responses:     respMap,
-                       expectedError: "Invalid file token: file2.txt",
-               })
-}
-
-func testGetCollectionsAndSummarize(c *C, testData APITestData) {
-       apiStub := arvadostest.ServerStub{testData.responses}
-
-       api := httptest.NewServer(&apiStub)
-       defer api.Close()
-
-       arv := &arvadosclient.ArvadosClient{
-               Scheme:    "http",
-               ApiServer: api.URL[7:],
-               ApiToken:  "abc123",
-               Client:    &http.Client{Transport: &http.Transport{}},
-       }
-
-       // GetCollectionsAndSummarize
-       _, err := GetCollectionsAndSummarize(GetCollectionsParams{arv, nil, 10})
-
-       if testData.expectedError == "" {
-               c.Assert(err, IsNil)
-       } else {
-               c.Assert(err, ErrorMatches, testData.expectedError)
-       }
-}
diff --git a/services/datamanager/collection/testing.go b/services/datamanager/collection/testing.go
deleted file mode 100644 (file)
index 2238433..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-// Code used for testing only.
-
-package collection
-
-import (
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-)
-
-// TestCollectionSpec with test blocks and desired replication level
-type TestCollectionSpec struct {
-       // The desired replication level
-       ReplicationLevel int
-       // Blocks this contains, represented by ints. Ints repeated will
-       // still only represent one block
-       Blocks []int
-}
-
-// MakeTestReadCollections creates a ReadCollections object for testing
-// based on the give specs. Only the ReadAllCollections and UUIDToCollection
-// fields are populated. To populate other fields call rc.Summarize().
-func MakeTestReadCollections(specs []TestCollectionSpec) (rc ReadCollections) {
-       rc = ReadCollections{
-               ReadAllCollections: true,
-               UUIDToCollection:   map[string]Collection{},
-       }
-
-       for i, spec := range specs {
-               c := Collection{
-                       UUID:              fmt.Sprintf("col%d", i),
-                       OwnerUUID:         fmt.Sprintf("owner%d", i),
-                       ReplicationLevel:  spec.ReplicationLevel,
-                       BlockDigestToSize: map[blockdigest.BlockDigest]int{},
-               }
-               rc.UUIDToCollection[c.UUID] = c
-               for _, j := range spec.Blocks {
-                       c.BlockDigestToSize[blockdigest.MakeTestBlockDigest(j)] = j
-               }
-               // We compute the size in a separate loop because the value
-               // computed in the above loop would be invalid if c.Blocks
-               // contained duplicates.
-               for _, size := range c.BlockDigestToSize {
-                       c.TotalSize += size
-               }
-       }
-       return
-}
-
-// CollectionIndicesForTesting returns a slice giving the collection
-// index of each collection that was passed in to MakeTestReadCollections.
-// rc.Summarize() must be called before this method, since Summarize()
-// assigns an index to each collection.
-func (rc ReadCollections) CollectionIndicesForTesting() (indices []int) {
-       // TODO(misha): Assert that rc.Summarize() has been called.
-       numCollections := len(rc.CollectionIndexToUUID)
-       indices = make([]int, numCollections)
-       for i := 0; i < numCollections; i++ {
-               indices[i] = rc.CollectionUUIDToIndex[fmt.Sprintf("col%d", i)]
-       }
-       return
-}
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
deleted file mode 100644 (file)
index 5250d17..0000000
+++ /dev/null
@@ -1,220 +0,0 @@
-/* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
-
-package main
-
-import (
-       "errors"
-       "flag"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/logger"
-       "git.curoverse.com/arvados.git/sdk/go/util"
-       "git.curoverse.com/arvados.git/services/datamanager/collection"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
-       "git.curoverse.com/arvados.git/services/datamanager/summary"
-       "log"
-       "time"
-)
-
-var (
-       logEventTypePrefix  string
-       logFrequencySeconds int
-       minutesBetweenRuns  int
-       collectionBatchSize int
-       dryRun              bool
-)
-
-func init() {
-       flag.StringVar(&logEventTypePrefix,
-               "log-event-type-prefix",
-               "experimental-data-manager",
-               "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
-       flag.IntVar(&logFrequencySeconds,
-               "log-frequency-seconds",
-               20,
-               "How frequently we'll write log entries in seconds.")
-       flag.IntVar(&minutesBetweenRuns,
-               "minutes-between-runs",
-               0,
-               "How many minutes we wait between data manager runs. 0 means run once and exit.")
-       flag.IntVar(&collectionBatchSize,
-               "collection-batch-size",
-               1000,
-               "How many collections to request in each batch.")
-       flag.BoolVar(&dryRun,
-               "dry-run",
-               false,
-               "Perform a dry run. Log how many blocks would be deleted/moved, but do not issue any changes to keepstore.")
-}
-
-func main() {
-       flag.Parse()
-
-       if minutesBetweenRuns == 0 {
-               arv, err := arvadosclient.MakeArvadosClient()
-               if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
-               }
-               err = singlerun(arv)
-               if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err))
-               }
-       } else {
-               waitTime := time.Minute * time.Duration(minutesBetweenRuns)
-               for {
-                       log.Println("Beginning Run")
-                       arv, err := arvadosclient.MakeArvadosClient()
-                       if err != nil {
-                               loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
-                       }
-                       err = singlerun(arv)
-                       if err != nil {
-                               log.Printf("singlerun: %v", err)
-                       }
-                       log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
-                       time.Sleep(waitTime)
-               }
-       }
-}
-
-var arvLogger *logger.Logger
-
-func singlerun(arv *arvadosclient.ArvadosClient) error {
-       var err error
-       if isAdmin, err := util.UserIsAdmin(arv); err != nil {
-               return errors.New("Error verifying admin token: " + err.Error())
-       } else if !isAdmin {
-               return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
-       }
-
-       if logEventTypePrefix != "" {
-               arvLogger, err = logger.NewLogger(logger.LoggerParams{
-                       Client:          arv,
-                       EventTypePrefix: logEventTypePrefix,
-                       WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
-       }
-
-       loggerutil.LogRunInfo(arvLogger)
-       if arvLogger != nil {
-               arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
-       }
-
-       var (
-               dataFetcher     summary.DataFetcher
-               readCollections collection.ReadCollections
-               keepServerInfo  keep.ReadServers
-       )
-
-       if summary.ShouldReadData() {
-               dataFetcher = summary.ReadData
-       } else {
-               dataFetcher = BuildDataFetcher(arv)
-       }
-
-       err = dataFetcher(arvLogger, &readCollections, &keepServerInfo)
-       if err != nil {
-               return err
-       }
-
-       err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
-       if err != nil {
-               return err
-       }
-
-       buckets := summary.BucketReplication(readCollections, keepServerInfo)
-       bucketCounts := buckets.Counts()
-
-       replicationSummary := buckets.SummarizeBuckets(readCollections)
-       replicationCounts := replicationSummary.ComputeCounts()
-
-       log.Printf("Blocks In Collections: %d, "+
-               "\nBlocks In Keep: %d.",
-               len(readCollections.BlockToDesiredReplication),
-               len(keepServerInfo.BlockToServers))
-       log.Println(replicationCounts.PrettyPrint())
-
-       log.Printf("Blocks Histogram:")
-       for _, rlbss := range bucketCounts {
-               log.Printf("%+v: %10d",
-                       rlbss.Levels,
-                       rlbss.Count)
-       }
-
-       kc, err := keepclient.MakeKeepClient(arv)
-       if err != nil {
-               return fmt.Errorf("Error setting up keep client %v", err.Error())
-       }
-
-       // Log that we're finished. We force the recording, since go will
-       // not wait for the write timer before exiting.
-       if arvLogger != nil {
-               defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
-                       summaryInfo := logger.GetOrCreateMap(p, "summary_info")
-                       summaryInfo["block_replication_counts"] = bucketCounts
-                       summaryInfo["replication_summary"] = replicationCounts
-                       p["summary_info"] = summaryInfo
-
-                       p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
-               })
-       }
-
-       pullServers := summary.ComputePullServers(kc,
-               &keepServerInfo,
-               readCollections.BlockToDesiredReplication,
-               replicationSummary.UnderReplicatedBlocks)
-
-       pullLists := summary.BuildPullLists(pullServers)
-
-       trashLists, trashErr := summary.BuildTrashLists(kc,
-               &keepServerInfo,
-               replicationSummary.KeepBlocksNotInCollections)
-
-       err = summary.WritePullLists(arvLogger, pullLists, dryRun)
-       if err != nil {
-               return err
-       }
-
-       if trashErr != nil {
-               return err
-       }
-       keep.SendTrashLists(arvLogger, kc, trashLists, dryRun)
-
-       return nil
-}
-
-// BuildDataFetcher returns a data fetcher that fetches data from remote servers.
-func BuildDataFetcher(arv *arvadosclient.ArvadosClient) summary.DataFetcher {
-       return func(
-               arvLogger *logger.Logger,
-               readCollections *collection.ReadCollections,
-               keepServerInfo *keep.ReadServers,
-       ) error {
-               collDone := make(chan struct{})
-               var collErr error
-               go func() {
-                       *readCollections, collErr = collection.GetCollectionsAndSummarize(
-                               collection.GetCollectionsParams{
-                                       Client:    arv,
-                                       Logger:    arvLogger,
-                                       BatchSize: collectionBatchSize})
-                       collDone <- struct{}{}
-               }()
-
-               var keepErr error
-               *keepServerInfo, keepErr = keep.GetKeepServersAndSummarize(
-                       keep.GetKeepServersParams{
-                               Client: arv,
-                               Logger: arvLogger,
-                               Limit:  1000})
-
-               <-collDone
-
-               // Return a nil error only if both parts succeeded.
-               if collErr != nil {
-                       return collErr
-               }
-               return keepErr
-       }
-}
diff --git a/services/datamanager/datamanager_test.go b/services/datamanager/datamanager_test.go
deleted file mode 100644 (file)
index 7a8fff5..0000000
+++ /dev/null
@@ -1,732 +0,0 @@
-package main
-
-import (
-       "encoding/json"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/services/datamanager/collection"
-       "git.curoverse.com/arvados.git/services/datamanager/summary"
-       "io/ioutil"
-       "net/http"
-       "os"
-       "os/exec"
-       "path"
-       "regexp"
-       "strings"
-       "testing"
-       "time"
-)
-
-var arv *arvadosclient.ArvadosClient
-var keepClient *keepclient.KeepClient
-var keepServers []string
-
-func SetupDataManagerTest(t *testing.T) {
-       os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
-
-       // start api and keep servers
-       arvadostest.ResetEnv()
-       arvadostest.StartAPI()
-       arvadostest.StartKeep(2, false)
-
-       var err error
-       arv, err = arvadosclient.MakeArvadosClient()
-       if err != nil {
-               t.Fatalf("Error making arvados client: %s", err)
-       }
-       arv.ApiToken = arvadostest.DataManagerToken
-
-       // keep client
-       keepClient = &keepclient.KeepClient{
-               Arvados:       arv,
-               Want_replicas: 2,
-               Client:        &http.Client{},
-       }
-
-       // discover keep services
-       if err = keepClient.DiscoverKeepServers(); err != nil {
-               t.Fatalf("Error discovering keep services: %s", err)
-       }
-       keepServers = []string{}
-       for _, host := range keepClient.LocalRoots() {
-               keepServers = append(keepServers, host)
-       }
-}
-
-func TearDownDataManagerTest(t *testing.T) {
-       arvadostest.StopKeep(2)
-       arvadostest.StopAPI()
-       summary.WriteDataTo = ""
-       collection.HeapProfileFilename = ""
-}
-
-func putBlock(t *testing.T, data string) string {
-       locator, _, err := keepClient.PutB([]byte(data))
-       if err != nil {
-               t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
-       }
-       if locator == "" {
-               t.Fatalf("No locator found after putting test data")
-       }
-
-       splits := strings.Split(locator, "+")
-       return splits[0] + "+" + splits[1]
-}
-
-func getBlock(t *testing.T, locator string, data string) {
-       reader, blocklen, _, err := keepClient.Get(locator)
-       if err != nil {
-               t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
-       }
-       if reader == nil {
-               t.Fatalf("No reader found after putting test data")
-       }
-       if blocklen != int64(len(data)) {
-               t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
-       }
-
-       all, err := ioutil.ReadAll(reader)
-       if string(all) != data {
-               t.Fatalf("Data read %s did not match expected data %s", string(all), data)
-       }
-}
-
-// Create a collection using arv-put
-func createCollection(t *testing.T, data string) string {
-       tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
-       defer os.Remove(tempfile.Name())
-
-       _, err = tempfile.Write([]byte(data))
-       if err != nil {
-               t.Fatalf("Error writing to tempfile %v", err)
-       }
-
-       // arv-put
-       output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
-       if err != nil {
-               t.Fatalf("Error running arv-put %s", err)
-       }
-
-       uuid := string(output[0:27]) // trim terminating char
-       return uuid
-}
-
-// Get collection locator
-var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
-
-func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
-       manifest := getCollection(t, uuid)["manifest_text"].(string)
-
-       locator := strings.Split(manifest, " ")[1]
-       match := locatorMatcher.FindStringSubmatch(locator)
-       if match == nil {
-               t.Fatalf("No locator found in collection manifest %s", manifest)
-       }
-
-       return match[1] + "+" + match[2]
-}
-
-func switchToken(t string) func() {
-       orig := arv.ApiToken
-       restore := func() {
-               arv.ApiToken = orig
-       }
-       arv.ApiToken = t
-       return restore
-}
-
-func getCollection(t *testing.T, uuid string) Dict {
-       defer switchToken(arvadostest.AdminToken)()
-
-       getback := make(Dict)
-       err := arv.Get("collections", uuid, nil, &getback)
-       if err != nil {
-               t.Fatalf("Error getting collection %s", err)
-       }
-       if getback["uuid"] != uuid {
-               t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
-       }
-
-       return getback
-}
-
-func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
-       defer switchToken(arvadostest.AdminToken)()
-
-       err := arv.Update("collections", uuid, arvadosclient.Dict{
-               "collection": arvadosclient.Dict{
-                       paramName: paramValue,
-               },
-       }, &arvadosclient.Dict{})
-
-       if err != nil {
-               t.Fatalf("Error updating collection %s", err)
-       }
-}
-
-type Dict map[string]interface{}
-
-func deleteCollection(t *testing.T, uuid string) {
-       defer switchToken(arvadostest.AdminToken)()
-
-       getback := make(Dict)
-       err := arv.Delete("collections", uuid, nil, &getback)
-       if err != nil {
-               t.Fatalf("Error deleting collection %s", err)
-       }
-       if getback["uuid"] != uuid {
-               t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
-       }
-}
-
-func dataManagerSingleRun(t *testing.T) {
-       err := singlerun(arv)
-       if err != nil {
-               t.Fatalf("Error during singlerun %s", err)
-       }
-}
-
-func getBlockIndexesForServer(t *testing.T, i int) []string {
-       var indexes []string
-
-       path := keepServers[i] + "/index"
-       client := http.Client{}
-       req, err := http.NewRequest("GET", path, nil)
-       req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
-       req.Header.Add("Content-Type", "application/octet-stream")
-       resp, err := client.Do(req)
-       defer resp.Body.Close()
-
-       if err != nil {
-               t.Fatalf("Error during %s %s", path, err)
-       }
-
-       body, err := ioutil.ReadAll(resp.Body)
-       if err != nil {
-               t.Fatalf("Error reading response from %s %s", path, err)
-       }
-
-       lines := strings.Split(string(body), "\n")
-       for _, line := range lines {
-               indexes = append(indexes, strings.Split(line, " ")...)
-       }
-
-       return indexes
-}
-
-func getBlockIndexes(t *testing.T) [][]string {
-       var indexes [][]string
-
-       for i := 0; i < len(keepServers); i++ {
-               indexes = append(indexes, getBlockIndexesForServer(t, i))
-       }
-       return indexes
-}
-
-func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
-       blocks := getBlockIndexes(t)
-
-       for _, block := range notExpected {
-               for _, idx := range blocks {
-                       if valueInArray(block, idx) {
-                               t.Fatalf("Found unexpected block %s", block)
-                       }
-               }
-       }
-
-       for _, block := range expected {
-               nFound := 0
-               for _, idx := range blocks {
-                       if valueInArray(block, idx) {
-                               nFound++
-                       }
-               }
-               if nFound < minReplication {
-                       t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
-               }
-       }
-}
-
-func valueInArray(value string, list []string) bool {
-       for _, v := range list {
-               if value == v {
-                       return true
-               }
-       }
-       return false
-}
-
-// Test env uses two keep volumes. The volume names can be found by reading the files
-// ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
-//
-// The keep volumes are of the dir structure: volumeN/subdir/locator
-func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
-       // First get rid of any size hints in the locators
-       var trimmedBlockLocators []string
-       for _, block := range oldUnusedBlockLocators {
-               trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
-       }
-
-       // Get the working dir so that we can read keep{n}.volume files
-       wd, err := os.Getwd()
-       if err != nil {
-               t.Fatalf("Error getting working dir %s", err)
-       }
-
-       // Now cycle through the two keep volumes
-       oldTime := time.Now().AddDate(0, -2, 0)
-       for i := 0; i < 2; i++ {
-               filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
-               volumeDir, err := ioutil.ReadFile(filename)
-               if err != nil {
-                       t.Fatalf("Error reading keep volume file %s %s", filename, err)
-               }
-
-               // Read the keep volume dir structure
-               volumeContents, err := ioutil.ReadDir(string(volumeDir))
-               if err != nil {
-                       t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
-               }
-
-               // Read each subdir for each of the keep volume dir
-               for _, subdir := range volumeContents {
-                       subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
-                       subdirContents, err := ioutil.ReadDir(string(subdirName))
-                       if err != nil {
-                               t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
-                       }
-
-                       // Now we got to the files. The files are names are the block locators
-                       for _, fileInfo := range subdirContents {
-                               blockName := fileInfo.Name()
-                               myname := fmt.Sprintf("%s/%s", subdirName, blockName)
-                               if valueInArray(blockName, trimmedBlockLocators) {
-                                       err = os.Chtimes(myname, oldTime, oldTime)
-                               }
-                       }
-               }
-       }
-}
-
-func getStatus(t *testing.T, path string) interface{} {
-       client := http.Client{}
-       req, err := http.NewRequest("GET", path, nil)
-       req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
-       req.Header.Add("Content-Type", "application/octet-stream")
-       resp, err := client.Do(req)
-       if err != nil {
-               t.Fatalf("Error during %s %s", path, err)
-       }
-       defer resp.Body.Close()
-
-       var s interface{}
-       json.NewDecoder(resp.Body).Decode(&s)
-
-       return s
-}
-
-// Wait until PullQueue and TrashQueue are empty on all keepServers.
-func waitUntilQueuesFinishWork(t *testing.T) {
-       for _, ks := range keepServers {
-               for done := false; !done; {
-                       time.Sleep(100 * time.Millisecond)
-                       s := getStatus(t, ks+"/status.json")
-                       for _, qName := range []string{"PullQueue", "TrashQueue"} {
-                               qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
-                               if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 {
-                                       done = true
-                               }
-                       }
-               }
-       }
-}
-
-// Create some blocks and backdate some of them.
-// Also create some collections and delete some of them.
-// Verify block indexes.
-func TestPutAndGetBlocks(t *testing.T) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       // Put some blocks which will be backdated later on
-       // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
-       // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
-       var oldUnusedBlockLocators []string
-       oldUnusedBlockData := "this block will have older mtime"
-       for i := 0; i < 5; i++ {
-               oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
-       }
-       for i := 0; i < 5; i++ {
-               getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
-       }
-
-       // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
-       oldUsedBlockData := "this collection block will have older mtime"
-       oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
-       getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
-
-       // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
-       // Hence, even though unreferenced, these should not be deleted when datamanager runs.
-       var newBlockLocators []string
-       newBlockData := "this block is newer"
-       for i := 0; i < 5; i++ {
-               newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
-       }
-       for i := 0; i < 5; i++ {
-               getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
-       }
-
-       // Create a collection that would be deleted later on
-       toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
-       toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
-
-       // Create another collection that has the same data as the one of the old blocks
-       oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
-       oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
-       if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
-               t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
-       }
-
-       // Create another collection whose replication level will be changed
-       replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
-       replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
-
-       // Create two collections with same data; one will be deleted later on
-       dataForTwoCollections := "one of these collections will be deleted"
-       oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
-       oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
-       secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
-       secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
-       if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
-               t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
-       }
-
-       // create collection with empty manifest text
-       emptyBlockLocator := putBlock(t, "")
-       emptyCollection := createCollection(t, "")
-
-       // Verify blocks before doing any backdating / deleting.
-       var expected []string
-       expected = append(expected, oldUnusedBlockLocators...)
-       expected = append(expected, newBlockLocators...)
-       expected = append(expected, toBeDeletedCollectionLocator)
-       expected = append(expected, replicationCollectionLocator)
-       expected = append(expected, oneOfTwoWithSameDataLocator)
-       expected = append(expected, secondOfTwoWithSameDataLocator)
-       expected = append(expected, emptyBlockLocator)
-
-       verifyBlocks(t, nil, expected, 2)
-
-       // Run datamanager in singlerun mode
-       dataManagerSingleRun(t)
-       waitUntilQueuesFinishWork(t)
-
-       verifyBlocks(t, nil, expected, 2)
-
-       // Backdate the to-be old blocks and delete the collections
-       backdateBlocks(t, oldUnusedBlockLocators)
-       deleteCollection(t, toBeDeletedCollectionUUID)
-       deleteCollection(t, secondOfTwoWithSameDataUUID)
-       backdateBlocks(t, []string{emptyBlockLocator})
-       deleteCollection(t, emptyCollection)
-
-       // Run data manager again
-       dataManagerSingleRun(t)
-       waitUntilQueuesFinishWork(t)
-
-       // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
-       expected = expected[:0]
-       expected = append(expected, oldUsedBlockLocator)
-       expected = append(expected, newBlockLocators...)
-       expected = append(expected, toBeDeletedCollectionLocator)
-       expected = append(expected, oneOfTwoWithSameDataLocator)
-       expected = append(expected, secondOfTwoWithSameDataLocator)
-       expected = append(expected, emptyBlockLocator) // even when unreferenced, this remains
-
-       verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
-
-       // Reduce desired replication on replicationCollectionUUID
-       // collection, and verify that Data Manager does not reduce
-       // actual replication any further than that. (It might not
-       // reduce actual replication at all; that's OK for this test.)
-
-       // Reduce desired replication level.
-       updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
-       collection := getCollection(t, replicationCollectionUUID)
-       if collection["replication_desired"].(interface{}) != float64(1) {
-               t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
-       }
-
-       // Verify data is currently overreplicated.
-       verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
-
-       // Run data manager again
-       dataManagerSingleRun(t)
-       waitUntilQueuesFinishWork(t)
-
-       // Verify data is not underreplicated.
-       verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
-
-       // Verify *other* collections' data is not underreplicated.
-       verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
-}
-
-func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       for i := 0; i < 10; i++ {
-               err := singlerun(arv)
-               if err != nil {
-                       t.Fatalf("Got an error during datamanager singlerun: %v", err)
-               }
-       }
-}
-
-func TestGetStatusRepeatedly(t *testing.T) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       for i := 0; i < 10; i++ {
-               for j := 0; j < 2; j++ {
-                       s := getStatus(t, keepServers[j]+"/status.json")
-
-                       var pullQueueStatus interface{}
-                       pullQueueStatus = s.(map[string]interface{})["PullQueue"]
-                       var trashQueueStatus interface{}
-                       trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
-
-                       if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
-                               pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
-                               trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
-                               trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
-                               t.Fatalf("PullQueue and TrashQueue status not found")
-                       }
-
-                       time.Sleep(100 * time.Millisecond)
-               }
-       }
-}
-
-func TestRunDatamanagerWithBogusServer(t *testing.T) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       arv.ApiServer = "bogus-server"
-
-       err := singlerun(arv)
-       if err == nil {
-               t.Fatalf("Expected error during singlerun with bogus server")
-       }
-}
-
-func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       arv.ApiToken = arvadostest.ActiveToken
-
-       err := singlerun(arv)
-       if err == nil {
-               t.Fatalf("Expected error during singlerun as non-admin user")
-       }
-}
-
-func TestPutAndGetBlocks_NoErrorDuringSingleRun(t *testing.T) {
-       testOldBlocksNotDeletedOnDataManagerError(t, "", "", false, false)
-}
-
-func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
-       badpath, err := arvadostest.CreateBadPath()
-       if err != nil {
-               t.Fatalf(err.Error())
-       }
-       defer func() {
-               err = arvadostest.DestroyBadPath(badpath)
-               if err != nil {
-                       t.Fatalf(err.Error())
-               }
-       }()
-       testOldBlocksNotDeletedOnDataManagerError(t, path.Join(badpath, "writetofile"), "", true, true)
-}
-
-func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadHeapProfileFilename(t *testing.T) {
-       badpath, err := arvadostest.CreateBadPath()
-       if err != nil {
-               t.Fatalf(err.Error())
-       }
-       defer func() {
-               err = arvadostest.DestroyBadPath(badpath)
-               if err != nil {
-                       t.Fatalf(err.Error())
-               }
-       }()
-       testOldBlocksNotDeletedOnDataManagerError(t, "", path.Join(badpath, "heapprofilefile"), true, true)
-}
-
-// Create some blocks and backdate some of them.
-// Run datamanager while producing an error condition.
-// Verify that the blocks are hence not deleted.
-func testOldBlocksNotDeletedOnDataManagerError(t *testing.T, writeDataTo string, heapProfileFile string, expectError bool, expectOldBlocks bool) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       // Put some blocks and backdate them.
-       var oldUnusedBlockLocators []string
-       oldUnusedBlockData := "this block will have older mtime"
-       for i := 0; i < 5; i++ {
-               oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
-       }
-       backdateBlocks(t, oldUnusedBlockLocators)
-
-       // Run data manager
-       summary.WriteDataTo = writeDataTo
-       collection.HeapProfileFilename = heapProfileFile
-
-       err := singlerun(arv)
-       if !expectError {
-               if err != nil {
-                       t.Fatalf("Got an error during datamanager singlerun: %v", err)
-               }
-       } else {
-               if err == nil {
-                       t.Fatalf("Expected error during datamanager singlerun")
-               }
-       }
-       waitUntilQueuesFinishWork(t)
-
-       // Get block indexes and verify that all backdated blocks are not/deleted as expected
-       if expectOldBlocks {
-               verifyBlocks(t, nil, oldUnusedBlockLocators, 2)
-       } else {
-               verifyBlocks(t, oldUnusedBlockLocators, nil, 2)
-       }
-}
-
-// Create a collection with multiple streams and blocks
-func createMultiStreamBlockCollection(t *testing.T, data string, numStreams, numBlocks int) (string, []string) {
-       defer switchToken(arvadostest.AdminToken)()
-
-       manifest := ""
-       locators := make(map[string]bool)
-       for s := 0; s < numStreams; s++ {
-               manifest += fmt.Sprintf("./stream%d ", s)
-               for b := 0; b < numBlocks; b++ {
-                       locator, _, err := keepClient.PutB([]byte(fmt.Sprintf("%s in stream %d and block %d", data, s, b)))
-                       if err != nil {
-                               t.Fatalf("Error creating block %d in stream %d: %v", b, s, err)
-                       }
-                       locators[strings.Split(locator, "+A")[0]] = true
-                       manifest += locator + " "
-               }
-               manifest += "0:1:dummyfile.txt\n"
-       }
-
-       collection := make(Dict)
-       err := arv.Create("collections",
-               arvadosclient.Dict{"collection": arvadosclient.Dict{"manifest_text": manifest}},
-               &collection)
-
-       if err != nil {
-               t.Fatalf("Error creating collection %v", err)
-       }
-
-       var locs []string
-       for k := range locators {
-               locs = append(locs, k)
-       }
-
-       return collection["uuid"].(string), locs
-}
-
-// Create collection with multiple streams and blocks; backdate the blocks and but do not delete the collection.
-// Also, create stray block and backdate it.
-// After datamanager run: expect blocks from the collection, but not the stray block.
-func TestManifestWithMultipleStreamsAndBlocks(t *testing.T) {
-       testManifestWithMultipleStreamsAndBlocks(t, 100, 10, "", false)
-}
-
-// Same test as TestManifestWithMultipleStreamsAndBlocks with an additional
-// keepstore of a service type other than "disk". Only the "disk" type services
-// will be indexed by datamanager and hence should work the same way.
-func TestManifestWithMultipleStreamsAndBlocks_WithOneUnsupportedKeepServer(t *testing.T) {
-       testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "testblobstore", false)
-}
-
-// Test datamanager with dry-run. Expect no block to be deleted.
-func TestManifestWithMultipleStreamsAndBlocks_DryRun(t *testing.T) {
-       testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "", true)
-}
-
-func testManifestWithMultipleStreamsAndBlocks(t *testing.T, numStreams, numBlocks int, createExtraKeepServerWithType string, isDryRun bool) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       // create collection whose blocks will be backdated
-       collectionWithOldBlocks, oldBlocks := createMultiStreamBlockCollection(t, "old block", numStreams, numBlocks)
-       if collectionWithOldBlocks == "" {
-               t.Fatalf("Failed to create collection with %d blocks", numStreams*numBlocks)
-       }
-       if len(oldBlocks) != numStreams*numBlocks {
-               t.Fatalf("Not all blocks are created: expected %v, found %v", 1000, len(oldBlocks))
-       }
-
-       // create a stray block that will be backdated
-       strayOldBlock := putBlock(t, "this stray block is old")
-
-       expected := []string{strayOldBlock}
-       expected = append(expected, oldBlocks...)
-       verifyBlocks(t, nil, expected, 2)
-
-       // Backdate old blocks; but the collection still references these blocks
-       backdateBlocks(t, oldBlocks)
-
-       // also backdate the stray old block
-       backdateBlocks(t, []string{strayOldBlock})
-
-       // If requested, create an extra keepserver with the given type
-       // This should be ignored during indexing and hence not change the datamanager outcome
-       var extraKeepServerUUID string
-       if createExtraKeepServerWithType != "" {
-               extraKeepServerUUID = addExtraKeepServer(t, createExtraKeepServerWithType)
-               defer deleteExtraKeepServer(extraKeepServerUUID)
-       }
-
-       // run datamanager
-       dryRun = isDryRun
-       dataManagerSingleRun(t)
-
-       if dryRun {
-               // verify that all blocks, including strayOldBlock, are still to be found
-               verifyBlocks(t, nil, expected, 2)
-       } else {
-               // verify that strayOldBlock is not to be found, but the collections blocks are still there
-               verifyBlocks(t, []string{strayOldBlock}, oldBlocks, 2)
-       }
-}
-
-// Add one more keepstore with the given service type
-func addExtraKeepServer(t *testing.T, serviceType string) string {
-       defer switchToken(arvadostest.AdminToken)()
-
-       extraKeepService := make(arvadosclient.Dict)
-       err := arv.Create("keep_services",
-               arvadosclient.Dict{"keep_service": arvadosclient.Dict{
-                       "service_host":     "localhost",
-                       "service_port":     "21321",
-                       "service_ssl_flag": false,
-                       "service_type":     serviceType}},
-               &extraKeepService)
-       if err != nil {
-               t.Fatal(err)
-       }
-
-       return extraKeepService["uuid"].(string)
-}
-
-func deleteExtraKeepServer(uuid string) {
-       defer switchToken(arvadostest.AdminToken)()
-       arv.Delete("keep_services", uuid, nil, nil)
-}
diff --git a/services/datamanager/experimental/datamanager.py b/services/datamanager/experimental/datamanager.py
deleted file mode 100755 (executable)
index 8207bdc..0000000
+++ /dev/null
@@ -1,887 +0,0 @@
-#! /usr/bin/env python
-
-import arvados
-
-import argparse
-import cgi
-import csv
-import json
-import logging
-import math
-import pprint
-import re
-import threading
-import urllib2
-
-from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-from collections import defaultdict, Counter
-from functools import partial
-from operator import itemgetter
-from SocketServer import ThreadingMixIn
-
-arv = arvados.api('v1')
-
-# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
-byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
-def fileSizeFormat(value):
-  exponent = 0 if value == 0 else int(math.log(value, 1024))
-  return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
-                         byteunits[exponent])
-
-def percentageFloor(x):
-  """ Returns a float which is the input rounded down to the neared 0.01.
-
-e.g. precentageFloor(0.941354) = 0.94
-"""
-  return math.floor(x*100) / 100.0
-
-
-def byteSizeFromValidUuid(valid_uuid):
-  return int(valid_uuid.split('+')[1])
-
-class maxdict(dict):
-  """A dictionary that holds the largest value entered for each key."""
-  def addValue(self, key, value):
-    dict.__setitem__(self, key, max(dict.get(self, key), value))
-  def addValues(self, kv_pairs):
-    for key,value in kv_pairs:
-      self.addValue(key, value)
-  def addDict(self, d):
-    self.addValues(d.items())
-
-class CollectionInfo:
-  DEFAULT_PERSISTER_REPLICATION_LEVEL=2
-  all_by_uuid = {}
-
-  def __init__(self, uuid):
-    if CollectionInfo.all_by_uuid.has_key(uuid):
-      raise ValueError('Collection for uuid "%s" already exists.' % uuid)
-    self.uuid = uuid
-    self.block_uuids = set()  # uuids of keep blocks in this collection
-    self.reader_uuids = set()  # uuids of users who can read this collection
-    self.persister_uuids = set()  # uuids of users who want this collection saved
-    # map from user uuid to replication level they desire
-    self.persister_replication = maxdict()
-
-    # The whole api response in case we need anything else later.
-    self.api_response = []
-    CollectionInfo.all_by_uuid[uuid] = self
-
-  def byteSize(self):
-    return sum(map(byteSizeFromValidUuid, self.block_uuids))
-
-  def __str__(self):
-    return ('CollectionInfo uuid: %s\n'
-            '               %d block(s) containing %s\n'
-            '               reader_uuids: %s\n'
-            '               persister_replication: %s' %
-            (self.uuid,
-             len(self.block_uuids),
-             fileSizeFormat(self.byteSize()),
-             pprint.pformat(self.reader_uuids, indent = 15),
-             pprint.pformat(self.persister_replication, indent = 15)))
-
-  @staticmethod
-  def get(uuid):
-    if not CollectionInfo.all_by_uuid.has_key(uuid):
-      CollectionInfo(uuid)
-    return CollectionInfo.all_by_uuid[uuid]
-
-
-def extractUuid(candidate):
-  """ Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid."""
-  match = re.match('([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
-  return match and match.group(1)
-
-def checkUserIsAdmin():
-  current_user = arv.users().current().execute()
-
-  if not current_user['is_admin']:
-    log.warning('Current user %s (%s - %s) does not have '
-                'admin access and will not see much of the data.',
-                current_user['full_name'],
-                current_user['email'],
-                current_user['uuid'])
-    if args.require_admin_user:
-      log.critical('Exiting, rerun with --no-require-admin-user '
-                   'if you wish to continue.')
-      exit(1)
-
-def buildCollectionsList():
-  if args.uuid:
-    return [args.uuid,]
-  else:
-    collections_list_response = arv.collections().list(limit=args.max_api_results).execute()
-
-    print ('Returned %d of %d collections.' %
-           (len(collections_list_response['items']),
-            collections_list_response['items_available']))
-
-    return [item['uuid'] for item in collections_list_response['items']]
-
-
-def readCollections(collection_uuids):
-  for collection_uuid in collection_uuids:
-    collection_block_uuids = set()
-    collection_response = arv.collections().get(uuid=collection_uuid).execute()
-    collection_info = CollectionInfo.get(collection_uuid)
-    collection_info.api_response = collection_response
-    manifest_lines = collection_response['manifest_text'].split('\n')
-
-    if args.verbose:
-      print 'Manifest text for %s:' % collection_uuid
-      pprint.pprint(manifest_lines)
-
-    for manifest_line in manifest_lines:
-      if manifest_line:
-        manifest_tokens = manifest_line.split(' ')
-        if args.verbose:
-          print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
-        stream_name = manifest_tokens[0]
-
-        line_block_uuids = set(filter(None,
-                                      [extractUuid(candidate)
-                                       for candidate in manifest_tokens[1:]]))
-        collection_info.block_uuids.update(line_block_uuids)
-
-        # file_tokens = [token
-        #                for token in manifest_tokens[1:]
-        #                if extractUuid(token) is None]
-
-        # # Sort file tokens by start position in case they aren't already
-        # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))
-
-        # if args.verbose:
-        #   print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
-        #   print 'file_tokens: ' + pprint.pformat(file_tokens)
-
-
-def readLinks():
-  link_classes = set()
-
-  for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
-    # TODO(misha): We may not be seing all the links, but since items
-    # available does not return an accurate number, I don't knos how
-    # to confirm that we saw all of them.
-    collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
-    link_classes.update([link['link_class'] for link in collection_links_response['items']])
-    for link in collection_links_response['items']:
-      if link['link_class'] == 'permission':
-        collection_info.reader_uuids.add(link['tail_uuid'])
-      elif link['link_class'] == 'resources':
-        replication_level = link['properties'].get(
-          'replication',
-          CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
-        collection_info.persister_replication.addValue(
-          link['tail_uuid'],
-          replication_level)
-        collection_info.persister_uuids.add(link['tail_uuid'])
-
-  print 'Found the following link classes:'
-  pprint.pprint(link_classes)
-
-def reportMostPopularCollections():
-  most_popular_collections = sorted(
-    CollectionInfo.all_by_uuid.values(),
-    key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
-    reverse=True)[:10]
-
-  print 'Most popular Collections:'
-  for collection_info in most_popular_collections:
-    print collection_info
-
-
-def buildMaps():
-  for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
-    # Add the block holding the manifest itself for all calculations
-    block_uuids = collection_info.block_uuids.union([collection_uuid,])
-    for block_uuid in block_uuids:
-      block_to_collections[block_uuid].add(collection_uuid)
-      block_to_readers[block_uuid].update(collection_info.reader_uuids)
-      block_to_persisters[block_uuid].update(collection_info.persister_uuids)
-      block_to_persister_replication[block_uuid].addDict(
-        collection_info.persister_replication)
-    for reader_uuid in collection_info.reader_uuids:
-      reader_to_collections[reader_uuid].add(collection_uuid)
-      reader_to_blocks[reader_uuid].update(block_uuids)
-    for persister_uuid in collection_info.persister_uuids:
-      persister_to_collections[persister_uuid].add(collection_uuid)
-      persister_to_blocks[persister_uuid].update(block_uuids)
-
-
-def itemsByValueLength(original):
-  return sorted(original.items(),
-                key=lambda item:len(item[1]),
-                reverse=True)
-
-
-def reportBusiestUsers():
-  busiest_readers = itemsByValueLength(reader_to_collections)
-  print 'The busiest readers are:'
-  for reader,collections in busiest_readers:
-    print '%s reading %d collections.' % (reader, len(collections))
-  busiest_persisters = itemsByValueLength(persister_to_collections)
-  print 'The busiest persisters are:'
-  for persister,collections in busiest_persisters:
-    print '%s reading %d collections.' % (persister, len(collections))
-
-
-def blockDiskUsage(block_uuid):
-  """Returns the disk usage of a block given its uuid.
-
-  Will return 0 before reading the contents of the keep servers.
-  """
-  return byteSizeFromValidUuid(block_uuid) * block_to_replication[block_uuid]
-
-def blockPersistedUsage(user_uuid, block_uuid):
-  return (byteSizeFromValidUuid(block_uuid) *
-          block_to_persister_replication[block_uuid].get(user_uuid, 0))
-
-memo_computeWeightedReplicationCosts = {}
-def computeWeightedReplicationCosts(replication_levels):
-  """Computes the relative cost of varied replication levels.
-
-  replication_levels: a tuple of integers representing the desired
-  replication level. If n users want a replication level of x then x
-  should appear n times in replication_levels.
-
-  Returns a dictionary from replication level to cost.
-
-  The basic thinking is that the cost of replicating at level x should
-  be shared by everyone who wants replication of level x or higher.
-
-  For example, if we have two users who want 1 copy, one user who
-  wants 3 copies and two users who want 6 copies:
-  the input would be [1, 1, 3, 6, 6] (or any permutation)
-
-  The cost of the first copy is shared by all 5 users, so they each
-  pay 1 copy / 5 users = 0.2.
-  The cost of the second and third copies shared by 3 users, so they
-  each pay 2 copies / 3 users = 0.67 (plus the above costs)
-  The cost of the fourth, fifth and sixth copies is shared by two
-  users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs)
-
-  Here are some other examples:
-  computeWeightedReplicationCosts([1,]) -> {1:1.0}
-  computeWeightedReplicationCosts([2,]) -> {2:2.0}
-  computeWeightedReplicationCosts([1,1]) -> {1:0.5}
-  computeWeightedReplicationCosts([2,2]) -> {1:1.0}
-  computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
-  computeWeightedReplicationCosts([1,3]) -> {1:0.5,2:2.5}
-  computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
-  """
-  replication_level_counts = sorted(Counter(replication_levels).items())
-
-  memo_key = str(replication_level_counts)
-
-  if not memo_key in memo_computeWeightedReplicationCosts:
-    last_level = 0
-    current_cost = 0
-    total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
-    cost_for_level = {}
-    for replication_level, count in replication_level_counts:
-      copies_added = replication_level - last_level
-      # compute marginal cost from last level and add it to the last cost
-      current_cost += copies_added / total_interested
-      cost_for_level[replication_level] = current_cost
-      # update invariants
-      last_level = replication_level
-      total_interested -= count
-    memo_computeWeightedReplicationCosts[memo_key] = cost_for_level
-
-  return memo_computeWeightedReplicationCosts[memo_key]
-
-def blockPersistedWeightedUsage(user_uuid, block_uuid):
-  persister_replication_for_block = block_to_persister_replication[block_uuid]
-  user_replication = persister_replication_for_block[user_uuid]
-  return (
-    byteSizeFromValidUuid(block_uuid) *
-    computeWeightedReplicationCosts(
-      persister_replication_for_block.values())[user_replication])
-
-
-def computeUserStorageUsage():
-  for user, blocks in reader_to_blocks.items():
-    user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
-        byteSizeFromValidUuid,
-        blocks))
-    user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map(
-        lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/
-                                 len(block_to_readers[block_uuid])),
-        blocks))
-  for user, blocks in persister_to_blocks.items():
-    user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map(
-        partial(blockPersistedUsage, user),
-        blocks))
-    user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
-        partial(blockPersistedWeightedUsage, user),
-        blocks))
-
-def printUserStorageUsage():
-  print ('user: unweighted readable block size, weighted readable block size, '
-         'unweighted persisted block size, weighted persisted block size:')
-  for user, usage in user_to_usage.items():
-    print ('%s: %s %s %s %s' %
-           (user,
-            fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
-            fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
-            fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
-            fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
-
-def logUserStorageUsage():
-  for user, usage in user_to_usage.items():
-    body = {}
-    # user could actually represent a user or a group. We don't set
-    # the object_type field since we don't know which we have.
-    body['object_uuid'] = user
-    body['event_type'] = args.user_storage_log_event_type
-    properties = {}
-    properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
-    properties['read_collections_weighted_bytes'] = (
-      usage[WEIGHTED_READ_SIZE_COL])
-    properties['persisted_collections_total_bytes'] = (
-      usage[UNWEIGHTED_PERSIST_SIZE_COL])
-    properties['persisted_collections_weighted_bytes'] = (
-      usage[WEIGHTED_PERSIST_SIZE_COL])
-    body['properties'] = properties
-    # TODO(misha): Confirm that this will throw an exception if it
-    # fails to create the log entry.
-    arv.logs().create(body=body).execute()
-
-def getKeepServers():
-  response = arv.keep_disks().list().execute()
-  return [[keep_server['service_host'], keep_server['service_port']]
-          for keep_server in response['items']]
-
-
-def getKeepBlocks(keep_servers):
-  blocks = []
-  for host,port in keep_servers:
-    response = urllib2.urlopen('http://%s:%d/index' % (host, port))
-    server_blocks = [line.split(' ')
-                     for line in response.read().split('\n')
-                     if line]
-    server_blocks = [(block_id, int(mtime))
-                     for block_id, mtime in server_blocks]
-    blocks.append(server_blocks)
-  return blocks
-
-def getKeepStats(keep_servers):
-  MOUNT_COLUMN = 5
-  TOTAL_COLUMN = 1
-  FREE_COLUMN = 3
-  DISK_BLOCK_SIZE = 1024
-  stats = []
-  for host,port in keep_servers:
-    response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))
-
-    parsed_json = json.load(response)
-    df_entries = [line.split()
-                  for line in parsed_json['df'].split('\n')
-                  if line]
-    keep_volumes = [columns
-                    for columns in df_entries
-                    if 'keep' in columns[MOUNT_COLUMN]]
-    total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
-                                                  keep_volumes)))
-    free_space =  DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
-                                                  keep_volumes)))
-    stats.append([total_space, free_space])
-  return stats
-
-
-def computeReplication(keep_blocks):
-  for server_blocks in keep_blocks:
-    for block_uuid, _ in server_blocks:
-      block_to_replication[block_uuid] += 1
-  log.debug('Seeing the following replication levels among blocks: %s',
-            str(set(block_to_replication.values())))
-
-
-def computeGarbageCollectionCandidates():
-  for server_blocks in keep_blocks:
-    block_to_latest_mtime.addValues(server_blocks)
-  empty_set = set()
-  garbage_collection_priority = sorted(
-    [(block,mtime)
-     for block,mtime in block_to_latest_mtime.items()
-     if len(block_to_persisters.get(block,empty_set)) == 0],
-    key = itemgetter(1))
-  global garbage_collection_report
-  garbage_collection_report = []
-  cumulative_disk_size = 0
-  for block,mtime in garbage_collection_priority:
-    disk_size = blockDiskUsage(block)
-    cumulative_disk_size += disk_size
-    garbage_collection_report.append(
-      (block,
-       mtime,
-       disk_size,
-       cumulative_disk_size,
-       float(free_keep_space + cumulative_disk_size)/total_keep_space))
-
-  print 'The oldest Garbage Collection Candidates: '
-  pprint.pprint(garbage_collection_report[:20])
-
-
-def outputGarbageCollectionReport(filename):
-  with open(filename, 'wb') as csvfile:
-    gcwriter = csv.writer(csvfile)
-    gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
-                       'cumulative size', 'disk free'])
-    for line in garbage_collection_report:
-      gcwriter.writerow(line)
-
-def computeGarbageCollectionHistogram():
-  # TODO(misha): Modify this to allow users to specify the number of
-  # histogram buckets through a flag.
-  histogram = []
-  last_percentage = -1
-  for _,mtime,_,_,disk_free in garbage_collection_report:
-    curr_percentage = percentageFloor(disk_free)
-    if curr_percentage > last_percentage:
-      histogram.append( (mtime, curr_percentage) )
-    last_percentage = curr_percentage
-
-  log.info('Garbage collection histogram is: %s', histogram)
-
-  return histogram
-
-
-def logGarbageCollectionHistogram():
-  body = {}
-  # TODO(misha): Decide whether we should specify an object_uuid in
-  # the body and if so, which uuid to use.
-  body['event_type'] = args.block_age_free_space_histogram_log_event_type
-  properties = {}
-  properties['histogram'] = garbage_collection_histogram
-  body['properties'] = properties
-  # TODO(misha): Confirm that this will throw an exception if it
-  # fails to create the log entry.
-  arv.logs().create(body=body).execute()
-
-
-def detectReplicationProblems():
-  blocks_not_in_any_collections.update(
-    set(block_to_replication.keys()).difference(block_to_collections.keys()))
-  underreplicated_persisted_blocks.update(
-    [uuid
-     for uuid, persister_replication in block_to_persister_replication.items()
-     if len(persister_replication) > 0 and
-     block_to_replication[uuid] < max(persister_replication.values())])
-  overreplicated_persisted_blocks.update(
-    [uuid
-     for uuid, persister_replication in block_to_persister_replication.items()
-     if len(persister_replication) > 0 and
-     block_to_replication[uuid] > max(persister_replication.values())])
-
-  log.info('Found %d blocks not in any collections, e.g. %s...',
-           len(blocks_not_in_any_collections),
-           ','.join(list(blocks_not_in_any_collections)[:5]))
-  log.info('Found %d underreplicated blocks, e.g. %s...',
-           len(underreplicated_persisted_blocks),
-           ','.join(list(underreplicated_persisted_blocks)[:5]))
-  log.info('Found %d overreplicated blocks, e.g. %s...',
-           len(overreplicated_persisted_blocks),
-           ','.join(list(overreplicated_persisted_blocks)[:5]))
-
-  # TODO:
-  #  Read blocks sorted by mtime
-  #  Cache window vs % free space
-  #  Collections which candidates will appear in
-  #  Youngest underreplicated read blocks that appear in collections.
-  #  Report Collections that have blocks which are missing from (or
-  #   underreplicated in) keep.
-
-
-# This is the main flow here
-
-parser = argparse.ArgumentParser(description='Report on keep disks.')
-"""The command line argument parser we use.
-
-We only use it in the __main__ block, but leave it outside the block
-in case another package wants to use it or customize it by specifying
-it as a parent to their commandline parser.
-"""
-parser.add_argument('-m',
-                    '--max-api-results',
-                    type=int,
-                    default=5000,
-                    help=('The max results to get at once.'))
-parser.add_argument('-p',
-                    '--port',
-                    type=int,
-                    default=9090,
-                    help=('The port number to serve on. 0 means no server.'))
-parser.add_argument('-v',
-                    '--verbose',
-                    help='increase output verbosity',
-                    action='store_true')
-parser.add_argument('-u',
-                    '--uuid',
-                    help='uuid of specific collection to process')
-parser.add_argument('--require-admin-user',
-                    action='store_true',
-                    default=True,
-                    help='Fail if the user is not an admin [default]')
-parser.add_argument('--no-require-admin-user',
-                    dest='require_admin_user',
-                    action='store_false',
-                    help=('Allow users without admin permissions with '
-                          'only a warning.'))
-parser.add_argument('--log-to-workbench',
-                    action='store_true',
-                    default=False,
-                    help='Log findings to workbench')
-parser.add_argument('--no-log-to-workbench',
-                    dest='log_to_workbench',
-                    action='store_false',
-                    help='Don\'t log findings to workbench [default]')
-parser.add_argument('--user-storage-log-event-type',
-                    default='user-storage-report',
-                    help=('The event type to set when logging user '
-                          'storage usage to workbench.'))
-parser.add_argument('--block-age-free-space-histogram-log-event-type',
-                    default='block-age-free-space-histogram',
-                    help=('The event type to set when logging user '
-                          'storage usage to workbench.'))
-parser.add_argument('--garbage-collection-file',
-                    default='',
-                    help=('The file to write a garbage collection report, or '
-                          'leave empty for no report.'))
-
-args = None
-
-# TODO(misha): Think about moving some of this to the __main__ block.
-log = logging.getLogger('arvados.services.datamanager')
-stderr_handler = logging.StreamHandler()
-log.setLevel(logging.INFO)
-stderr_handler.setFormatter(
-  logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
-log.addHandler(stderr_handler)
-
-# Global Data - don't try this at home
-collection_uuids = []
-
-# These maps all map from uuids to a set of uuids
-block_to_collections = defaultdict(set)  # keep blocks
-reader_to_collections = defaultdict(set)  # collection(s) for which the user has read access
-persister_to_collections = defaultdict(set)  # collection(s) which the user has persisted
-block_to_readers = defaultdict(set)
-block_to_persisters = defaultdict(set)
-block_to_persister_replication = defaultdict(maxdict)
-reader_to_blocks = defaultdict(set)
-persister_to_blocks = defaultdict(set)
-
-UNWEIGHTED_READ_SIZE_COL = 0
-WEIGHTED_READ_SIZE_COL = 1
-UNWEIGHTED_PERSIST_SIZE_COL = 2
-WEIGHTED_PERSIST_SIZE_COL = 3
-NUM_COLS = 4
-user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)
-
-keep_servers = []
-keep_blocks = []
-keep_stats = []
-total_keep_space = 0
-free_keep_space =  0
-
-block_to_replication = defaultdict(lambda: 0)
-block_to_latest_mtime = maxdict()
-
-garbage_collection_report = []
-"""A list of non-persisted blocks, sorted by increasing mtime
-
-Each entry is of the form (block uuid, latest mtime, disk size,
-cumulative size)
-
-* block uuid: The id of the block we want to delete
-* latest mtime: The latest mtime of the block across all keep servers.
-* disk size: The total disk space used by this block (block size
-multiplied by current replication level)
-* cumulative disk size: The sum of this block's disk size and all the
-blocks listed above it
-* disk free: The proportion of our disk space that would be free if we
-deleted this block and all the above. So this is (free disk space +
-cumulative disk size) / total disk capacity
-"""
-
-garbage_collection_histogram = []
-""" Shows the tradeoff of keep block age vs keep disk free space.
-
-Each entry is of the form (mtime, Disk Proportion).
-
-An entry of the form (1388747781, 0.52) means that if we deleted the
-oldest non-presisted blocks until we had 52% of the disk free, then
-all blocks with an mtime greater than 1388747781 would be preserved.
-"""
-
-# Stuff to report on
-blocks_not_in_any_collections = set()
-underreplicated_persisted_blocks = set()
-overreplicated_persisted_blocks = set()
-
-all_data_loaded = False
-
-def loadAllData():
-  checkUserIsAdmin()
-
-  log.info('Building Collection List')
-  global collection_uuids
-  collection_uuids = filter(None, [extractUuid(candidate)
-                                   for candidate in buildCollectionsList()])
-
-  log.info('Reading Collections')
-  readCollections(collection_uuids)
-
-  if args.verbose:
-    pprint.pprint(CollectionInfo.all_by_uuid)
-
-  log.info('Reading Links')
-  readLinks()
-
-  reportMostPopularCollections()
-
-  log.info('Building Maps')
-  buildMaps()
-
-  reportBusiestUsers()
-
-  log.info('Getting Keep Servers')
-  global keep_servers
-  keep_servers = getKeepServers()
-
-  print keep_servers
-
-  log.info('Getting Blocks from each Keep Server.')
-  global keep_blocks
-  keep_blocks = getKeepBlocks(keep_servers)
-
-  log.info('Getting Stats from each Keep Server.')
-  global keep_stats, total_keep_space, free_keep_space
-  keep_stats = getKeepStats(keep_servers)
-
-  total_keep_space = sum(map(itemgetter(0), keep_stats))
-  free_keep_space = sum(map(itemgetter(1), keep_stats))
-
-  # TODO(misha): Delete this hack when the keep servers are fixed!
-  # This hack deals with the fact that keep servers report each other's disks.
-  total_keep_space /= len(keep_stats)
-  free_keep_space /= len(keep_stats)
-
-  log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
-           (fileSizeFormat(total_keep_space),
-            fileSizeFormat(free_keep_space),
-            100*free_keep_space/total_keep_space))
-
-  computeReplication(keep_blocks)
-
-  log.info('average replication level is %f',
-           (float(sum(block_to_replication.values())) /
-            len(block_to_replication)))
-
-  computeGarbageCollectionCandidates()
-
-  if args.garbage_collection_file:
-    log.info('Writing garbage Collection report to %s',
-             args.garbage_collection_file)
-    outputGarbageCollectionReport(args.garbage_collection_file)
-
-  global garbage_collection_histogram
-  garbage_collection_histogram = computeGarbageCollectionHistogram()
-
-  if args.log_to_workbench:
-    logGarbageCollectionHistogram()
-
-  detectReplicationProblems()
-
-  computeUserStorageUsage()
-  printUserStorageUsage()
-  if args.log_to_workbench:
-    logUserStorageUsage()
-
-  global all_data_loaded
-  all_data_loaded = True
-
-
-class DataManagerHandler(BaseHTTPRequestHandler):
-  USER_PATH = 'user'
-  COLLECTION_PATH = 'collection'
-  BLOCK_PATH = 'block'
-
-  def userLink(self, uuid):
-    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
-            {'uuid': uuid,
-             'path': DataManagerHandler.USER_PATH})
-
-  def collectionLink(self, uuid):
-    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
-            {'uuid': uuid,
-             'path': DataManagerHandler.COLLECTION_PATH})
-
-  def blockLink(self, uuid):
-    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
-            {'uuid': uuid,
-             'path': DataManagerHandler.BLOCK_PATH})
-
-  def writeTop(self, title):
-    self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)
-
-  def writeBottom(self):
-    self.wfile.write('</BODY></HTML>\n')
-
-  def writeHomePage(self):
-    self.send_response(200)
-    self.end_headers()
-    self.writeTop('Home')
-    self.wfile.write('<TABLE>')
-    self.wfile.write('<TR><TH>user'
-                     '<TH>unweighted readable block size'
-                     '<TH>weighted readable block size'
-                     '<TH>unweighted persisted block size'
-                     '<TH>weighted persisted block size</TR>\n')
-    for user, usage in user_to_usage.items():
-      self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
-                       (self.userLink(user),
-                        fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
-                        fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
-                        fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
-                        fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
-    self.wfile.write('</TABLE>\n')
-    self.writeBottom()
-
-  def userExists(self, uuid):
-    # Currently this will return false for a user who exists but
-    # doesn't appear on any manifests.
-    # TODO(misha): Figure out if we need to fix this.
-    return user_to_usage.has_key(uuid)
-
-  def writeUserPage(self, uuid):
-    if not self.userExists(uuid):
-      self.send_error(404,
-                      'User (%s) Not Found.' % cgi.escape(uuid, quote=False))
-    else:
-      # Here we assume that since a user exists, they don't need to be
-      # html escaped.
-      self.send_response(200)
-      self.end_headers()
-      self.writeTop('User %s' % uuid)
-      self.wfile.write('<TABLE>')
-      self.wfile.write('<TR><TH>user'
-                       '<TH>unweighted readable block size'
-                       '<TH>weighted readable block size'
-                       '<TH>unweighted persisted block size'
-                       '<TH>weighted persisted block size</TR>\n')
-      usage = user_to_usage[uuid]
-      self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
-                       (self.userLink(uuid),
-                        fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
-                        fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
-                        fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
-                        fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
-      self.wfile.write('</TABLE>\n')
-      self.wfile.write('<P>Persisting Collections: %s\n' %
-                       ', '.join(map(self.collectionLink,
-                                     persister_to_collections[uuid])))
-      self.wfile.write('<P>Reading Collections: %s\n' %
-                       ', '.join(map(self.collectionLink,
-                                     reader_to_collections[uuid])))
-      self.writeBottom()
-
-  def collectionExists(self, uuid):
-    return CollectionInfo.all_by_uuid.has_key(uuid)
-
-  def writeCollectionPage(self, uuid):
-    if not self.collectionExists(uuid):
-      self.send_error(404,
-                      'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False))
-    else:
-      collection = CollectionInfo.get(uuid)
-      # Here we assume that since a collection exists, its id doesn't
-      # need to be html escaped.
-      self.send_response(200)
-      self.end_headers()
-      self.writeTop('Collection %s' % uuid)
-      self.wfile.write('<H1>Collection %s</H1>\n' % uuid)
-      self.wfile.write('<P>Total size %s (not factoring in replication).\n' %
-                       fileSizeFormat(collection.byteSize()))
-      self.wfile.write('<P>Readers: %s\n' %
-                       ', '.join(map(self.userLink, collection.reader_uuids)))
-
-      if len(collection.persister_replication) == 0:
-        self.wfile.write('<P>No persisters\n')
-      else:
-        replication_to_users = defaultdict(set)
-        for user,replication in collection.persister_replication.items():
-          replication_to_users[replication].add(user)
-        replication_levels = sorted(replication_to_users.keys())
-
-        self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
-                         'out at %dx replication:\n' %
-                         (len(collection.persister_replication),
-                          len(replication_levels),
-                          replication_levels[-1]))
-
-        # TODO(misha): This code is used twice, let's move it to a method.
-        self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
-                         '<TH>'.join(['Replication Level ' + str(x)
-                                      for x in replication_levels]))
-        self.wfile.write('<TR>\n')
-        for replication_level in replication_levels:
-          users = replication_to_users[replication_level]
-          self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
-              map(self.userLink, users)))
-        self.wfile.write('</TR></TABLE>\n')
-
-      replication_to_blocks = defaultdict(set)
-      for block in collection.block_uuids:
-        replication_to_blocks[block_to_replication[block]].add(block)
-      replication_levels = sorted(replication_to_blocks.keys())
-      self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
-                       (len(collection.block_uuids), len(replication_levels)))
-      self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
-                       '<TH>'.join(['Replication Level ' + str(x)
-                                    for x in replication_levels]))
-      self.wfile.write('<TR>\n')
-      for replication_level in replication_levels:
-        blocks = replication_to_blocks[replication_level]
-        self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(blocks))
-      self.wfile.write('</TR></TABLE>\n')
-
-
-  def do_GET(self):
-    if not all_data_loaded:
-      self.send_error(503,
-                      'Sorry, but I am still loading all the data I need.')
-    else:
-      # Removing leading '/' and process request path
-      split_path = self.path[1:].split('/')
-      request_type = split_path[0]
-      log.debug('path (%s) split as %s with request_type %s' % (self.path,
-                                                                split_path,
-                                                                request_type))
-      if request_type == '':
-        self.writeHomePage()
-      elif request_type == DataManagerHandler.USER_PATH:
-        self.writeUserPage(split_path[1])
-      elif request_type == DataManagerHandler.COLLECTION_PATH:
-        self.writeCollectionPage(split_path[1])
-      else:
-        self.send_error(404, 'Unrecognized request path.')
-    return
-
-class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
-  """Handle requests in a separate thread."""
-
-
-if __name__ == '__main__':
-  args = parser.parse_args()
-
-  if args.port == 0:
-    loadAllData()
-  else:
-    loader = threading.Thread(target = loadAllData, name = 'loader')
-    loader.start()
-
-    server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
-    server.serve_forever()
diff --git a/services/datamanager/experimental/datamanager_test.py b/services/datamanager/experimental/datamanager_test.py
deleted file mode 100755 (executable)
index 0842c16..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-#! /usr/bin/env python
-
-import datamanager
-import unittest
-
-class TestComputeWeightedReplicationCosts(unittest.TestCase):
-  def test_obvious(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,]),
-                     {1:1.0})
-
-  def test_simple(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([2,]),
-                     {2:2.0})
-
-  def test_even_split(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1]),
-                     {1:0.5})
-
-  def test_even_split_bigger(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([2,2]),
-                     {2:1.0})
-
-  def test_uneven_split(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,2]),
-                     {1:0.5, 2:1.5})
-
-  def test_uneven_split_bigger(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3]),
-                     {1:0.5, 3:2.5})
-
-  def test_uneven_split_jumble(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3,6,6,10]),
-                     {1:0.2, 3:0.7, 6:1.7, 10:5.7})
-
-  def test_documentation_example(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1,3,6,6]),
-                     {1:0.2, 3: 0.2 + 2.0 / 3, 6: 0.2 + 2.0 / 3 + 1.5})
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
deleted file mode 100644 (file)
index 39d2d5b..0000000
+++ /dev/null
@@ -1,551 +0,0 @@
-/* Deals with getting Keep Server blocks from API Server and Keep Servers. */
-
-package keep
-
-import (
-       "bufio"
-       "encoding/json"
-       "errors"
-       "flag"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/logger"
-       "io"
-       "io/ioutil"
-       "log"
-       "net/http"
-       "strconv"
-       "strings"
-       "time"
-)
-
-// ServerAddress struct
-type ServerAddress struct {
-       SSL         bool   `json:"service_ssl_flag"`
-       Host        string `json:"service_host"`
-       Port        int    `json:"service_port"`
-       UUID        string `json:"uuid"`
-       ServiceType string `json:"service_type"`
-}
-
-// BlockInfo is info about a particular block returned by the server
-type BlockInfo struct {
-       Digest blockdigest.DigestWithSize
-       Mtime  int64 // TODO(misha): Replace this with a timestamp.
-}
-
-// BlockServerInfo is info about a specified block given by a server
-type BlockServerInfo struct {
-       ServerIndex int
-       Mtime       int64 // TODO(misha): Replace this with a timestamp.
-}
-
-// ServerContents struct
-type ServerContents struct {
-       BlockDigestToInfo map[blockdigest.DigestWithSize]BlockInfo
-}
-
-// ServerResponse struct
-type ServerResponse struct {
-       Address  ServerAddress
-       Contents ServerContents
-       Err      error
-}
-
-// ReadServers struct
-type ReadServers struct {
-       ReadAllServers           bool
-       KeepServerIndexToAddress []ServerAddress
-       KeepServerAddressToIndex map[ServerAddress]int
-       ServerToContents         map[ServerAddress]ServerContents
-       BlockToServers           map[blockdigest.DigestWithSize][]BlockServerInfo
-       BlockReplicationCounts   map[int]int
-}
-
-// GetKeepServersParams struct
-type GetKeepServersParams struct {
-       Client *arvadosclient.ArvadosClient
-       Logger *logger.Logger
-       Limit  int
-}
-
-// ServiceList consists of the addresses of all the available kee servers
-type ServiceList struct {
-       ItemsAvailable int             `json:"items_available"`
-       KeepServers    []ServerAddress `json:"items"`
-}
-
-var serviceType string
-
-func init() {
-       flag.StringVar(&serviceType,
-               "service-type",
-               "disk",
-               "Operate only on keep_services with the specified service_type, ignoring all others.")
-}
-
-// String
-// TODO(misha): Change this to include the UUID as well.
-func (s ServerAddress) String() string {
-       return s.URL()
-}
-
-// URL of the keep server
-func (s ServerAddress) URL() string {
-       if s.SSL {
-               return fmt.Sprintf("https://%s:%d", s.Host, s.Port)
-       }
-       return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
-}
-
-// GetKeepServersAndSummarize gets keep servers from api
-func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers, err error) {
-       results, err = GetKeepServers(params)
-       if err != nil {
-               return
-       }
-       log.Printf("Returned %d keep disks", len(results.ServerToContents))
-
-       results.Summarize(params.Logger)
-       log.Printf("Replication level distribution: %v",
-               results.BlockReplicationCounts)
-
-       return
-}
-
-// GetKeepServers from api server
-func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error) {
-       sdkParams := arvadosclient.Dict{
-               "filters": [][]string{{"service_type", "!=", "proxy"}},
-       }
-       if params.Limit > 0 {
-               sdkParams["limit"] = params.Limit
-       }
-
-       var sdkResponse ServiceList
-       err = params.Client.List("keep_services", sdkParams, &sdkResponse)
-
-       if err != nil {
-               return
-       }
-
-       var keepServers []ServerAddress
-       for _, server := range sdkResponse.KeepServers {
-               if server.ServiceType == serviceType {
-                       keepServers = append(keepServers, server)
-               } else {
-                       log.Printf("Skipping keep_service %q because its service_type %q does not match -service-type=%q", server, server.ServiceType, serviceType)
-               }
-       }
-
-       if len(keepServers) == 0 {
-               return results, fmt.Errorf("Found no keepservices with the service type %v", serviceType)
-       }
-
-       if params.Logger != nil {
-               params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable
-                       keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers)
-                       keepInfo["keep_servers"] = sdkResponse.KeepServers
-                       keepInfo["indexable_keep_servers"] = keepServers
-               })
-       }
-
-       log.Printf("Received keep services list: %+v", sdkResponse)
-
-       if len(sdkResponse.KeepServers) < sdkResponse.ItemsAvailable {
-               return results, fmt.Errorf("Did not receive all available keep servers: %+v", sdkResponse)
-       }
-
-       results.KeepServerIndexToAddress = keepServers
-       results.KeepServerAddressToIndex = make(map[ServerAddress]int)
-       for i, address := range results.KeepServerIndexToAddress {
-               results.KeepServerAddressToIndex[address] = i
-       }
-
-       log.Printf("Got Server Addresses: %v", results)
-
-       // Send off all the index requests concurrently
-       responseChan := make(chan ServerResponse)
-       for _, keepServer := range results.KeepServerIndexToAddress {
-               // The above keepsServer variable is reused for each iteration, so
-               // it would be shared across all goroutines. This would result in
-               // us querying one server n times instead of n different servers
-               // as we intended. To avoid this we add it as an explicit
-               // parameter which gets copied. This bug and solution is described
-               // in https://golang.org/doc/effective_go.html#channels
-               go func(keepServer ServerAddress) {
-                       responseChan <- GetServerContents(params.Logger,
-                               keepServer,
-                               params.Client)
-               }(keepServer)
-       }
-
-       results.ServerToContents = make(map[ServerAddress]ServerContents)
-       results.BlockToServers = make(map[blockdigest.DigestWithSize][]BlockServerInfo)
-
-       // Read all the responses
-       for i := range results.KeepServerIndexToAddress {
-               _ = i // Here to prevent go from complaining.
-               response := <-responseChan
-
-               // Check if there were any errors during GetServerContents
-               if response.Err != nil {
-                       return results, response.Err
-               }
-
-               log.Printf("Received channel response from %v containing %d files",
-                       response.Address,
-                       len(response.Contents.BlockDigestToInfo))
-               results.ServerToContents[response.Address] = response.Contents
-               serverIndex := results.KeepServerAddressToIndex[response.Address]
-               for _, blockInfo := range response.Contents.BlockDigestToInfo {
-                       results.BlockToServers[blockInfo.Digest] = append(
-                               results.BlockToServers[blockInfo.Digest],
-                               BlockServerInfo{ServerIndex: serverIndex,
-                                       Mtime: blockInfo.Mtime})
-               }
-       }
-       return
-}
-
-// GetServerContents of the keep server
-func GetServerContents(arvLogger *logger.Logger,
-       keepServer ServerAddress,
-       arv *arvadosclient.ArvadosClient) (response ServerResponse) {
-
-       err := GetServerStatus(arvLogger, keepServer, arv)
-       if err != nil {
-               response.Err = err
-               return
-       }
-
-       req, err := CreateIndexRequest(arvLogger, keepServer, arv)
-       if err != nil {
-               response.Err = err
-               return
-       }
-
-       resp, err := arv.Client.Do(req)
-       if err != nil {
-               response.Err = err
-               return
-       }
-
-       response, err = ReadServerResponse(arvLogger, keepServer, resp)
-       if err != nil {
-               response.Err = err
-               return
-       }
-
-       return
-}
-
-// GetServerStatus get keep server status by invoking /status.json
-func GetServerStatus(arvLogger *logger.Logger,
-       keepServer ServerAddress,
-       arv *arvadosclient.ArvadosClient) error {
-       url := fmt.Sprintf("http://%s:%d/status.json",
-               keepServer.Host,
-               keepServer.Port)
-
-       if arvLogger != nil {
-               now := time.Now()
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := make(map[string]interface{})
-                       serverInfo["status_request_sent_at"] = now
-                       serverInfo["host"] = keepServer.Host
-                       serverInfo["port"] = keepServer.Port
-
-                       keepInfo[keepServer.UUID] = serverInfo
-               })
-       }
-
-       resp, err := arv.Client.Get(url)
-       if err != nil {
-               return fmt.Errorf("Error getting keep status from %s: %v", url, err)
-       } else if resp.StatusCode != 200 {
-               return fmt.Errorf("Received error code %d in response to request "+
-                       "for %s status: %s",
-                       resp.StatusCode, url, resp.Status)
-       }
-
-       var keepStatus map[string]interface{}
-       decoder := json.NewDecoder(resp.Body)
-       decoder.UseNumber()
-       err = decoder.Decode(&keepStatus)
-       if err != nil {
-               return fmt.Errorf("Error decoding keep status from %s: %v", url, err)
-       }
-
-       if arvLogger != nil {
-               now := time.Now()
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
-                       serverInfo["status_response_processed_at"] = now
-                       serverInfo["status"] = keepStatus
-               })
-       }
-
-       return nil
-}
-
-// CreateIndexRequest to the keep server
-func CreateIndexRequest(arvLogger *logger.Logger,
-       keepServer ServerAddress,
-       arv *arvadosclient.ArvadosClient) (req *http.Request, err error) {
-       url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
-       log.Println("About to fetch keep server contents from " + url)
-
-       if arvLogger != nil {
-               now := time.Now()
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
-                       serverInfo["index_request_sent_at"] = now
-               })
-       }
-
-       req, err = http.NewRequest("GET", url, nil)
-       if err != nil {
-               return req, fmt.Errorf("Error building http request for %s: %v", url, err)
-       }
-
-       req.Header.Add("Authorization", "OAuth2 "+arv.ApiToken)
-       return req, err
-}
-
-// ReadServerResponse reads reasponse from keep server
-func ReadServerResponse(arvLogger *logger.Logger,
-       keepServer ServerAddress,
-       resp *http.Response) (response ServerResponse, err error) {
-
-       if resp.StatusCode != 200 {
-               return response, fmt.Errorf("Received error code %d in response to index request for %s: %s",
-                       resp.StatusCode, keepServer.String(), resp.Status)
-       }
-
-       if arvLogger != nil {
-               now := time.Now()
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
-                       serverInfo["index_response_received_at"] = now
-               })
-       }
-
-       response.Address = keepServer
-       response.Contents.BlockDigestToInfo =
-               make(map[blockdigest.DigestWithSize]BlockInfo)
-       reader := bufio.NewReader(resp.Body)
-       numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
-       for {
-               numLines++
-               line, err := reader.ReadString('\n')
-               if err == io.EOF {
-                       return response, fmt.Errorf("Index from %s truncated at line %d",
-                               keepServer.String(), numLines)
-               } else if err != nil {
-                       return response, fmt.Errorf("Error reading index response from %s at line %d: %v",
-                               keepServer.String(), numLines, err)
-               }
-               if line == "\n" {
-                       if _, err := reader.Peek(1); err == nil {
-                               extra, _ := reader.ReadString('\n')
-                               return response, fmt.Errorf("Index from %s had trailing data at line %d after EOF marker: %s",
-                                       keepServer.String(), numLines+1, extra)
-                       } else if err != io.EOF {
-                               return response, fmt.Errorf("Index from %s had read error after EOF marker at line %d: %v",
-                                       keepServer.String(), numLines, err)
-                       }
-                       numLines--
-                       break
-               }
-               blockInfo, err := parseBlockInfoFromIndexLine(line)
-               if err != nil {
-                       return response, fmt.Errorf("Error parsing BlockInfo from index line "+
-                               "received from %s: %v",
-                               keepServer.String(),
-                               err)
-               }
-
-               if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {
-                       // This server returned multiple lines containing the same block digest.
-                       numDuplicates++
-                       // Keep the block that's newer.
-                       if storedBlock.Mtime < blockInfo.Mtime {
-                               response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
-                       }
-               } else {
-                       response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
-               }
-       }
-
-       log.Printf("%s index contained %d lines with %d duplicates with "+
-               "%d size disagreements",
-               keepServer.String(),
-               numLines,
-               numDuplicates,
-               numSizeDisagreements)
-
-       if arvLogger != nil {
-               now := time.Now()
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
-
-                       serverInfo["processing_finished_at"] = now
-                       serverInfo["lines_received"] = numLines
-                       serverInfo["duplicates_seen"] = numDuplicates
-                       serverInfo["size_disagreements_seen"] = numSizeDisagreements
-               })
-       }
-       resp.Body.Close()
-       return
-}
-
-func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err error) {
-       tokens := strings.Fields(indexLine)
-       if len(tokens) != 2 {
-               err = fmt.Errorf("Expected 2 tokens per line but received a "+
-                       "line containing %#q instead.",
-                       tokens)
-       }
-
-       var locator blockdigest.BlockLocator
-       if locator, err = blockdigest.ParseBlockLocator(tokens[0]); err != nil {
-               err = fmt.Errorf("%v Received error while parsing line \"%#q\"",
-                       err, indexLine)
-               return
-       }
-       if len(locator.Hints) > 0 {
-               err = fmt.Errorf("Block locator in index line should not contain hints "+
-                       "but it does: %#q",
-                       locator)
-               return
-       }
-
-       var ns int64
-       ns, err = strconv.ParseInt(tokens[1], 10, 64)
-       if err != nil {
-               return
-       }
-       if ns < 1e12 {
-               // An old version of keepstore is giving us timestamps
-               // in seconds instead of nanoseconds. (This threshold
-               // correctly handles all times between 1970-01-02 and
-               // 33658-09-27.)
-               ns = ns * 1e9
-       }
-       blockInfo.Mtime = ns
-       blockInfo.Digest = blockdigest.DigestWithSize{
-               Digest: locator.Digest,
-               Size:   uint32(locator.Size),
-       }
-       return
-}
-
-// Summarize results from keep server
-func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
-       readServers.BlockReplicationCounts = make(map[int]int)
-       for _, infos := range readServers.BlockToServers {
-               replication := len(infos)
-               readServers.BlockReplicationCounts[replication]++
-       }
-
-       if arvLogger != nil {
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       keepInfo["distinct_blocks_stored"] = len(readServers.BlockToServers)
-               })
-       }
-}
-
-// TrashRequest struct
-type TrashRequest struct {
-       Locator    string `json:"locator"`
-       BlockMtime int64  `json:"block_mtime"`
-}
-
-// TrashList is an array of TrashRequest objects
-type TrashList []TrashRequest
-
-// SendTrashLists to trash queue
-func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList, dryRun bool) (errs []error) {
-       count := 0
-       barrier := make(chan error)
-
-       client := kc.Client
-
-       for url, v := range spl {
-               if arvLogger != nil {
-                       // We need a local variable because Update doesn't call our mutator func until later,
-                       // when our list variable might have been reused by the next loop iteration.
-                       url := url
-                       trashLen := len(v)
-                       arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                               trashListInfo := logger.GetOrCreateMap(p, "trash_list_len")
-                               trashListInfo[url] = trashLen
-                       })
-               }
-
-               if dryRun {
-                       log.Printf("dry run, not sending trash list to service %s with %d blocks", url, len(v))
-                       continue
-               }
-
-               count++
-               log.Printf("Sending trash list to %v", url)
-
-               go (func(url string, v TrashList) {
-                       pipeReader, pipeWriter := io.Pipe()
-                       go (func() {
-                               enc := json.NewEncoder(pipeWriter)
-                               enc.Encode(v)
-                               pipeWriter.Close()
-                       })()
-
-                       req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader)
-                       if err != nil {
-                               log.Printf("Error creating trash list request for %v error: %v", url, err.Error())
-                               barrier <- err
-                               return
-                       }
-
-                       req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
-
-                       // Make the request
-                       var resp *http.Response
-                       if resp, err = client.Do(req); err != nil {
-                               log.Printf("Error sending trash list to %v error: %v", url, err.Error())
-                               barrier <- err
-                               return
-                       }
-
-                       log.Printf("Sent trash list to %v: response was HTTP %v", url, resp.Status)
-
-                       io.Copy(ioutil.Discard, resp.Body)
-                       resp.Body.Close()
-
-                       if resp.StatusCode != 200 {
-                               barrier <- errors.New(fmt.Sprintf("Got HTTP code %v", resp.StatusCode))
-                       } else {
-                               barrier <- nil
-                       }
-               })(url, v)
-       }
-
-       for i := 0; i < count; i++ {
-               b := <-barrier
-               if b != nil {
-                       errs = append(errs, b)
-               }
-       }
-
-       return errs
-}
diff --git a/services/datamanager/keep/keep_test.go b/services/datamanager/keep/keep_test.go
deleted file mode 100644 (file)
index ca8797e..0000000
+++ /dev/null
@@ -1,278 +0,0 @@
-package keep
-
-import (
-       "encoding/json"
-       "fmt"
-       "net"
-       "net/http"
-       "net/http/httptest"
-       "net/url"
-       "strconv"
-       "strings"
-       "testing"
-
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-
-       . "gopkg.in/check.v1"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) {
-       TestingT(t)
-}
-
-type KeepSuite struct{}
-
-var _ = Suite(&KeepSuite{})
-
-type TestHandler struct {
-       request TrashList
-}
-
-func (ts *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
-       r := json.NewDecoder(req.Body)
-       r.Decode(&ts.request)
-}
-
-func (s *KeepSuite) TestSendTrashLists(c *C) {
-       th := TestHandler{}
-       server := httptest.NewServer(&th)
-       defer server.Close()
-
-       tl := map[string]TrashList{
-               server.URL: {TrashRequest{"000000000000000000000000deadbeef", 99}}}
-
-       arv := &arvadosclient.ArvadosClient{ApiToken: "abc123"}
-       kc := keepclient.KeepClient{Arvados: arv, Client: &http.Client{}}
-       kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
-               map[string]string{"xxxx": server.URL},
-               map[string]string{})
-
-       err := SendTrashLists(nil, &kc, tl, false)
-
-       c.Check(err, IsNil)
-
-       c.Check(th.request,
-               DeepEquals,
-               tl[server.URL])
-
-}
-
-type TestHandlerError struct {
-}
-
-func (tse *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
-       http.Error(writer, "I'm a teapot", 418)
-}
-
-func sendTrashListError(c *C, server *httptest.Server) {
-       tl := map[string]TrashList{
-               server.URL: {TrashRequest{"000000000000000000000000deadbeef", 99}}}
-
-       arv := &arvadosclient.ArvadosClient{ApiToken: "abc123"}
-       kc := keepclient.KeepClient{Arvados: arv, Client: &http.Client{}}
-       kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
-               map[string]string{"xxxx": server.URL},
-               map[string]string{})
-
-       err := SendTrashLists(nil, &kc, tl, false)
-
-       c.Check(err, NotNil)
-       c.Check(err[0], NotNil)
-}
-
-func (s *KeepSuite) TestSendTrashListErrorResponse(c *C) {
-       server := httptest.NewServer(&TestHandlerError{})
-       sendTrashListError(c, server)
-       defer server.Close()
-}
-
-func (s *KeepSuite) TestSendTrashListUnreachable(c *C) {
-       sendTrashListError(c, httptest.NewUnstartedServer(&TestHandler{}))
-}
-
-type APITestData struct {
-       numServers int
-       serverType string
-       statusCode int
-}
-
-func (s *KeepSuite) TestGetKeepServers_UnsupportedServiceType(c *C) {
-       testGetKeepServersFromAPI(c, APITestData{1, "notadisk", 200}, "Found no keepservices with the service type disk")
-}
-
-func (s *KeepSuite) TestGetKeepServers_ReceivedTooFewServers(c *C) {
-       testGetKeepServersFromAPI(c, APITestData{2, "disk", 200}, "Did not receive all available keep servers")
-}
-
-func (s *KeepSuite) TestGetKeepServers_ServerError(c *C) {
-       testGetKeepServersFromAPI(c, APITestData{-1, "disk", -1}, "arvados API server error")
-}
-
-func testGetKeepServersFromAPI(c *C, testData APITestData, expectedError string) {
-       keepServers := ServiceList{
-               ItemsAvailable: testData.numServers,
-               KeepServers: []ServerAddress{{
-                       SSL:         false,
-                       Host:        "example.com",
-                       Port:        12345,
-                       UUID:        "abcdefg",
-                       ServiceType: testData.serverType,
-               }},
-       }
-
-       ksJSON, _ := json.Marshal(keepServers)
-       apiStubResponses := make(map[string]arvadostest.StubResponse)
-       apiStubResponses["/arvados/v1/keep_services"] = arvadostest.StubResponse{testData.statusCode, string(ksJSON)}
-       apiStub := arvadostest.ServerStub{apiStubResponses}
-
-       api := httptest.NewServer(&apiStub)
-       defer api.Close()
-
-       arv := &arvadosclient.ArvadosClient{
-               Scheme:    "http",
-               ApiServer: api.URL[7:],
-               ApiToken:  "abc123",
-               Client:    &http.Client{Transport: &http.Transport{}},
-       }
-
-       kc := keepclient.KeepClient{Arvados: arv, Client: &http.Client{}}
-       kc.SetServiceRoots(map[string]string{"xxxx": "http://example.com:23456"},
-               map[string]string{"xxxx": "http://example.com:23456"},
-               map[string]string{})
-
-       params := GetKeepServersParams{
-               Client: arv,
-               Logger: nil,
-               Limit:  10,
-       }
-
-       _, err := GetKeepServersAndSummarize(params)
-       c.Assert(err, NotNil)
-       c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*", expectedError))
-}
-
-type KeepServerTestData struct {
-       // handle /status.json
-       statusStatusCode int
-
-       // handle /index
-       indexStatusCode   int
-       indexResponseBody string
-
-       // expected error, if any
-       expectedError string
-}
-
-func (s *KeepSuite) TestGetKeepServers_ErrorGettingKeepServerStatus(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{500, 200, "ok",
-               ".*http://.* 500 Internal Server Error"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_GettingIndex(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{200, -1, "notok",
-               ".*redirect-loop.*"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_ErrorReadServerResponse(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{200, 500, "notok",
-               ".*http://.* 500 Internal Server Error"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_ReadServerResponseTuncatedAtLineOne(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{200, 200,
-               "notterminatedwithnewline", "Index from http://.* truncated at line 1"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_InvalidBlockLocatorPattern(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{200, 200, "testing\n",
-               "Error parsing BlockInfo from index line.*"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_ReadServerResponseEmpty(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{200, 200, "\n", ""})
-}
-
-func (s *KeepSuite) TestGetKeepServers_ReadServerResponseWithTwoBlocks(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{200, 200,
-               "51752ba076e461ec9ec1d27400a08548+20 1447526361\na048cc05c02ba1ee43ad071274b9e547+52 1447526362\n\n", ""})
-}
-
-func testGetKeepServersAndSummarize(c *C, testData KeepServerTestData) {
-       ksStubResponses := make(map[string]arvadostest.StubResponse)
-       ksStubResponses["/status.json"] = arvadostest.StubResponse{testData.statusStatusCode, string(`{}`)}
-       ksStubResponses["/index"] = arvadostest.StubResponse{testData.indexStatusCode, testData.indexResponseBody}
-       ksStub := arvadostest.ServerStub{ksStubResponses}
-       ks := httptest.NewServer(&ksStub)
-       defer ks.Close()
-
-       ksURL, err := url.Parse(ks.URL)
-       c.Check(err, IsNil)
-       ksHost, port, err := net.SplitHostPort(ksURL.Host)
-       ksPort, err := strconv.Atoi(port)
-       c.Check(err, IsNil)
-
-       servers_list := ServiceList{
-               ItemsAvailable: 1,
-               KeepServers: []ServerAddress{{
-                       SSL:         false,
-                       Host:        ksHost,
-                       Port:        ksPort,
-                       UUID:        "abcdefg",
-                       ServiceType: "disk",
-               }},
-       }
-       ksJSON, _ := json.Marshal(servers_list)
-       apiStubResponses := make(map[string]arvadostest.StubResponse)
-       apiStubResponses["/arvados/v1/keep_services"] = arvadostest.StubResponse{200, string(ksJSON)}
-       apiStub := arvadostest.ServerStub{apiStubResponses}
-
-       api := httptest.NewServer(&apiStub)
-       defer api.Close()
-
-       arv := &arvadosclient.ArvadosClient{
-               Scheme:    "http",
-               ApiServer: api.URL[7:],
-               ApiToken:  "abc123",
-               Client:    &http.Client{Transport: &http.Transport{}},
-       }
-
-       kc := keepclient.KeepClient{Arvados: arv, Client: &http.Client{}}
-       kc.SetServiceRoots(map[string]string{"xxxx": ks.URL},
-               map[string]string{"xxxx": ks.URL},
-               map[string]string{})
-
-       params := GetKeepServersParams{
-               Client: arv,
-               Logger: nil,
-               Limit:  10,
-       }
-
-       // GetKeepServersAndSummarize
-       results, err := GetKeepServersAndSummarize(params)
-
-       if testData.expectedError == "" {
-               c.Assert(err, IsNil)
-               c.Assert(results, NotNil)
-
-               blockToServers := results.BlockToServers
-
-               blockLocators := strings.Split(testData.indexResponseBody, "\n")
-               for _, loc := range blockLocators {
-                       locator := strings.Split(loc, " ")[0]
-                       if locator != "" {
-                               blockLocator, err := blockdigest.ParseBlockLocator(locator)
-                               c.Assert(err, IsNil)
-
-                               blockDigestWithSize := blockdigest.DigestWithSize{blockLocator.Digest, uint32(blockLocator.Size)}
-                               blockServerInfo := blockToServers[blockDigestWithSize]
-                               c.Assert(blockServerInfo[0].Mtime, NotNil)
-                       }
-               }
-       } else {
-               c.Assert(err, ErrorMatches, testData.expectedError)
-       }
-}
diff --git a/services/datamanager/loggerutil/loggerutil.go b/services/datamanager/loggerutil/loggerutil.go
deleted file mode 100644 (file)
index 8111425..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-/* Datamanager-specific logging methods. */
-
-package loggerutil
-
-import (
-       "git.curoverse.com/arvados.git/sdk/go/logger"
-       "log"
-       "os"
-       "runtime"
-       "time"
-)
-
-// Useful to call at the beginning of execution to log info about the
-// current run.
-func LogRunInfo(arvLogger *logger.Logger) {
-       if arvLogger != nil {
-               now := time.Now()
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       runInfo := logger.GetOrCreateMap(p, "run_info")
-                       runInfo["started_at"] = now
-                       runInfo["args"] = os.Args
-                       hostname, err := os.Hostname()
-                       if err != nil {
-                               runInfo["hostname_error"] = err.Error()
-                       } else {
-                               runInfo["hostname"] = hostname
-                       }
-                       runInfo["pid"] = os.Getpid()
-               })
-       }
-}
-
-// A LogMutator that records the current memory usage. This is most useful as a logger write hook.
-func LogMemoryAlloc(p map[string]interface{}, e map[string]interface{}) {
-       runInfo := logger.GetOrCreateMap(p, "run_info")
-       var memStats runtime.MemStats
-       runtime.ReadMemStats(&memStats)
-       runInfo["memory_bytes_in_use"] = memStats.Alloc
-       runInfo["memory_bytes_reserved"] = memStats.Sys
-}
-
-func FatalWithMessage(arvLogger *logger.Logger, message string) {
-       if arvLogger != nil {
-               arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
-                       p["FATAL"] = message
-                       runInfo := logger.GetOrCreateMap(p, "run_info")
-                       runInfo["finished_at"] = time.Now()
-               })
-       }
-
-       log.Fatalf(message)
-}
diff --git a/services/datamanager/summary/canonical_string.go b/services/datamanager/summary/canonical_string.go
deleted file mode 100644 (file)
index 152314c..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/* Ensures that we only have one copy of each unique string. This is
-/* not designed for concurrent access. */
-
-package summary
-
-// This code should probably be moved somewhere more universal.
-
-// CanonicalString struct
-type CanonicalString struct {
-       m map[string]string
-}
-
-// Get a CanonicalString
-func (cs *CanonicalString) Get(s string) (r string) {
-       if cs.m == nil {
-               cs.m = make(map[string]string)
-       }
-       value, found := cs.m[s]
-       if found {
-               return value
-       }
-
-       // s may be a substring of a much larger string.
-       // If we store s, it will prevent that larger string from getting
-       // garbage collected.
-       // If this is something you worry about you should change this code
-       // to make an explict copy of s using a byte array.
-       cs.m[s] = s
-       return s
-}
diff --git a/services/datamanager/summary/file.go b/services/datamanager/summary/file.go
deleted file mode 100644 (file)
index 6e463d7..0000000
+++ /dev/null
@@ -1,115 +0,0 @@
-// Handles writing data to and reading data from disk to speed up development.
-
-package summary
-
-import (
-       "encoding/gob"
-       "flag"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/logger"
-       "git.curoverse.com/arvados.git/services/datamanager/collection"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "log"
-       "os"
-)
-
-// Used to locally cache data read from servers to reduce execution
-// time when developing. Not for use in production.
-type serializedData struct {
-       ReadCollections collection.ReadCollections
-       KeepServerInfo  keep.ReadServers
-}
-
-var (
-       WriteDataTo  string
-       readDataFrom string
-)
-
-// DataFetcher to fetch data from keep servers
-type DataFetcher func(arvLogger *logger.Logger,
-       readCollections *collection.ReadCollections,
-       keepServerInfo *keep.ReadServers) error
-
-func init() {
-       flag.StringVar(&WriteDataTo,
-               "write-data-to",
-               "",
-               "Write summary of data received to this file. Used for development only.")
-       flag.StringVar(&readDataFrom,
-               "read-data-from",
-               "",
-               "Avoid network i/o and read summary data from this file instead. Used for development only.")
-}
-
-// MaybeWriteData writes data we've read to a file.
-//
-// This is useful for development, so that we don't need to read all
-// our data from the network every time we tweak something.
-//
-// This should not be used outside of development, since you'll be
-// working with stale data.
-func MaybeWriteData(arvLogger *logger.Logger,
-       readCollections collection.ReadCollections,
-       keepServerInfo keep.ReadServers) error {
-       if WriteDataTo == "" {
-               return nil
-       }
-       summaryFile, err := os.Create(WriteDataTo)
-       if err != nil {
-               return err
-       }
-       defer summaryFile.Close()
-
-       enc := gob.NewEncoder(summaryFile)
-       data := serializedData{
-               ReadCollections: readCollections,
-               KeepServerInfo:  keepServerInfo}
-       err = enc.Encode(data)
-       if err != nil {
-               return err
-       }
-       log.Printf("Wrote summary data to: %s", WriteDataTo)
-       return nil
-}
-
-// ShouldReadData should not be used outside of development
-func ShouldReadData() bool {
-       return readDataFrom != ""
-}
-
-// ReadData reads data that we've written to a file.
-//
-// This is useful for development, so that we don't need to read all
-// our data from the network every time we tweak something.
-//
-// This should not be used outside of development, since you'll be
-// working with stale data.
-func ReadData(arvLogger *logger.Logger,
-       readCollections *collection.ReadCollections,
-       keepServerInfo *keep.ReadServers) error {
-       if readDataFrom == "" {
-               return fmt.Errorf("ReadData() called with empty filename.")
-       }
-       summaryFile, err := os.Open(readDataFrom)
-       if err != nil {
-               return err
-       }
-       defer summaryFile.Close()
-
-       dec := gob.NewDecoder(summaryFile)
-       data := serializedData{}
-       err = dec.Decode(&data)
-       if err != nil {
-               return err
-       }
-
-       // re-summarize data, so that we can update our summarizing
-       // functions without needing to do all our network i/o
-       data.ReadCollections.Summarize(arvLogger)
-       data.KeepServerInfo.Summarize(arvLogger)
-
-       *readCollections = data.ReadCollections
-       *keepServerInfo = data.KeepServerInfo
-       log.Printf("Read summary data from: %s", readDataFrom)
-       return nil
-}
diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go
deleted file mode 100644 (file)
index d7fb3eb..0000000
+++ /dev/null
@@ -1,215 +0,0 @@
-// Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
-
-package summary
-
-import (
-       "encoding/json"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/logger"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "log"
-       "os"
-       "strings"
-)
-
-// Locator is a block digest
-type Locator blockdigest.DigestWithSize
-
-// MarshalJSON encoding
-func (l Locator) MarshalJSON() ([]byte, error) {
-       return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
-}
-
-// PullRequest represents one entry in the Pull List
-type PullRequest struct {
-       Locator Locator  `json:"locator"`
-       Servers []string `json:"servers"`
-}
-
-// PullList for a particular server
-type PullList []PullRequest
-
-// PullListByLocator implements sort.Interface for PullList based on
-// the Digest.
-type PullListByLocator PullList
-
-func (a PullListByLocator) Len() int      { return len(a) }
-func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a PullListByLocator) Less(i, j int) bool {
-       di, dj := a[i].Locator.Digest, a[j].Locator.Digest
-       if di.H < dj.H {
-               return true
-       } else if di.H == dj.H {
-               if di.L < dj.L {
-                       return true
-               } else if di.L == dj.L {
-                       return a[i].Locator.Size < a[j].Locator.Size
-               }
-       }
-       return false
-}
-
-// PullServers struct
-// For a given under-replicated block, this structure represents which
-// servers should pull the specified block and which servers they can
-// pull it from.
-type PullServers struct {
-       To   []string // Servers that should pull the specified block
-       From []string // Servers that already contain the specified block
-}
-
-// ComputePullServers creates a map from block locator to PullServers
-// with one entry for each under-replicated block.
-//
-// This method ignores zero-replica blocks since there are no servers
-// to pull them from, so callers should feel free to omit them, but
-// this function will ignore them if they are provided.
-func ComputePullServers(kc *keepclient.KeepClient,
-       keepServerInfo *keep.ReadServers,
-       blockToDesiredReplication map[blockdigest.DigestWithSize]int,
-       underReplicated BlockSet) (m map[Locator]PullServers) {
-       m = map[Locator]PullServers{}
-       // We use CanonicalString to avoid filling memory with duplicate
-       // copies of the same string.
-       var cs CanonicalString
-
-       // Servers that are writeable
-       writableServers := map[string]struct{}{}
-       for _, url := range kc.WritableLocalRoots() {
-               writableServers[cs.Get(url)] = struct{}{}
-       }
-
-       for block := range underReplicated {
-               serversStoringBlock := keepServerInfo.BlockToServers[block]
-               numCopies := len(serversStoringBlock)
-               numCopiesMissing := blockToDesiredReplication[block] - numCopies
-               if numCopiesMissing > 0 {
-                       // We expect this to always be true, since the block was listed
-                       // in underReplicated.
-
-                       if numCopies > 0 {
-                               // Not much we can do with blocks with no copies.
-
-                               // A server's host-port string appears as a key in this map
-                               // iff it contains the block.
-                               serverHasBlock := map[string]struct{}{}
-                               for _, info := range serversStoringBlock {
-                                       sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
-                                       serverHasBlock[cs.Get(sa.URL())] = struct{}{}
-                               }
-
-                               roots := keepclient.NewRootSorter(kc.LocalRoots(),
-                                       block.String()).GetSortedRoots()
-
-                               l := Locator(block)
-                               m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
-                                       roots, numCopiesMissing)
-                       }
-               }
-       }
-       return m
-}
-
-// CreatePullServers creates a pull list in which the To and From
-// fields preserve the ordering of sorted servers and the contents
-// are all canonical strings.
-func CreatePullServers(cs CanonicalString,
-       serverHasBlock map[string]struct{},
-       writableServers map[string]struct{},
-       sortedServers []string,
-       maxToFields int) (ps PullServers) {
-
-       ps = PullServers{
-               To:   make([]string, 0, maxToFields),
-               From: make([]string, 0, len(serverHasBlock)),
-       }
-
-       for _, host := range sortedServers {
-               // Strip the protocol portion of the url.
-               // Use the canonical copy of the string to avoid memory waste.
-               server := cs.Get(host)
-               _, hasBlock := serverHasBlock[server]
-               if hasBlock {
-                       // The from field should include the protocol.
-                       ps.From = append(ps.From, cs.Get(host))
-               } else if len(ps.To) < maxToFields {
-                       _, writable := writableServers[host]
-                       if writable {
-                               ps.To = append(ps.To, server)
-                       }
-               }
-       }
-
-       return
-}
-
-// RemoveProtocolPrefix strips the protocol prefix from a url.
-func RemoveProtocolPrefix(url string) string {
-       return url[(strings.LastIndex(url, "/") + 1):]
-}
-
-// BuildPullLists produces a PullList for each keep server.
-func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
-       spl = map[string]PullList{}
-       // We don't worry about canonicalizing our strings here, because we
-       // assume lps was created by ComputePullServers() which already
-       // canonicalized the strings for us.
-       for locator, pullServers := range lps {
-               for _, destination := range pullServers.To {
-                       pullList, pullListExists := spl[destination]
-                       if !pullListExists {
-                               pullList = PullList{}
-                       }
-                       spl[destination] = append(pullList,
-                               PullRequest{Locator: locator, Servers: pullServers.From})
-               }
-       }
-       return
-}
-
-// WritePullLists writes each pull list to a file.
-// The filename is based on the hostname.
-//
-// This is just a hack for prototyping, it is not expected to be used
-// in production.
-func WritePullLists(arvLogger *logger.Logger,
-       pullLists map[string]PullList,
-       dryRun bool) error {
-       r := strings.NewReplacer(":", ".")
-
-       for host, list := range pullLists {
-               if arvLogger != nil {
-                       // We need a local variable because Update doesn't call our mutator func until later,
-                       // when our list variable might have been reused by the next loop iteration.
-                       host := host
-                       listLen := len(list)
-                       arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                               pullListInfo := logger.GetOrCreateMap(p, "pull_list_len")
-                               pullListInfo[host] = listLen
-                       })
-               }
-
-               if dryRun {
-                       log.Print("dry run, not sending pull list to service %s with %d blocks", host, len(list))
-                       continue
-               }
-
-               filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
-               pullListFile, err := os.Create(filename)
-               if err != nil {
-                       return err
-               }
-               defer pullListFile.Close()
-
-               enc := json.NewEncoder(pullListFile)
-               err = enc.Encode(list)
-               if err != nil {
-                       return err
-               }
-               log.Printf("Wrote pull list to %s.", filename)
-       }
-
-       return nil
-}
diff --git a/services/datamanager/summary/pull_list_test.go b/services/datamanager/summary/pull_list_test.go
deleted file mode 100644 (file)
index 60b495c..0000000
+++ /dev/null
@@ -1,272 +0,0 @@
-package summary
-
-import (
-       "encoding/json"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       . "gopkg.in/check.v1"
-       "sort"
-       "testing"
-)
-
-// Gocheck boilerplate
-func TestPullLists(t *testing.T) {
-       TestingT(t)
-}
-
-type PullSuite struct{}
-
-var _ = Suite(&PullSuite{})
-
-// Helper method to declare string sets more succinctly
-// Could be placed somewhere more general.
-func stringSet(slice ...string) (m map[string]struct{}) {
-       m = map[string]struct{}{}
-       for _, s := range slice {
-               m[s] = struct{}{}
-       }
-       return
-}
-
-func (s *PullSuite) TestPullListPrintsJSONCorrectly(c *C) {
-       pl := PullList{PullRequest{
-               Locator: Locator(blockdigest.MakeTestDigestSpecifySize(0xBadBeef, 56789)),
-               Servers: []string{"keep0.qr1hi.arvadosapi.com:25107",
-                       "keep1.qr1hi.arvadosapi.com:25108"}}}
-
-       b, err := json.Marshal(pl)
-       c.Assert(err, IsNil)
-       expectedOutput := `[{"locator":"0000000000000000000000000badbeef+56789",` +
-               `"servers":["keep0.qr1hi.arvadosapi.com:25107",` +
-               `"keep1.qr1hi.arvadosapi.com:25108"]}]`
-       c.Check(string(b), Equals, expectedOutput)
-}
-
-func (s *PullSuite) TestCreatePullServers(c *C) {
-       var cs CanonicalString
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet(),
-                       stringSet(),
-                       []string{},
-                       5),
-               DeepEquals,
-               PullServers{To: []string{}, From: []string{}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet(),
-                       []string{},
-                       5),
-               DeepEquals,
-               PullServers{To: []string{}, From: []string{}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet("https://keep0:25107"),
-                       []string{"https://keep0:25107"},
-                       5),
-               DeepEquals,
-               PullServers{To: []string{}, From: []string{"https://keep0:25107"}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet("https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"),
-                       []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
-                       5),
-               DeepEquals,
-               PullServers{To: []string{"https://keep3:25110", "https://keep2:25109"},
-                       From: []string{"https://keep1:25108", "https://keep0:25107"}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet("https://keep3:25110", "https://keep1:25108", "https://keep0:25107"),
-                       []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
-                       5),
-               DeepEquals,
-               PullServers{To: []string{"https://keep3:25110"},
-                       From: []string{"https://keep1:25108", "https://keep0:25107"}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet("https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"),
-                       []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
-                       1),
-               DeepEquals,
-               PullServers{To: []string{"https://keep3:25110"},
-                       From: []string{"https://keep1:25108", "https://keep0:25107"}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet("https://keep3:25110", "https://keep2:25109",
-                               "https://keep1:25108", "https://keep0:25107"),
-                       []string{"https://keep3:25110", "https://keep2:25109",
-                               "https://keep1:25108", "https://keep0:25107"},
-                       1),
-               DeepEquals,
-               PullServers{To: []string{"https://keep3:25110"},
-                       From: []string{"https://keep1:25108", "https://keep0:25107"}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet("https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"),
-                       []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
-                       0),
-               DeepEquals,
-               PullServers{To: []string{},
-                       From: []string{"https://keep1:25108", "https://keep0:25107"}})
-}
-
-// Checks whether two pull list maps are equal. Since pull lists are
-// ordered arbitrarily, we need to sort them by digest before
-// comparing them for deep equality.
-type pullListMapEqualsChecker struct {
-       *CheckerInfo
-}
-
-func (c *pullListMapEqualsChecker) Check(params []interface{}, names []string) (result bool, error string) {
-       obtained, ok := params[0].(map[string]PullList)
-       if !ok {
-               return false, "First parameter is not a PullList map"
-       }
-       expected, ok := params[1].(map[string]PullList)
-       if !ok {
-               return false, "Second parameter is not a PullList map"
-       }
-
-       for _, v := range obtained {
-               sort.Sort(PullListByLocator(v))
-       }
-       for _, v := range expected {
-               sort.Sort(PullListByLocator(v))
-       }
-
-       return DeepEquals.Check(params, names)
-}
-
-var PullListMapEquals Checker = &pullListMapEqualsChecker{&CheckerInfo{
-       Name:   "PullListMapEquals",
-       Params: []string{"obtained", "expected"},
-}}
-
-func (s *PullSuite) TestBuildPullLists(c *C) {
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{}),
-               PullListMapEquals,
-               map[string]PullList{})
-
-       locator1 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xBadBeef)}
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {To: []string{}, From: []string{}}}),
-               PullListMapEquals,
-               map[string]PullList{})
-
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {To: []string{}, From: []string{"f1", "f2"}}}),
-               PullListMapEquals,
-               map[string]PullList{})
-
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {To: []string{"t1"}, From: []string{"f1", "f2"}}}),
-               PullListMapEquals,
-               map[string]PullList{
-                       "t1": {PullRequest{locator1, []string{"f1", "f2"}}}})
-
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {To: []string{"t1"}, From: []string{}}}),
-               PullListMapEquals,
-               map[string]PullList{"t1": {
-                       PullRequest{locator1, []string{}}}})
-
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {
-                               To:   []string{"t1", "t2"},
-                               From: []string{"f1", "f2"},
-                       }}),
-               PullListMapEquals,
-               map[string]PullList{
-                       "t1": {PullRequest{locator1, []string{"f1", "f2"}}},
-                       "t2": {PullRequest{locator1, []string{"f1", "f2"}}},
-               })
-
-       locator2 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xCabbed)}
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {To: []string{"t1"}, From: []string{"f1", "f2"}},
-                       locator2: {To: []string{"t2"}, From: []string{"f3", "f4"}}}),
-               PullListMapEquals,
-               map[string]PullList{
-                       "t1": {PullRequest{locator1, []string{"f1", "f2"}}},
-                       "t2": {PullRequest{locator2, []string{"f3", "f4"}}},
-               })
-
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {
-                               To:   []string{"t1"},
-                               From: []string{"f1", "f2"}},
-                       locator2: {
-                               To:   []string{"t2", "t1"},
-                               From: []string{"f3", "f4"}},
-               }),
-               PullListMapEquals,
-               map[string]PullList{
-                       "t1": {
-                               PullRequest{locator1, []string{"f1", "f2"}},
-                               PullRequest{locator2, []string{"f3", "f4"}},
-                       },
-                       "t2": {
-                               PullRequest{locator2, []string{"f3", "f4"}},
-                       },
-               })
-
-       locator3 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xDeadBeef)}
-       locator4 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xFedBeef)}
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {
-                               To:   []string{"t1"},
-                               From: []string{"f1", "f2"}},
-                       locator2: {
-                               To:   []string{"t2", "t1"},
-                               From: []string{"f3", "f4"}},
-                       locator3: {
-                               To:   []string{"t3", "t2", "t1"},
-                               From: []string{"f4", "f5"}},
-                       locator4: {
-                               To:   []string{"t4", "t3", "t2", "t1"},
-                               From: []string{"f1", "f5"}},
-               }),
-               PullListMapEquals,
-               map[string]PullList{
-                       "t1": {
-                               PullRequest{locator1, []string{"f1", "f2"}},
-                               PullRequest{locator2, []string{"f3", "f4"}},
-                               PullRequest{locator3, []string{"f4", "f5"}},
-                               PullRequest{locator4, []string{"f1", "f5"}},
-                       },
-                       "t2": {
-                               PullRequest{locator2, []string{"f3", "f4"}},
-                               PullRequest{locator3, []string{"f4", "f5"}},
-                               PullRequest{locator4, []string{"f1", "f5"}},
-                       },
-                       "t3": {
-                               PullRequest{locator3, []string{"f4", "f5"}},
-                               PullRequest{locator4, []string{"f1", "f5"}},
-                       },
-                       "t4": {
-                               PullRequest{locator4, []string{"f1", "f5"}},
-                       },
-               })
-}
diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go
deleted file mode 100644 (file)
index 9fb0316..0000000
+++ /dev/null
@@ -1,277 +0,0 @@
-// Summarizes Collection Data and Keep Server Contents.
-
-package summary
-
-// TODO(misha): Check size of blocks as well as their digest.
-
-import (
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/services/datamanager/collection"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "sort"
-)
-
-// BlockSet is a map of blocks
-type BlockSet map[blockdigest.DigestWithSize]struct{}
-
-// Insert adds a single block to the set.
-func (bs BlockSet) Insert(digest blockdigest.DigestWithSize) {
-       bs[digest] = struct{}{}
-}
-
-// Union adds a set of blocks to the set.
-func (bs BlockSet) Union(obs BlockSet) {
-       for k, v := range obs {
-               bs[k] = v
-       }
-}
-
-// CollectionIndexSet is used to save space. To convert to and from
-// the uuid, use collection.ReadCollections' fields
-// CollectionIndexToUUID and CollectionUUIDToIndex.
-type CollectionIndexSet map[int]struct{}
-
-// Insert adds a single collection to the set. The collection is specified by
-// its index.
-func (cis CollectionIndexSet) Insert(collectionIndex int) {
-       cis[collectionIndex] = struct{}{}
-}
-
-// ToCollectionIndexSet gets block to collection indices
-func (bs BlockSet) ToCollectionIndexSet(
-       readCollections collection.ReadCollections,
-       collectionIndexSet *CollectionIndexSet) {
-       for block := range bs {
-               for _, collectionIndex := range readCollections.BlockToCollectionIndices[block] {
-                       collectionIndexSet.Insert(collectionIndex)
-               }
-       }
-}
-
-// ReplicationLevels struct
-// Keeps track of the requested and actual replication levels.
-// Currently this is only used for blocks but could easily be used for
-// collections as well.
-type ReplicationLevels struct {
-       // The requested replication level.
-       // For Blocks this is the maximum replication level among all the
-       // collections this block belongs to.
-       Requested int
-
-       // The actual number of keep servers this is on.
-       Actual int
-}
-
-// ReplicationLevelBlockSetMap maps from replication levels to their blocks.
-type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet
-
-// ReplicationLevelBlockCount is an individual entry from ReplicationLevelBlockSetMap
-// which only reports the number of blocks, not which blocks.
-type ReplicationLevelBlockCount struct {
-       Levels ReplicationLevels
-       Count  int
-}
-
-// ReplicationLevelBlockSetSlice is an ordered list of ReplicationLevelBlockCount useful for reporting.
-type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount
-
-// ReplicationSummary sturct
-type ReplicationSummary struct {
-       CollectionBlocksNotInKeep  BlockSet
-       UnderReplicatedBlocks      BlockSet
-       OverReplicatedBlocks       BlockSet
-       CorrectlyReplicatedBlocks  BlockSet
-       KeepBlocksNotInCollections BlockSet
-
-       CollectionsNotFullyInKeep      CollectionIndexSet
-       UnderReplicatedCollections     CollectionIndexSet
-       OverReplicatedCollections      CollectionIndexSet
-       CorrectlyReplicatedCollections CollectionIndexSet
-}
-
-// ReplicationSummaryCounts struct counts the elements in each set in ReplicationSummary.
-type ReplicationSummaryCounts struct {
-       CollectionBlocksNotInKeep      int
-       UnderReplicatedBlocks          int
-       OverReplicatedBlocks           int
-       CorrectlyReplicatedBlocks      int
-       KeepBlocksNotInCollections     int
-       CollectionsNotFullyInKeep      int
-       UnderReplicatedCollections     int
-       OverReplicatedCollections      int
-       CorrectlyReplicatedCollections int
-}
-
-// GetOrCreate gets the BlockSet for a given set of ReplicationLevels,
-// creating it if it doesn't already exist.
-func (rlbs ReplicationLevelBlockSetMap) GetOrCreate(
-       repLevels ReplicationLevels) (bs BlockSet) {
-       bs, exists := rlbs[repLevels]
-       if !exists {
-               bs = make(BlockSet)
-               rlbs[repLevels] = bs
-       }
-       return
-}
-
-// Insert adds a block to the set for a given replication level.
-func (rlbs ReplicationLevelBlockSetMap) Insert(
-       repLevels ReplicationLevels,
-       block blockdigest.DigestWithSize) {
-       rlbs.GetOrCreate(repLevels).Insert(block)
-}
-
-// Union adds a set of blocks to the set for a given replication level.
-func (rlbs ReplicationLevelBlockSetMap) Union(
-       repLevels ReplicationLevels,
-       bs BlockSet) {
-       rlbs.GetOrCreate(repLevels).Union(bs)
-}
-
-// Counts outputs a sorted list of ReplicationLevelBlockCounts.
-func (rlbs ReplicationLevelBlockSetMap) Counts() (
-       sorted ReplicationLevelBlockSetSlice) {
-       sorted = make(ReplicationLevelBlockSetSlice, len(rlbs))
-       i := 0
-       for levels, set := range rlbs {
-               sorted[i] = ReplicationLevelBlockCount{Levels: levels, Count: len(set)}
-               i++
-       }
-       sort.Sort(sorted)
-       return
-}
-
-// Implemented to meet sort.Interface
-func (rlbss ReplicationLevelBlockSetSlice) Len() int {
-       return len(rlbss)
-}
-
-// Implemented to meet sort.Interface
-func (rlbss ReplicationLevelBlockSetSlice) Less(i, j int) bool {
-       return rlbss[i].Levels.Requested < rlbss[j].Levels.Requested ||
-               (rlbss[i].Levels.Requested == rlbss[j].Levels.Requested &&
-                       rlbss[i].Levels.Actual < rlbss[j].Levels.Actual)
-}
-
-// Implemented to meet sort.Interface
-func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) {
-       rlbss[i], rlbss[j] = rlbss[j], rlbss[i]
-}
-
-// ComputeCounts returns ReplicationSummaryCounts
-func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
-       // TODO(misha): Consider rewriting this method to iterate through
-       // the fields using reflection, instead of explictily listing the
-       // fields as we do now.
-       rsc.CollectionBlocksNotInKeep = len(rs.CollectionBlocksNotInKeep)
-       rsc.UnderReplicatedBlocks = len(rs.UnderReplicatedBlocks)
-       rsc.OverReplicatedBlocks = len(rs.OverReplicatedBlocks)
-       rsc.CorrectlyReplicatedBlocks = len(rs.CorrectlyReplicatedBlocks)
-       rsc.KeepBlocksNotInCollections = len(rs.KeepBlocksNotInCollections)
-       rsc.CollectionsNotFullyInKeep = len(rs.CollectionsNotFullyInKeep)
-       rsc.UnderReplicatedCollections = len(rs.UnderReplicatedCollections)
-       rsc.OverReplicatedCollections = len(rs.OverReplicatedCollections)
-       rsc.CorrectlyReplicatedCollections = len(rs.CorrectlyReplicatedCollections)
-       return rsc
-}
-
-// PrettyPrint ReplicationSummaryCounts
-func (rsc ReplicationSummaryCounts) PrettyPrint() string {
-       return fmt.Sprintf("Replication Block Counts:"+
-               "\n Missing From Keep: %d, "+
-               "\n Under Replicated: %d, "+
-               "\n Over Replicated: %d, "+
-               "\n Replicated Just Right: %d, "+
-               "\n Not In Any Collection: %d. "+
-               "\nReplication Collection Counts:"+
-               "\n Missing From Keep: %d, "+
-               "\n Under Replicated: %d, "+
-               "\n Over Replicated: %d, "+
-               "\n Replicated Just Right: %d.",
-               rsc.CollectionBlocksNotInKeep,
-               rsc.UnderReplicatedBlocks,
-               rsc.OverReplicatedBlocks,
-               rsc.CorrectlyReplicatedBlocks,
-               rsc.KeepBlocksNotInCollections,
-               rsc.CollectionsNotFullyInKeep,
-               rsc.UnderReplicatedCollections,
-               rsc.OverReplicatedCollections,
-               rsc.CorrectlyReplicatedCollections)
-}
-
-// BucketReplication returns ReplicationLevelBlockSetMap
-func BucketReplication(readCollections collection.ReadCollections,
-       keepServerInfo keep.ReadServers) (rlbs ReplicationLevelBlockSetMap) {
-       rlbs = make(ReplicationLevelBlockSetMap)
-
-       for block, requestedReplication := range readCollections.BlockToDesiredReplication {
-               rlbs.Insert(
-                       ReplicationLevels{
-                               Requested: requestedReplication,
-                               Actual:    len(keepServerInfo.BlockToServers[block])},
-                       block)
-       }
-
-       for block, servers := range keepServerInfo.BlockToServers {
-               if 0 == readCollections.BlockToDesiredReplication[block] {
-                       rlbs.Insert(
-                               ReplicationLevels{Requested: 0, Actual: len(servers)},
-                               block)
-               }
-       }
-       return
-}
-
-// SummarizeBuckets reads collections and summarizes
-func (rlbs ReplicationLevelBlockSetMap) SummarizeBuckets(
-       readCollections collection.ReadCollections) (
-       rs ReplicationSummary) {
-       rs.CollectionBlocksNotInKeep = make(BlockSet)
-       rs.UnderReplicatedBlocks = make(BlockSet)
-       rs.OverReplicatedBlocks = make(BlockSet)
-       rs.CorrectlyReplicatedBlocks = make(BlockSet)
-       rs.KeepBlocksNotInCollections = make(BlockSet)
-
-       rs.CollectionsNotFullyInKeep = make(CollectionIndexSet)
-       rs.UnderReplicatedCollections = make(CollectionIndexSet)
-       rs.OverReplicatedCollections = make(CollectionIndexSet)
-       rs.CorrectlyReplicatedCollections = make(CollectionIndexSet)
-
-       for levels, bs := range rlbs {
-               if levels.Actual == 0 {
-                       rs.CollectionBlocksNotInKeep.Union(bs)
-               } else if levels.Requested == 0 {
-                       rs.KeepBlocksNotInCollections.Union(bs)
-               } else if levels.Actual < levels.Requested {
-                       rs.UnderReplicatedBlocks.Union(bs)
-               } else if levels.Actual > levels.Requested {
-                       rs.OverReplicatedBlocks.Union(bs)
-               } else {
-                       rs.CorrectlyReplicatedBlocks.Union(bs)
-               }
-       }
-
-       rs.CollectionBlocksNotInKeep.ToCollectionIndexSet(readCollections,
-               &rs.CollectionsNotFullyInKeep)
-       // Since different collections can specify different replication
-       // levels, the fact that a block is under-replicated does not imply
-       // that all collections that it belongs to are under-replicated, but
-       // we'll ignore that for now.
-       // TODO(misha): Fix this and report the correct set of collections.
-       rs.UnderReplicatedBlocks.ToCollectionIndexSet(readCollections,
-               &rs.UnderReplicatedCollections)
-       rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections,
-               &rs.OverReplicatedCollections)
-
-       for i := range readCollections.CollectionIndexToUUID {
-               if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep {
-               } else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated {
-               } else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated {
-               } else {
-                       rs.CorrectlyReplicatedCollections.Insert(i)
-               }
-       }
-
-       return
-}
diff --git a/services/datamanager/summary/summary_test.go b/services/datamanager/summary/summary_test.go
deleted file mode 100644 (file)
index 8268404..0000000
+++ /dev/null
@@ -1,220 +0,0 @@
-package summary
-
-import (
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/services/datamanager/collection"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "reflect"
-       "sort"
-       "testing"
-)
-
-func BlockSetFromSlice(digests []int) (bs BlockSet) {
-       bs = make(BlockSet)
-       for _, digest := range digests {
-               bs.Insert(blockdigest.MakeTestDigestWithSize(digest))
-       }
-       return
-}
-
-func CollectionIndexSetFromSlice(indices []int) (cis CollectionIndexSet) {
-       cis = make(CollectionIndexSet)
-       for _, index := range indices {
-               cis.Insert(index)
-       }
-       return
-}
-
-func (cis CollectionIndexSet) ToSlice() (ints []int) {
-       ints = make([]int, len(cis))
-       i := 0
-       for collectionIndex := range cis {
-               ints[i] = collectionIndex
-               i++
-       }
-       sort.Ints(ints)
-       return
-}
-
-// Helper method to meet interface expected by older tests.
-func SummarizeReplication(readCollections collection.ReadCollections,
-       keepServerInfo keep.ReadServers) (rs ReplicationSummary) {
-       return BucketReplication(readCollections, keepServerInfo).
-               SummarizeBuckets(readCollections)
-}
-
-// Takes a map from block digest to replication level and represents
-// it in a keep.ReadServers structure.
-func SpecifyReplication(digestToReplication map[int]int) (rs keep.ReadServers) {
-       rs.BlockToServers = make(map[blockdigest.DigestWithSize][]keep.BlockServerInfo)
-       for digest, replication := range digestToReplication {
-               rs.BlockToServers[blockdigest.MakeTestDigestWithSize(digest)] =
-                       make([]keep.BlockServerInfo, replication)
-       }
-       return
-}
-
-// Verifies that
-// blocks.ToCollectionIndexSet(rc.BlockToCollectionIndices) returns
-// expectedCollections.
-func VerifyToCollectionIndexSet(
-       t *testing.T,
-       blocks []int,
-       blockToCollectionIndices map[int][]int,
-       expectedCollections []int) {
-
-       expected := CollectionIndexSetFromSlice(expectedCollections)
-
-       rc := collection.ReadCollections{
-               BlockToCollectionIndices: map[blockdigest.DigestWithSize][]int{},
-       }
-       for digest, indices := range blockToCollectionIndices {
-               rc.BlockToCollectionIndices[blockdigest.MakeTestDigestWithSize(digest)] = indices
-       }
-
-       returned := make(CollectionIndexSet)
-       BlockSetFromSlice(blocks).ToCollectionIndexSet(rc, &returned)
-
-       if !reflect.DeepEqual(returned, expected) {
-               t.Errorf("Expected %v.ToCollectionIndexSet(%v) to return \n %v \n but instead received \n %v",
-                       blocks,
-                       blockToCollectionIndices,
-                       expectedCollections,
-                       returned.ToSlice())
-       }
-}
-
-func TestToCollectionIndexSet(t *testing.T) {
-       VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{6: {0}}, []int{0})
-       VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: {1}}, []int{1})
-       VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: {1, 9}}, []int{1, 9})
-       VerifyToCollectionIndexSet(t, []int{5, 6},
-               map[int][]int{5: {2, 3}, 6: {3, 4}},
-               []int{2, 3, 4})
-       VerifyToCollectionIndexSet(t, []int{5, 6},
-               map[int][]int{5: {8}, 6: {4}},
-               []int{4, 8})
-       VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{5: {0}}, []int{})
-}
-
-func TestSimpleSummary(t *testing.T) {
-       rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
-               {ReplicationLevel: 1, Blocks: []int{1, 2}},
-       })
-       rc.Summarize(nil)
-       cIndex := rc.CollectionIndicesForTesting()
-
-       keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 1})
-
-       expectedSummary := ReplicationSummary{
-               CollectionBlocksNotInKeep:  BlockSet{},
-               UnderReplicatedBlocks:      BlockSet{},
-               OverReplicatedBlocks:       BlockSet{},
-               CorrectlyReplicatedBlocks:  BlockSetFromSlice([]int{1, 2}),
-               KeepBlocksNotInCollections: BlockSet{},
-
-               CollectionsNotFullyInKeep:      CollectionIndexSet{},
-               UnderReplicatedCollections:     CollectionIndexSet{},
-               OverReplicatedCollections:      CollectionIndexSet{},
-               CorrectlyReplicatedCollections: CollectionIndexSetFromSlice([]int{cIndex[0]}),
-       }
-
-       returnedSummary := SummarizeReplication(rc, keepInfo)
-
-       if !reflect.DeepEqual(returnedSummary, expectedSummary) {
-               t.Fatalf("Expected returnedSummary to look like %+v but instead it is %+v", expectedSummary, returnedSummary)
-       }
-}
-
-func TestMissingBlock(t *testing.T) {
-       rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
-               {ReplicationLevel: 1, Blocks: []int{1, 2}},
-       })
-       rc.Summarize(nil)
-       cIndex := rc.CollectionIndicesForTesting()
-
-       keepInfo := SpecifyReplication(map[int]int{1: 1})
-
-       expectedSummary := ReplicationSummary{
-               CollectionBlocksNotInKeep:  BlockSetFromSlice([]int{2}),
-               UnderReplicatedBlocks:      BlockSet{},
-               OverReplicatedBlocks:       BlockSet{},
-               CorrectlyReplicatedBlocks:  BlockSetFromSlice([]int{1}),
-               KeepBlocksNotInCollections: BlockSet{},
-
-               CollectionsNotFullyInKeep:      CollectionIndexSetFromSlice([]int{cIndex[0]}),
-               UnderReplicatedCollections:     CollectionIndexSet{},
-               OverReplicatedCollections:      CollectionIndexSet{},
-               CorrectlyReplicatedCollections: CollectionIndexSet{},
-       }
-
-       returnedSummary := SummarizeReplication(rc, keepInfo)
-
-       if !reflect.DeepEqual(returnedSummary, expectedSummary) {
-               t.Fatalf("Expected returnedSummary to look like %+v but instead it is %+v",
-                       expectedSummary,
-                       returnedSummary)
-       }
-}
-
-func TestUnderAndOverReplicatedBlocks(t *testing.T) {
-       rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
-               {ReplicationLevel: 2, Blocks: []int{1, 2}},
-       })
-       rc.Summarize(nil)
-       cIndex := rc.CollectionIndicesForTesting()
-
-       keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 3})
-
-       expectedSummary := ReplicationSummary{
-               CollectionBlocksNotInKeep:  BlockSet{},
-               UnderReplicatedBlocks:      BlockSetFromSlice([]int{1}),
-               OverReplicatedBlocks:       BlockSetFromSlice([]int{2}),
-               CorrectlyReplicatedBlocks:  BlockSet{},
-               KeepBlocksNotInCollections: BlockSet{},
-
-               CollectionsNotFullyInKeep:      CollectionIndexSet{},
-               UnderReplicatedCollections:     CollectionIndexSetFromSlice([]int{cIndex[0]}),
-               OverReplicatedCollections:      CollectionIndexSetFromSlice([]int{cIndex[0]}),
-               CorrectlyReplicatedCollections: CollectionIndexSet{},
-       }
-
-       returnedSummary := SummarizeReplication(rc, keepInfo)
-
-       if !reflect.DeepEqual(returnedSummary, expectedSummary) {
-               t.Fatalf("Expected returnedSummary to look like %+v but instead it is %+v",
-                       expectedSummary,
-                       returnedSummary)
-       }
-}
-
-func TestMixedReplication(t *testing.T) {
-       rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
-               {ReplicationLevel: 1, Blocks: []int{1, 2}},
-               {ReplicationLevel: 1, Blocks: []int{3, 4}},
-               {ReplicationLevel: 2, Blocks: []int{5, 6}},
-       })
-       rc.Summarize(nil)
-       cIndex := rc.CollectionIndicesForTesting()
-
-       keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 1, 3: 1, 5: 1, 6: 3, 7: 2})
-
-       expectedSummary := ReplicationSummary{
-               CollectionBlocksNotInKeep:  BlockSetFromSlice([]int{4}),
-               UnderReplicatedBlocks:      BlockSetFromSlice([]int{5}),
-               OverReplicatedBlocks:       BlockSetFromSlice([]int{6}),
-               CorrectlyReplicatedBlocks:  BlockSetFromSlice([]int{1, 2, 3}),
-               KeepBlocksNotInCollections: BlockSetFromSlice([]int{7}),
-
-               CollectionsNotFullyInKeep:      CollectionIndexSetFromSlice([]int{cIndex[1]}),
-               UnderReplicatedCollections:     CollectionIndexSetFromSlice([]int{cIndex[2]}),
-               OverReplicatedCollections:      CollectionIndexSetFromSlice([]int{cIndex[2]}),
-               CorrectlyReplicatedCollections: CollectionIndexSetFromSlice([]int{cIndex[0]}),
-       }
-
-       returnedSummary := SummarizeReplication(rc, keepInfo)
-
-       if !reflect.DeepEqual(returnedSummary, expectedSummary) {
-               t.Fatalf("Expected returnedSummary to look like: \n%+v but instead it is: \n%+v. Index to UUID is %v. BlockToCollectionIndices is %v.", expectedSummary, returnedSummary, rc.CollectionIndexToUUID, rc.BlockToCollectionIndices)
-       }
-}
diff --git a/services/datamanager/summary/trash_list.go b/services/datamanager/summary/trash_list.go
deleted file mode 100644 (file)
index 3e4d387..0000000
+++ /dev/null
@@ -1,62 +0,0 @@
-// Code for generating trash lists
-
-package summary
-
-import (
-       "errors"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "time"
-)
-
-// BuildTrashLists builds list of blocks to be sent to trash queue
-func BuildTrashLists(kc *keepclient.KeepClient,
-       keepServerInfo *keep.ReadServers,
-       keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList, err error) {
-
-       // Servers that are writeable
-       writableServers := map[string]struct{}{}
-       for _, url := range kc.WritableLocalRoots() {
-               writableServers[url] = struct{}{}
-       }
-
-       _ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
-       if err != nil {
-               return nil, errors.New(fmt.Sprintf("Failed to get blobSignatureTtl, can't build trash lists: %v", err))
-       }
-
-       ttl := int64(_ttl.(float64))
-
-       // expire unreferenced blocks more than "ttl" seconds old.
-       expiry := time.Now().UTC().UnixNano() - ttl*1e9
-
-       return buildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections), nil
-}
-
-func buildTrashListsInternal(writableServers map[string]struct{},
-       keepServerInfo *keep.ReadServers,
-       expiry int64,
-       keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
-
-       m = make(map[string]keep.TrashList)
-
-       for block := range keepBlocksNotInCollections {
-               for _, blockOnServer := range keepServerInfo.BlockToServers[block] {
-                       if blockOnServer.Mtime >= expiry {
-                               continue
-                       }
-
-                       // block is older than expire cutoff
-                       srv := keepServerInfo.KeepServerIndexToAddress[blockOnServer.ServerIndex].String()
-
-                       if _, writable := writableServers[srv]; !writable {
-                               continue
-                       }
-
-                       m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: blockOnServer.Mtime})
-               }
-       }
-       return
-
-}
diff --git a/services/datamanager/summary/trash_list_test.go b/services/datamanager/summary/trash_list_test.go
deleted file mode 100644 (file)
index 3626904..0000000
+++ /dev/null
@@ -1,76 +0,0 @@
-package summary
-
-import (
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       . "gopkg.in/check.v1"
-       "testing"
-)
-
-// Gocheck boilerplate
-func TestTrash(t *testing.T) {
-       TestingT(t)
-}
-
-type TrashSuite struct{}
-
-var _ = Suite(&TrashSuite{})
-
-func (s *TrashSuite) TestBuildTrashLists(c *C) {
-       var sv0 = keep.ServerAddress{Host: "keep0.example.com", Port: 80}
-       var sv1 = keep.ServerAddress{Host: "keep1.example.com", Port: 80}
-
-       var block0 = blockdigest.MakeTestDigestWithSize(0xdeadbeef)
-       var block1 = blockdigest.MakeTestDigestWithSize(0xfedbeef)
-
-       var keepServerInfo = keep.ReadServers{
-               KeepServerIndexToAddress: []keep.ServerAddress{sv0, sv1},
-               BlockToServers: map[blockdigest.DigestWithSize][]keep.BlockServerInfo{
-                       block0: {
-                               {0, 99},
-                               {1, 101}},
-                       block1: {
-                               {0, 99},
-                               {1, 101}}}}
-
-       // only block0 is in delete set
-       var bs = make(BlockSet)
-       bs[block0] = struct{}{}
-
-       // Test trash list where only sv0 is on writable list.
-       c.Check(buildTrashListsInternal(
-               map[string]struct{}{
-                       sv0.URL(): {}},
-               &keepServerInfo,
-               110,
-               bs),
-               DeepEquals,
-               map[string]keep.TrashList{
-                       "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
-
-       // Test trash list where both sv0 and sv1 are on writable list.
-       c.Check(buildTrashListsInternal(
-               map[string]struct{}{
-                       sv0.URL(): {},
-                       sv1.URL(): {}},
-               &keepServerInfo,
-               110,
-               bs),
-               DeepEquals,
-               map[string]keep.TrashList{
-                       "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}},
-                       "http://keep1.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 101}}})
-
-       // Test trash list where only block on sv0 is expired
-       c.Check(buildTrashListsInternal(
-               map[string]struct{}{
-                       sv0.URL(): {},
-                       sv1.URL(): {}},
-               &keepServerInfo,
-               100,
-               bs),
-               DeepEquals,
-               map[string]keep.TrashList{
-                       "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
-
-}