// the block index from the Keepstore. We use the SystemRootToken from
// the Arvados config.yml for that.
var cluster *arvados.Cluster
- if *UseIndex {
+ if *ReadThreads > 0 && *UseIndex {
cluster = loadConfig(stderr)
kc.Arvados.ApiToken = cluster.SystemRootToken
}
csvHeader := "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,UseIndex,RunTime,Repeat"
var summary string
+ var nextBufs []chan []byte
+ for i := 0; i < *WriteThreads; i++ {
+ nextBuf := make(chan []byte, 1)
+ nextBufs = append(nextBufs, nextBuf)
+ go makeBufs(nextBuf, i, stderr)
+ }
+
for i := 0; i < *Repeat; i++ {
if ctx.Err() == nil {
- summary = runExperiment(ctx, cluster, kc, summary, csvHeader, stderr)
+ summary = runExperiment(ctx, cluster, kc, nextBufs, summary, csvHeader, stderr)
stderr.Printf("*************************** experiment %d complete ******************************\n", i)
summary += fmt.Sprintf(",%d\n", i)
}
}
}
-func runExperiment(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, summary string, csvHeader string, stderr *log.Logger) (newSummary string) {
+func runExperiment(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, nextBufs []chan []byte, summary string, csvHeader string, stderr *log.Logger) (newSummary string) {
// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
var bytesInChan = make(chan uint64)
var bytesOutChan = make(chan uint64)
stderr.Printf("Start warmup phase, waiting for block index before reading starts\n")
}
}
- nextBuf := make(chan []byte, 1)
- go makeBufs(nextBuf, 0, stderr)
if warmup && !*UseIndex {
go func() {
- locator, _, err := kc.PutB(<-nextBuf)
+ locator, _, err := kc.PutB(<-nextBufs[0])
if err != nil {
stderr.Print(err)
errorsChan <- struct{}{}
stderr.Println("Warmup complete!")
close(ready)
}()
- } else if *UseIndex {
+ } else if warmup && *UseIndex {
// Get list of blocks to read
go getIndexLocators(ctx, cluster, kc, indexLocatorChan, stderr)
select {
defer cancel()
for i := 0; i < *WriteThreads; i++ {
- if i > 0 {
- // the makeBufs goroutine with index 0 was already started for the warmup phase, above
- nextBuf := make(chan []byte, 1)
- go makeBufs(nextBuf, i, stderr)
- }
- go doWrites(ctx, kc, nextBuf, &nextLocator, bytesOutChan, errorsChan, stderr)
+ go doWrites(ctx, kc, nextBufs[i], &nextLocator, bytesOutChan, errorsChan, stderr)
}
if *UseIndex {
for i := 0; i < *ReadThreads; i++ {