+ // nextLocator holds the most recently written block locator; writers
+ // publish into it and (when not using the index) readers load from it.
+ var nextLocator atomic.Value
+ // when UseIndex is set, this channel is used instead of nextLocator
+ // (small buffer lets the index producer run slightly ahead of readers)
+ var indexLocatorChan = make(chan string, 2)
+
+ newSummary = summary
+
+ // Start warmup
+ // ready is closed once at least one readable block (or the block index)
+ // is available; the select further below gates on it before the timed
+ // read/write phase starts. Warmup is only needed when readers exist.
+ ready := make(chan struct{})
+ var warmup bool
+ if *ReadThreads > 0 {
+ warmup = true
+ if !*UseIndex {
+ lgr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
+ } else {
+ lgr.Printf("Start warmup phase, waiting for block index before reading starts\n")
+ }
+ }
+ if warmup && !*UseIndex {
+ // Write one block synchronously-in-a-goroutine so readers have at
+ // least one valid locator to fetch before they start.
+ go func() {
+ locator, _, err := kc.PutB(<-nextBufs[0])
+ if err != nil {
+ lgr.Print(err)
+ errorsChan <- struct{}{}
+ }
+ // NOTE(review): on PutB error this still stores the (likely empty)
+ // locator and closes ready, so readers may start with an invalid
+ // locator — confirm the errorsChan consumer aborts the run first.
+ nextLocator.Store(locator)
+ lgr.Println("Warmup complete!")
+ close(ready)
+ }()
+ } else if warmup && *UseIndex {
+ // Get list of blocks to read
+ go getIndexLocators(ctx, cluster, kc, indexLocatorChan, lgr)
+ // Block until the index yields its first locator (the value received
+ // here is discarded; readers consume the rest) or the run is cancelled.
+ select {
+ case <-ctx.Done():
+ return
+ case <-indexLocatorChan:
+ lgr.Println("Warmup complete!")
+ close(ready)
+ }
+ } else {
+ // No read threads: nothing to warm up, start immediately.
+ close(ready)
+ }
+ // Wait for warmup completion, bailing out on cancellation.
+ select {
+ case <-ctx.Done():
+ return
+ case <-ready:
+ }
+
+ // Warmup complete
+ // Bound the measured run to *RunTime from this point; cancel releases
+ // the deadline's resources when the enclosing function returns.
+ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*RunTime))
+ defer cancel()
+
+ // Launch writer goroutines; each presumably consumes its own buffer
+ // channel and publishes locators via nextLocator — see doWrites.
+ for i := 0; i < *WriteThreads; i++ {
+ go doWrites(ctx, kc, nextBufs[i], &nextLocator, bytesOutChan, errorsChan, lgr)
+ }
+ if *UseIndex {
+ // Index mode: readers draw locators streamed from the block index.
+ for i := 0; i < *ReadThreads; i++ {
+ go doReads(ctx, kc, nil, indexLocatorChan, bytesInChan, errorsChan, lgr)
+ }
+ } else {
+ // Default mode: readers re-fetch the most recently written block.
+ for i := 0; i < *ReadThreads; i++ {
+ go doReads(ctx, kc, &nextLocator, nil, bytesInChan, errorsChan, lgr)
+ }
+ }