From: Ward Vandewege
Date: Mon, 6 Jul 2020 13:37:32 +0000 (-0400)
Subject: 16585: Add --repeat cli argument, which automatically repeats an
X-Git-Tag: 2.1.0~158^2~5
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/199d58a4df0dba1ee71c6816bc3a9d9d439cfd7e

16585: Add --repeat cli argument, which automatically repeats an
experiment N times. Add warmup phase to ensure there are always
sufficient blocks available for all read threads. Refactor nextLocator
to be an atomic.Value instead of a channel, to avoid read starvation.

Arvados-DCO-1.1-Signed-off-by: Ward Vandewege
---

diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 163291c238..7641465aa3 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -29,6 +29,8 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
+	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 
@@ -51,8 +53,12 @@ var (
 	ServiceUUID = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise")
 	getVersion = flag.Bool("version", false, "Print version information and exit.")
 	RunTime = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
+	Repeat = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
 )
 
+var summary string
+var csvHeader string
+
 func main() {
 	flag.Parse()
 
@@ -64,6 +70,14 @@ func main() {
 	stderr := log.New(os.Stderr, "", log.LstdFlags)
 
+	if *ReadThreads > 0 && *WriteThreads == 0 {
+		stderr.Fatal("At least one write thread is required if rthreads is non-zero")
+	}
+
+	if *ReadThreads == 0 && *WriteThreads == 0 {
+		stderr.Fatal("Nothing to do!")
+	}
+
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
 		stderr.Fatal(err)
 	}
@@ -74,27 +88,62 @@ func main() {
 	}
 	kc.Want_replicas = *Replicas
 
-	transport := *(http.DefaultTransport.(*http.Transport))
-	transport.TLSClientConfig = arvadosclient.MakeTLSConfig(arv.ApiInsecure)
 	kc.HTTPClient = &http.Client{
-		Timeout:   10 * time.Minute,
-		Transport: &transport,
+		Timeout: 10 * time.Minute,
+		// It's not safe to copy *http.DefaultTransport
+		// because it has a mutex (which might be locked)
+		// protecting a private map (which might not be nil).
+		// So we build our own, using the Go 1.12 default
+		// values.
+		Transport: &http.Transport{
+			TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
+		},
 	}
 
 	overrideServices(kc, stderr)
 
+	csvHeader = "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+
+	for i := 0; i < *Repeat; i++ {
+		runExperiment(kc, stderr)
+		stderr.Printf("*************************** experiment %d complete ******************************\n", i)
+		summary += fmt.Sprintf(",%d\n", i)
+	}
+	stderr.Println("Summary:")
+	stderr.Println()
+	fmt.Println(csvHeader + ",Experiment")
+	fmt.Println(summary)
+}
 
-	nextLocator := make(chan string, *ReadThreads+*WriteThreads)
+func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
+	var wg sync.WaitGroup
+	var nextLocator atomic.Value
 
-	go countBeans(nextLocator, stderr)
+	wg.Add(1)
+	stopCh := make(chan struct{})
+	if *ReadThreads > 0 {
+		stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
+	}
 	for i := 0; i < *WriteThreads; i++ {
 		nextBuf := make(chan []byte, 1)
-		go makeBufs(nextBuf, i, stderr)
-		go doWrites(kc, nextBuf, nextLocator, stderr)
+		wg.Add(1)
+		go makeBufs(&wg, nextBuf, i, stopCh, stderr)
+		wg.Add(1)
+		go doWrites(&wg, kc, nextBuf, &nextLocator, stopCh, stderr)
+	}
+	if *ReadThreads > 0 {
+		for nextLocator.Load() == nil {
+			select {
+			case _ = <-bytesOutChan:
+			}
+		}
+		stderr.Printf("Warmup complete")
 	}
+	go countBeans(&wg, stopCh, stderr)
 	for i := 0; i < *ReadThreads; i++ {
-		go doReads(kc, nextLocator, stderr)
+		wg.Add(1)
+		go doReads(&wg, kc, &nextLocator, stopCh, stderr)
 	}
-	<-make(chan struct{})
+	wg.Wait()
 }
 
 // Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
@@ -104,11 +153,12 @@ var bytesOutChan = make(chan uint64)
 
 // Send struct{}{} to errorsChan when an error happens.
 var errorsChan = make(chan struct{})
 
-func countBeans(nextLocator chan string, stderr *log.Logger) {
+func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
 	t0 := time.Now()
 	var tickChan <-chan time.Time
 	var endChan <-chan time.Time
-	c := make(chan os.Signal)
+	c := make(chan os.Signal, 1)
 	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 	if *StatsInterval > 0 {
 		tickChan = time.NewTicker(*StatsInterval).C
@@ -121,16 +171,16 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
 	var errors uint64
 	var rateIn, rateOut float64
 	var maxRateIn, maxRateOut float64
-	var abort, printCsv bool
+	var exit, abort, printCsv bool
 	csv := log.New(os.Stdout, "", 0)
-	csv.Println("Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime")
+	csv.Println(csvHeader)
 	for {
 		select {
 		case <-tickChan:
 			printCsv = true
 		case <-endChan:
 			printCsv = true
-			abort = true
+			exit = true
 		case <-c:
 			printCsv = true
 			abort = true
@@ -152,7 +202,7 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
 			if rateOut > maxRateOut {
 				maxRateOut = rateOut
 			}
-			csv.Printf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s",
+			line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
 				time.Now().Format("2006-01-02 15:04:05"),
 				elapsed,
 				bytesIn, rateIn, maxRateIn,
@@ -168,16 +218,26 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
 				*ServiceURL,
 				*ServiceUUID,
 				*RunTime,
+				*Repeat,
 			)
+			csv.Println(line)
+			if exit {
+				summary += line
+			}
 			printCsv = false
 		}
 		if abort {
 			os.Exit(0)
 		}
+		if exit {
+			close(stopCh)
+			break
+		}
 	}
 }
 
-func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
+func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
 	buf := make([]byte, *BlockSize)
 	if *VaryThread {
 		binary.PutVarint(buf, int64(threadID))
@@ -194,29 +254,45 @@ func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
 			}
 			buf = append(rnd, buf[randSize:]...)
 		}
-		nextBuf <- buf
+		select {
+		case <-stopCh:
+			close(nextBuf)
+			return
+		case nextBuf <- buf:
+		}
 	}
 }
 
-func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string, stderr *log.Logger) {
-	for buf := range nextBuf {
-		locator, _, err := kc.PutB(buf)
-		if err != nil {
-			stderr.Print(err)
-			errorsChan <- struct{}{}
-			continue
-		}
-		bytesOutChan <- uint64(len(buf))
-		for cap(nextLocator) > len(nextLocator)+*WriteThreads {
-			// Give the readers something to do, unless
-			// they have lots queued up already.
-			nextLocator <- locator
+func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
+
+	for {
+		select {
+		case <-stopCh:
+			return
+		case buf := <-nextBuf:
+			locator, _, err := kc.PutB(buf)
+			if err != nil {
+				stderr.Print(err)
+				errorsChan <- struct{}{}
+				continue
+			}
+			select {
+			case <-stopCh:
+				return
+			case bytesOutChan <- uint64(len(buf)):
+			}
+			nextLocator.Store(locator)
 		}
 	}
 }
 
-func doReads(kc *keepclient.KeepClient, nextLocator <-chan string, stderr *log.Logger) {
-	for locator := range nextLocator {
+func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
+
+	var locator string
+	for {
+		locator = nextLocator.Load().(string)
 		rdr, size, url, err := kc.Get(locator)
 		if err != nil {
 			stderr.Print(err)
@@ -233,7 +309,11 @@ func doReads(kc *keepclient.KeepClient, nextLocator <-chan string, stderr *log.L
 			// partial/corrupt responses: we are measuring
 			// throughput, not resource consumption.
 		}
-		bytesInChan <- uint64(n)
+		select {
+		case <-stopCh:
+			return
+		case bytesInChan <- uint64(n):
+		}
 	}
 }
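
For reference, below is a minimal, self-contained Go sketch of the nextLocator handoff pattern this commit switches to: writer goroutines Store the most recent locator in an atomic.Value, readers Load whatever is current, and reading only begins after a warmup wait for the first stored value. The names, sleep intervals, and loop counts are illustrative only; this is not part of the patch.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	var nextLocator atomic.Value // always holds the most recently "written" locator
	var wg sync.WaitGroup
	stop := make(chan struct{})

	// Writer: Store never blocks, so a slow reader cannot stall the writer,
	// and a fast reader simply re-reads the same locator.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; ; i++ {
			select {
			case <-stop:
				return
			default:
				nextLocator.Store(fmt.Sprintf("locator-%d", i))
				time.Sleep(10 * time.Millisecond)
			}
		}
	}()

	// Warmup: wait until at least one locator exists before reading starts.
	for nextLocator.Load() == nil {
		time.Sleep(time.Millisecond)
	}

	// Reader: always reads the latest stored value.
	for i := 0; i < 5; i++ {
		fmt.Println("reading", nextLocator.Load().(string))
		time.Sleep(20 * time.Millisecond)
	}

	close(stop)
	wg.Wait()
}

Because Store and Load never block, readers cannot starve one another the way they could when competing for values on a shared channel; each read thread simply fetches the most recently written block again.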
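
Similarly, a small sketch of the point made in the Transport comment above: rather than copying *http.DefaultTransport by value (it contains a mutex and private maps), construct a fresh http.Transport and set only the fields you need. The plain tls.Config below merely stands in for arvadosclient.MakeTLSConfig and is an assumption for illustration, not the patch's code.

package main

import (
	"crypto/tls"
	"net/http"
	"time"
)

// newClient builds an *http.Client with a freshly constructed Transport
// instead of a value copy of http.DefaultTransport, which is unsafe to
// copy because of its internal mutex and maps.
func newClient(insecure bool) *http.Client {
	return &http.Client{
		Timeout: 10 * time.Minute,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure},
		},
	}
}

func main() {
	_ = newClient(false)
}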