+func getIndexLocators(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, indexLocatorChan chan<- string, stderr *log.Logger) {
+ if ctx.Err() == nil {
+ var locators []string
+ for uuid := range kc.LocalRoots() {
+ reader, err := kc.GetIndex(uuid, "")
+ if err != nil {
+ stderr.Fatalf("Error getting index: %s\n", err)
+ }
+ scanner := bufio.NewScanner(reader)
+ for scanner.Scan() {
+ locators = append(locators, strings.Split(scanner.Text(), " ")[0])
+ }
+ }
+ stderr.Printf("Found %d locators\n", len(locators))
+ if len(locators) < 1 {
+ stderr.Fatal("Error: no locators found. The keepstores do not seem to contain any data. Remove the useIndex cli argument.")
+ }
+
+ mathRand.Seed(time.Now().UnixNano())
+ mathRand.Shuffle(len(locators), func(i, j int) { locators[i], locators[j] = locators[j], locators[i] })
+
+ for _, locator := range locators {
+ // We need the Collections.BlobSigningKey to sign our block requests. This requires access to /etc/arvados/config.yml
+ signedLocator := arvados.SignLocator(locator, kc.Arvados.ApiToken, time.Now().Local().Add(1*time.Hour), cluster.Collections.BlobSigningTTL.Duration(), []byte(cluster.Collections.BlobSigningKey))
+ select {
+ case <-ctx.Done():
+ return
+ case indexLocatorChan <- signedLocator:
+ }
+ }
+ stderr.Fatal("Error: ran out of locators to read!")
+ }
+}
+
+func loadConfig(stderr *log.Logger) (cluster *arvados.Cluster) {
+ loader := config.NewLoader(os.Stdin, nil)
+ loader.SkipLegacy = true
+
+ cfg, err := loader.Load()
+ if err != nil {
+ stderr.Fatal(err)
+ }
+ cluster, err = cfg.GetCluster("")
+ if err != nil {
+ stderr.Fatal(err)
+ }
+ return
+}
+
+func doIndexReads(ctx context.Context, kc *keepclient.KeepClient, cluster *arvados.Cluster, indexLocatorChan <-chan string, bytesInChan chan<- uint64, errorsChan chan<- struct{}, stderr *log.Logger) {
+ for ctx.Err() == nil {
+ select {
+ case <-ctx.Done():
+ return
+ case locator := <-indexLocatorChan:
+ rdr, size, url, err := kc.Get(locator)
+ if err != nil {
+ stderr.Print(err)
+ errorsChan <- struct{}{}
+ continue
+ }
+ n, err := io.Copy(ioutil.Discard, rdr)
+ rdr.Close()
+ if n != size || err != nil {
+ stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+ errorsChan <- struct{}{}
+ continue
+ // Note we don't count the bytes received in
+ // partial/corrupt responses: we are measuring
+ // throughput, not resource consumption.
+ }
+ bytesInChan <- uint64(n)
+ }
+ }
+}
+
+func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic.Value, bytesInChan chan<- uint64, errorsChan chan<- struct{}, stderr *log.Logger) {