X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/199d58a4df0dba1ee71c6816bc3a9d9d439cfd7e..d412382c1c8ad0303bed7584094a983c82fe3557:/tools/keep-exercise/keep-exercise.go

diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 7641465aa3..1acd8d8b98 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -19,6 +19,8 @@
 package main
 
 import (
+	"bufio"
+	"context"
 	"crypto/rand"
 	"encoding/binary"
 	"flag"
@@ -26,14 +28,18 @@ import (
 	"io"
 	"io/ioutil"
 	"log"
+	mathRand "math/rand"
 	"net/http"
 	"os"
 	"os/signal"
-	"sync"
+	"strings"
 	"sync/atomic"
 	"syscall"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/cmd"
+	"git.arvados.org/arvados.git/lib/config"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
 )
@@ -54,37 +60,17 @@ var (
 	getVersion    = flag.Bool("version", false, "Print version information and exit.")
 	RunTime       = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
 	Repeat        = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
+	UseIndex      = flag.Bool("use-index", false, "use the GetIndex call to get a list of blocks to read. Requires the SystemRoot token. Use this to rule out caching effects when reading.")
 )
 
-var summary string
-var csvHeader string
-
-func main() {
-	flag.Parse()
-
-	// Print version information if requested
-	if *getVersion {
-		fmt.Printf("keep-exercise %s\n", version)
-		os.Exit(0)
-	}
-
-	stderr := log.New(os.Stderr, "", log.LstdFlags)
-
-	if *ReadThreads > 0 && *WriteThreads == 0 {
-		stderr.Fatal("At least one write thread is required if rthreads is non-zero")
-	}
-
-	if *ReadThreads == 0 && *WriteThreads == 0 {
-		stderr.Fatal("Nothing to do!")
-	}
-
+func createKeepClient(lgr *log.Logger) (kc *keepclient.KeepClient) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
-		stderr.Fatal(err)
+		lgr.Fatal(err)
 	}
-	kc, err := keepclient.MakeKeepClient(arv)
+	kc, err = keepclient.MakeKeepClient(arv)
 	if err != nil {
-		stderr.Fatal(err)
+		lgr.Fatal(err)
 	}
 	kc.Want_replicas = *Replicas
@@ -99,92 +85,165 @@ func main() {
 			TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
 		},
 	}
+	overrideServices(kc, lgr)
+	return kc
+}
+
+func main() {
+	if ok, code := cmd.ParseFlags(flag.CommandLine, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
+		os.Exit(code)
+	} else if *getVersion {
+		fmt.Printf("%s %s\n", os.Args[0], version)
+		return
+	}
+
+	lgr := log.New(os.Stderr, "", log.LstdFlags)
+
+	if *ReadThreads > 0 && *WriteThreads == 0 && !*UseIndex {
+		lgr.Fatal("At least one write thread is required if rthreads is non-zero and -use-index is not enabled")
+	}
+
+	if *ReadThreads == 0 && *WriteThreads == 0 {
+		lgr.Fatal("Nothing to do!")
+	}
+
+	kc := createKeepClient(lgr)
+
+	// When UseIndex is set, we need a KeepClient with SystemRoot powers to get
+	// the block index from the Keepstore. We use the SystemRootToken from
+	// the Arvados config.yml for that.
+	var cluster *arvados.Cluster
+	if *ReadThreads > 0 && *UseIndex {
+		cluster = loadConfig(lgr)
+		kc.Arvados.ApiToken = cluster.SystemRootToken
+	}
 
-	overrideServices(kc, stderr)
-	csvHeader = "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		<-sigChan
+		// FIXME
+		//fmt.Print("\r") // Suppress the ^C print
+		cancel()
+	}()
 
-	for i := 0; i < *Repeat; i++ {
-		runExperiment(kc, stderr)
-		stderr.Printf("*************************** experiment %d complete ******************************\n", i)
+	csvHeader := "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,UseIndex,RunTime,Repeat"
+	var summary string
+
+	var nextBufs []chan []byte
+	for i := 0; i < *WriteThreads; i++ {
+		nextBuf := make(chan []byte, 1)
+		nextBufs = append(nextBufs, nextBuf)
+		go makeBufs(nextBuf, i, lgr)
+	}
+
+	for i := 0; i < *Repeat && ctx.Err() == nil; i++ {
+		summary = runExperiment(ctx, cluster, kc, nextBufs, summary, csvHeader, lgr)
+		lgr.Printf("*************************** experiment %d complete ******************************\n", i)
 		summary += fmt.Sprintf(",%d\n", i)
 	}
-	stderr.Println("Summary:")
-	stderr.Println()
+
+	lgr.Println("Summary:")
+	lgr.Println()
+	fmt.Println()
 	fmt.Println(csvHeader + ",Experiment")
 	fmt.Println(summary)
 }
 
-func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
-	var wg sync.WaitGroup
+func runExperiment(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, nextBufs []chan []byte, summary string, csvHeader string, lgr *log.Logger) (newSummary string) {
+	// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
+	var bytesInChan = make(chan uint64)
+	var bytesOutChan = make(chan uint64)
+	// Send struct{}{} to errorsChan when an error happens.
+	var errorsChan = make(chan struct{})
+	var nextLocator atomic.Value
+	// when UseIndex is set, this channel is used instead of nextLocator
+	var indexLocatorChan = make(chan string, 2)
 
-	wg.Add(1)
-	stopCh := make(chan struct{})
+	newSummary = summary
+
+	// Start warmup
+	ready := make(chan struct{})
+	var warmup bool
 	if *ReadThreads > 0 {
-		stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
-	}
-	for i := 0; i < *WriteThreads; i++ {
-		nextBuf := make(chan []byte, 1)
-		wg.Add(1)
-		go makeBufs(&wg, nextBuf, i, stopCh, stderr)
-		wg.Add(1)
-		go doWrites(&wg, kc, nextBuf, &nextLocator, stopCh, stderr)
+		warmup = true
+		if !*UseIndex {
+			lgr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
+		} else {
+			lgr.Printf("Start warmup phase, waiting for block index before reading starts\n")
+		}
 	}
-	if *ReadThreads > 0 {
-		for nextLocator.Load() == nil {
-			select {
-			case _ = <-bytesOutChan:
+	if warmup && !*UseIndex {
+		go func() {
+			locator, _, err := kc.PutB(<-nextBufs[0])
+			if err != nil {
+				lgr.Print(err)
+				errorsChan <- struct{}{}
 			}
+			nextLocator.Store(locator)
+			lgr.Println("Warmup complete!")
+			close(ready)
+		}()
+	} else if warmup && *UseIndex {
+		// Get list of blocks to read
+		go getIndexLocators(ctx, cluster, kc, indexLocatorChan, lgr)
+		select {
+		case <-ctx.Done():
+			return
+		case <-indexLocatorChan:
+			lgr.Println("Warmup complete!")
+			close(ready)
 		}
-		stderr.Printf("Warmup complete")
+	} else {
+		close(ready)
 	}
-	go countBeans(&wg, stopCh, stderr)
-	for i := 0; i < *ReadThreads; i++ {
-		wg.Add(1)
-		go doReads(&wg, kc, &nextLocator, stopCh, stderr)
+	select {
+	case <-ctx.Done():
+		return
+	case <-ready:
 	}
-	wg.Wait()
-}
 
-// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
-var bytesInChan = make(chan uint64)
-var bytesOutChan = make(chan uint64)
+	// Warmup complete
+	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*RunTime))
+	defer cancel()
 
-// Send struct{}{} to errorsChan when an error happens.
-var errorsChan = make(chan struct{})
 
+	for i := 0; i < *WriteThreads; i++ {
+		go doWrites(ctx, kc, nextBufs[i], &nextLocator, bytesOutChan, errorsChan, lgr)
+	}
+	if *UseIndex {
+		for i := 0; i < *ReadThreads; i++ {
+			go doReads(ctx, kc, nil, indexLocatorChan, bytesInChan, errorsChan, lgr)
+		}
+	} else {
+		for i := 0; i < *ReadThreads; i++ {
+			go doReads(ctx, kc, &nextLocator, nil, bytesInChan, errorsChan, lgr)
+		}
+	}
 
-func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
-	defer wg.Done()
 	t0 := time.Now()
 	var tickChan <-chan time.Time
-	var endChan <-chan time.Time
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 	if *StatsInterval > 0 {
 		tickChan = time.NewTicker(*StatsInterval).C
 	}
-	if *RunTime > 0 {
-		endChan = time.NewTicker(*RunTime).C
-	}
 	var bytesIn uint64
 	var bytesOut uint64
 	var errors uint64
 	var rateIn, rateOut float64
 	var maxRateIn, maxRateOut float64
-	var exit, abort, printCsv bool
+	var exit, printCsv bool
 	csv := log.New(os.Stdout, "", 0)
+	csv.Println()
 	csv.Println(csvHeader)
 	for {
 		select {
-		case <-tickChan:
-			printCsv = true
-		case <-endChan:
+		case <-ctx.Done():
 			printCsv = true
 			exit = true
-		case <-c:
+		case <-tickChan:
 			printCsv = true
-			abort = true
-			fmt.Print("\r") // Suppress the ^C print
 		case i := <-bytesInChan:
 			bytesIn += i
 		case o := <-bytesOutChan:
@@ -202,8 +261,8 @@ func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
 		if rateOut > maxRateOut {
 			maxRateOut = rateOut
 		}
-		line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
-			time.Now().Format("2006-01-02 15:04:05"),
+		line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%t,%s,%d",
+			time.Now().Format("2006/01/02 15:04:05"),
 			elapsed,
 			bytesIn, rateIn, maxRateIn,
 			bytesOut, rateOut, maxRateOut,
@@ -217,27 +276,21 @@ func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
 			*StatsInterval,
 			*ServiceURL,
 			*ServiceUUID,
+			*UseIndex,
 			*RunTime,
 			*Repeat,
 		)
 		csv.Println(line)
 		if exit {
-			summary += line
+			newSummary += line
+			return
 		}
 		printCsv = false
 	}
-	if abort {
-		os.Exit(0)
-	}
-	if exit {
-		close(stopCh)
-		break
-	}
 	}
 }
 
-func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-chan struct{}, stderr *log.Logger) {
-	defer wg.Done()
+func makeBufs(nextBuf chan<- []byte, threadID int, lgr *log.Logger) {
 	buf := make([]byte, *BlockSize)
 	if *VaryThread {
 		binary.PutVarint(buf, int64(threadID))
@@ -250,74 +303,120 @@ func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-
 		if *VaryRequest {
 			rnd := make([]byte, randSize)
 			if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
-				stderr.Fatal(err)
+				lgr.Fatal(err)
 			}
 			buf = append(rnd, buf[randSize:]...)
 		}
-		select {
-		case <-stopCh:
-			close(nextBuf)
-			return
-		case nextBuf <- buf:
+		nextBuf <- buf
+	}
+}
+
+func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, bytesOutChan chan<- uint64, errorsChan chan<- struct{}, lgr *log.Logger) {
+	for ctx.Err() == nil {
+		//lgr.Printf("%s nextbuf %s, waiting for nextBuf\n",nextBuf,time.Now())
+		buf := <-nextBuf
+		//lgr.Printf("%s nextbuf %s, done waiting for nextBuf\n",nextBuf,time.Now())
+		locator, _, err := kc.PutB(buf)
+		if err != nil {
+			lgr.Print(err)
+			errorsChan <- struct{}{}
+			continue
 		}
+		bytesOutChan <- uint64(len(buf))
+		nextLocator.Store(locator)
 	}
 }
 
-func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
-	defer wg.Done()
+func getIndexLocators(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, indexLocatorChan chan<- string, lgr *log.Logger) {
+	if ctx.Err() != nil {
+		return
+	}
+	locatorsMap := make(map[string]bool)
+	var locators []string
+	var count int64
+	for uuid := range kc.LocalRoots() {
+		reader, err := kc.GetIndex(uuid, "")
+		if err != nil {
+			lgr.Fatalf("Error getting index: %s\n", err)
+		}
+		scanner := bufio.NewScanner(reader)
+		for scanner.Scan() {
+			locatorsMap[strings.Split(scanner.Text(), " ")[0]] = true
+			count++
+		}
+	}
+	for l := range locatorsMap {
+		locators = append(locators, l)
+	}
+	lgr.Printf("Found %d locators\n", count)
+	lgr.Printf("Found %d locators (deduplicated)\n", len(locators))
+	if len(locators) < 1 {
+		lgr.Fatal("Error: no locators found. The keepstores do not seem to contain any data. Remove the -use-index cli argument.")
+	}
 
-	for {
+	mathRand.Seed(time.Now().UnixNano())
+	mathRand.Shuffle(len(locators), func(i, j int) { locators[i], locators[j] = locators[j], locators[i] })
+
+	for _, locator := range locators {
+		// We need the Collections.BlobSigningKey to sign our block requests. This requires access to /etc/arvados/config.yml
+		signedLocator := arvados.SignLocator(locator, kc.Arvados.ApiToken, time.Now().Local().Add(1*time.Hour), cluster.Collections.BlobSigningTTL.Duration(), []byte(cluster.Collections.BlobSigningKey))
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 			return
-		case buf := <-nextBuf:
-			locator, _, err := kc.PutB(buf)
-			if err != nil {
-				stderr.Print(err)
-				errorsChan <- struct{}{}
-				continue
-			}
-			select {
-			case <-stopCh:
-				return
-			case bytesOutChan <- uint64(len(buf)):
-			}
-			nextLocator.Store(locator)
+		case indexLocatorChan <- signedLocator:
 		}
 	}
+	lgr.Fatal("Error: ran out of locators to read!")
 }
 
-func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
-	defer wg.Done()
+func loadConfig(lgr *log.Logger) (cluster *arvados.Cluster) {
+	loader := config.NewLoader(os.Stdin, nil)
+	loader.SkipLegacy = true
 
-	var locator string
-	for {
-		locator = nextLocator.Load().(string)
+	cfg, err := loader.Load()
+	if err != nil {
+		lgr.Fatal(err)
+	}
+	cluster, err = cfg.GetCluster("")
+	if err != nil {
+		lgr.Fatal(err)
+	}
+	return
+}
+
+func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic.Value, indexLocatorChan <-chan string, bytesInChan chan<- uint64, errorsChan chan<- struct{}, lgr *log.Logger) {
+	for ctx.Err() == nil {
+		var locator string
+		if indexLocatorChan != nil {
+			select {
+			case <-ctx.Done():
+				return
+			case locator = <-indexLocatorChan:
+			}
+		} else {
+			locator = nextLocator.Load().(string)
+		}
 		rdr, size, url, err := kc.Get(locator)
 		if err != nil {
-			stderr.Print(err)
+			lgr.Print(err)
 			errorsChan <- struct{}{}
 			continue
 		}
 		n, err := io.Copy(ioutil.Discard, rdr)
 		rdr.Close()
 		if n != size || err != nil {
-			stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+			lgr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
 			errorsChan <- struct{}{}
 			continue
 			// Note we don't count the bytes received in
 			// partial/corrupt responses: we are measuring
 			// throughput, not resource consumption.
 		}
-		select {
-		case <-stopCh:
-			return
-		case bytesInChan <- uint64(n):
-		}
+		bytesInChan <- uint64(n)
 	}
 }
 
-func overrideServices(kc *keepclient.KeepClient, stderr *log.Logger) {
+func overrideServices(kc *keepclient.KeepClient, lgr *log.Logger) {
 	roots := make(map[string]string)
 	if *ServiceURL != "" {
 		roots["zzzzz-bi6l4-000000000000000"] = *ServiceURL
@@ -329,7 +428,7 @@ func overrideServices(kc *keepclient.KeepClient, stderr *log.Logger) {
 			}
 		}
 		if len(roots) == 0 {
-			stderr.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots())
+			lgr.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots())
 		}
 	} else {
 		return
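
The -use-index read path above builds its block list by scanning every keepstore's index: getIndexLocators takes the first space-separated field of each index line as the locator and deduplicates through a map. Below is a minimal, self-contained sketch of just that parsing step. The two index lines are fabricated placeholders shaped like "<locator> <mtime>", which is the layout the strings.Split call implies; real input comes from kc.GetIndex(uuid, "").

    package main

    import (
    	"bufio"
    	"fmt"
    	"strings"
    )

    func main() {
    	// Fabricated index body; real data comes from kc.GetIndex(uuid, "").
    	index := "acbd18db4cc2f85cedef654fccc4a4d8+3 1588697496906634\n" +
    		"37b51d194a7513e45b56f6524f2d51f2+3 1588697497012345\n"
    	seen := make(map[string]bool) // deduplicate, as getIndexLocators does
    	scanner := bufio.NewScanner(strings.NewReader(index))
    	for scanner.Scan() {
    		if line := scanner.Text(); line != "" {
    			seen[strings.Split(line, " ")[0]] = true
    		}
    	}
    	for locator := range seen {
    		fmt.Println(locator)
    	}
    }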
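
Those bare index locators cannot be fetched as-is: keepstore only serves blocks whose locators carry a valid permission signature, which is why loadConfig pulls in the cluster config (Collections.BlobSigningKey, SystemRootToken). Here is a sketch of the signing call used in getIndexLocators; every input value below is a placeholder, and the one-hour expiry mirrors the code above.

    package main

    import (
    	"fmt"
    	"time"

    	"git.arvados.org/arvados.git/sdk/go/arvados"
    )

    func main() {
    	// Placeholder inputs; keep-exercise reads these from the keepstore
    	// index and from config.yml (SystemRootToken, Collections.*).
    	locator := "acbd18db4cc2f85cedef654fccc4a4d8+3"
    	token := "placeholder-system-root-token"
    	signingKey := []byte("placeholder-blob-signing-key")
    	ttl := 336 * time.Hour // stands in for Collections.BlobSigningTTL

    	// Same call as in getIndexLocators: append a +A...@... permission
    	// hint so keepstore will accept the subsequent GET.
    	signed := arvados.SignLocator(locator, token, time.Now().Add(time.Hour), ttl, signingKey)
    	fmt.Println(signed)
    }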