16585: Add --repeat CLI argument, which automatically repeats an
author Ward Vandewege <ward@curii.com>
Mon, 6 Jul 2020 13:37:32 +0000 (09:37 -0400)
committer Ward Vandewege <ward@curii.com>
Tue, 7 Jul 2020 16:06:21 +0000 (12:06 -0400)
experiment N times. Add a warmup phase to ensure there are always
sufficient blocks available for all read threads. Refactor nextLocator
to be an atomic.Value instead of a channel, to avoid read starvation.

Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward@curii.com>

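The switch from a shared channel to an atomic.Value is the subtle part of this commit: a channel hands each locator to exactly one reader, so with many read threads and few writers some readers starve waiting for a locator, whereas an atomic.Value lets every reader re-load the most recently written locator without consuming it. A minimal sketch of that single-writer, many-reader pattern (the names below are illustrative, not the tool's own):

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    func main() {
        var latest atomic.Value // holds the most recently stored locator
        var wg sync.WaitGroup

        latest.Store("d41d8cd98f00b204e9800998ecf8427e+0") // writer publishes
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                // Load does not consume the value, so every reader sees
                // the same latest locator and none can starve another.
                fmt.Println(id, latest.Load().(string))
            }(i)
        }
        wg.Wait()
    }
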
tools/keep-exercise/keep-exercise.go

index 163291c238773c257c831f25691cdb9be8cb777e..7641465aa329056db3a559db3f032181246e4a32 100644
@@ -29,6 +29,8 @@ import (
        "net/http"
        "os"
        "os/signal"
        "net/http"
        "os"
        "os/signal"
+       "sync"
+       "sync/atomic"
        "syscall"
        "time"
 
        "syscall"
        "time"
 
@@ -51,8 +53,12 @@ var (
        ServiceUUID   = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise")
        getVersion    = flag.Bool("version", false, "Print version information and exit.")
        RunTime       = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
+       Repeat        = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
 )
 
+var summary string
+var csvHeader string
+
 func main() {
        flag.Parse()
 
@@ -64,6 +70,14 @@ func main() {
 
        stderr := log.New(os.Stderr, "", log.LstdFlags)
 
+       if *ReadThreads > 0 && *WriteThreads == 0 {
+               stderr.Fatal("At least one write thread is required if rthreads is non-zero")
+       }
+
+       if *ReadThreads == 0 && *WriteThreads == 0 {
+               stderr.Fatal("Nothing to do!")
+       }
+
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                stderr.Fatal(err)
@@ -74,27 +88,62 @@ func main() {
        }
        kc.Want_replicas = *Replicas
 
-       transport := *(http.DefaultTransport.(*http.Transport))
-       transport.TLSClientConfig = arvadosclient.MakeTLSConfig(arv.ApiInsecure)
        kc.HTTPClient = &http.Client{
-               Timeout:   10 * time.Minute,
-               Transport: &transport,
+               Timeout: 10 * time.Minute,
+               // It's not safe to copy *http.DefaultTransport
+               // because it has a mutex (which might be locked)
+               // protecting a private map (which might not be nil).
+               // So we build our own, using the Go 1.12 default
+               // values.
+               Transport: &http.Transport{
+                       TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
+               },
        }
 
        overrideServices(kc, stderr)
+       csvHeader = "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+
+       for i := 0; i < *Repeat; i++ {
+               runExperiment(kc, stderr)
+               stderr.Printf("*************************** experiment %d complete ******************************\n", i)
+               summary += fmt.Sprintf(",%d\n", i)
+       }
+       stderr.Println("Summary:")
+       stderr.Println()
+       fmt.Println(csvHeader + ",Experiment")
+       fmt.Println(summary)
+}
 
-       nextLocator := make(chan string, *ReadThreads+*WriteThreads)
+func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
+       var wg sync.WaitGroup
+       var nextLocator atomic.Value
 
-       go countBeans(nextLocator, stderr)
+       wg.Add(1)
+       stopCh := make(chan struct{})
+       if *ReadThreads > 0 {
+               stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
+       }
        for i := 0; i < *WriteThreads; i++ {
                nextBuf := make(chan []byte, 1)
-               go makeBufs(nextBuf, i, stderr)
-               go doWrites(kc, nextBuf, nextLocator, stderr)
+               wg.Add(1)
+               go makeBufs(&wg, nextBuf, i, stopCh, stderr)
+               wg.Add(1)
+               go doWrites(&wg, kc, nextBuf, &nextLocator, stopCh, stderr)
+       }
+       if *ReadThreads > 0 {
+               for nextLocator.Load() == nil {
+                       select {
+                       case _ = <-bytesOutChan:
+                       }
+               }
+               stderr.Printf("Warmup complete")
        }
+       go countBeans(&wg, stopCh, stderr)
        for i := 0; i < *ReadThreads; i++ {
-               go doReads(kc, nextLocator, stderr)
+               wg.Add(1)
+               go doReads(&wg, kc, &nextLocator, stopCh, stderr)
        }
-       <-make(chan struct{})
+       wg.Wait()
 }
 
 // Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
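Two details in runExperiment are worth spelling out. The warmup loop doubles as a stand-in consumer: bytesOutChan is unbuffered and countBeans, which normally drains it, is only started after warmup, so the loop must receive the first writer's byte count before that writer can Store its locator. And the close(stopCh)/wg.Wait pair is what makes -repeat possible at all: the old code parked main forever on <-make(chan struct{}), whereas runExperiment now returns once every goroutine has seen the closed channel. A minimal sketch of that lifecycle, with illustrative names:

    package main

    import "sync"

    // runOnce models one experiment: start workers, broadcast stop,
    // and return only when all of them have exited.
    func runOnce() {
        var wg sync.WaitGroup
        stop := make(chan struct{})
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                <-stop // a closed channel unblocks every receiver at once
            }()
        }
        close(stop) // one call signals all workers
        wg.Wait()   // safe to start the next repetition after this
    }

    func main() {
        for i := 0; i < 3; i++ { // the -repeat loop
            runOnce()
        }
    }
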
@@ -104,11 +153,12 @@ var bytesOutChan = make(chan uint64)
 // Send struct{}{} to errorsChan when an error happens.
 var errorsChan = make(chan struct{})
 
-func countBeans(nextLocator chan string, stderr *log.Logger) {
+func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
+       defer wg.Done()
        t0 := time.Now()
        var tickChan <-chan time.Time
        var endChan <-chan time.Time
-       c := make(chan os.Signal)
+       c := make(chan os.Signal, 1)
        signal.Notify(c, os.Interrupt, syscall.SIGTERM)
        if *StatsInterval > 0 {
                tickChan = time.NewTicker(*StatsInterval).C
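The one-line change from make(chan os.Signal) to make(chan os.Signal, 1) follows the signal package's documented contract: signal.Notify never blocks sending to c, so a signal arriving while countBeans is busy in another select case is silently dropped unless the channel has buffer space. A buffer of one is the canonical form:

    package main

    import (
        "fmt"
        "os"
        "os/signal"
        "syscall"
    )

    func main() {
        // signal.Notify does a non-blocking send; without a buffer, a
        // signal delivered while no receiver is ready is simply lost.
        c := make(chan os.Signal, 1)
        signal.Notify(c, os.Interrupt, syscall.SIGTERM)
        fmt.Println("got:", <-c) // blocks until Ctrl-C or SIGTERM
    }
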
@@ -121,16 +171,16 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
        var errors uint64
        var rateIn, rateOut float64
        var maxRateIn, maxRateOut float64
-       var abort, printCsv bool
+       var exit, abort, printCsv bool
        csv := log.New(os.Stdout, "", 0)
-       csv.Println("Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime")
+       csv.Println(csvHeader)
        for {
                select {
                case <-tickChan:
                        printCsv = true
                case <-endChan:
                        printCsv = true
-                       abort = true
+                       exit = true
                case <-c:
                        printCsv = true
                        abort = true
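Note the split between the two flags: exit is set when the -run-time timer fires and now triggers a graceful wind-down (the final CSV line is appended to summary, stopCh is closed, and the loop breaks so the next repetition can start), while abort, set on SIGINT or SIGTERM, keeps the old behavior of printing a last line and calling os.Exit(0).
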
@@ -152,7 +202,7 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
                        if rateOut > maxRateOut {
                                maxRateOut = rateOut
                        }
-                       csv.Printf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s",
+                       line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
                                time.Now().Format("2006-01-02 15:04:05"),
                                elapsed,
                                bytesIn, rateIn, maxRateIn,
@@ -168,16 +218,26 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
                                *ServiceURL,
                                *ServiceUUID,
                                *RunTime,
+                               *Repeat,
                        )
+                       csv.Println(line)
+                       if exit {
+                               summary += line
+                       }
                        printCsv = false
                }
                if abort {
                        os.Exit(0)
                }
+               if exit {
+                       close(stopCh)
+                       break
+               }
        }
 }
 
-func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
+func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-chan struct{}, stderr *log.Logger) {
+       defer wg.Done()
        buf := make([]byte, *BlockSize)
        if *VaryThread {
                binary.PutVarint(buf, int64(threadID))
@@ -194,29 +254,45 @@ func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
                        }
                        buf = append(rnd, buf[randSize:]...)
                }
-               nextBuf <- buf
+               select {
+               case <-stopCh:
+                       close(nextBuf)
+                       return
+               case nextBuf <- buf:
+               }
        }
 }
 
-func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string, stderr *log.Logger) {
-       for buf := range nextBuf {
-               locator, _, err := kc.PutB(buf)
-               if err != nil {
-                       stderr.Print(err)
-                       errorsChan <- struct{}{}
-                       continue
-               }
-               bytesOutChan <- uint64(len(buf))
-               for cap(nextLocator) > len(nextLocator)+*WriteThreads {
-                       // Give the readers something to do, unless
-                       // they have lots queued up already.
-                       nextLocator <- locator
+func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
+       defer wg.Done()
+
+       for {
+               select {
+               case <-stopCh:
+                       return
+               case buf := <-nextBuf:
+                       locator, _, err := kc.PutB(buf)
+                       if err != nil {
+                               stderr.Print(err)
+                               errorsChan <- struct{}{}
+                               continue
+                       }
+                       select {
+                       case <-stopCh:
+                               return
+                       case bytesOutChan <- uint64(len(buf)):
+                       }
+                       nextLocator.Store(locator)
                }
        }
 }
 
-func doReads(kc *keepclient.KeepClient, nextLocator <-chan string, stderr *log.Logger) {
-       for locator := range nextLocator {
+func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
+       defer wg.Done()
+
+       var locator string
+       for {
+               locator = nextLocator.Load().(string)
                rdr, size, url, err := kc.Get(locator)
                if err != nil {
                        stderr.Print(err)
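makeBufs, doWrites, and doReads all guard their channel sends with the same select-on-stopCh idiom. Without it, a worker blocked sending on an unbuffered channel whose consumer already exited would never observe shutdown, and wg.Wait would hang. Stripped to its essentials (illustrative names again):

    package main

    import "fmt"

    // sendOrStop delivers v on out unless stop is closed first, so a
    // producer can never block forever on a consumer that has gone away.
    func sendOrStop(out chan<- int, v int, stop <-chan struct{}) bool {
        select {
        case <-stop:
            return false // shutdown requested; the caller should return
        case out <- v:
            return true
        }
    }

    func main() {
        out := make(chan int) // unbuffered, like bytesInChan/bytesOutChan
        stop := make(chan struct{})

        close(stop) // simulate shutdown while no consumer reads out
        if !sendOrStop(out, 42, stop) {
            fmt.Println("producer exited cleanly instead of blocking")
        }
    }
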
@@ -233,7 +309,11 @@ func doReads(kc *keepclient.KeepClient, nextLocator <-chan string, stderr *log.L
                        // partial/corrupt responses: we are measuring
                        // throughput, not resource consumption.
                }
-               bytesInChan <- uint64(n)
+               select {
+               case <-stopCh:
+                       return
+               case bytesInChan <- uint64(n):
+               }
        }
 }
 
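
Taken together: a run such as keep-exercise -rthreads 4 -wthreads 1 -run-time 60s -repeat 5 (assuming the thread-count flags are spelled -rthreads and -wthreads, as the new error message suggests) would perform five 60-second experiments back to back, print each experiment's CSV lines as before, and finish with a summary: csvHeader plus an extra Experiment column, followed by the final stats line of each repetition.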