"net/http"
"os"
"os/signal"
+ "sync"
+ "sync/atomic"
"syscall"
"time"
ServiceUUID = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise")
getVersion = flag.Bool("version", false, "Print version information and exit.")
RunTime = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
+ Repeat = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
)
+// summary accumulates one CSV line per completed experiment; it is
+// printed (with an appended experiment index column) when main exits.
+var summary string
+// csvHeader is the CSV column list shared by the per-interval stats
+// output in countBeans and the final summary printed by main.
+var csvHeader string
+
// main parses flags, validates the read/write thread combination,
// configures the keep client's HTTP transport, runs the benchmark
// *Repeat times, and prints a CSV summary of all experiments.
func main() {
flag.Parse()
stderr := log.New(os.Stderr, "", log.LstdFlags)
// Readers replay locators produced by writers, so reading without at
// least one writer can never make progress.
+ if *ReadThreads > 0 && *WriteThreads == 0 {
+ stderr.Fatal("At least one write thread is required if rthreads is non-zero")
+ }
+
+ if *ReadThreads == 0 && *WriteThreads == 0 {
+ stderr.Fatal("Nothing to do!")
+ }
+
// NOTE(review): the keep client kc is created in context lines elided
// from this hunk; only its configuration is visible below.
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
stderr.Fatal(err)
}
kc.Want_replicas = *Replicas
- transport := *(http.DefaultTransport.(*http.Transport))
- transport.TLSClientConfig = arvadosclient.MakeTLSConfig(arv.ApiInsecure)
kc.HTTPClient = &http.Client{
- Timeout: 10 * time.Minute,
- Transport: &transport,
+ Timeout: 10 * time.Minute,
+ // It's not safe to copy *http.DefaultTransport
+ // because it has a mutex (which might be locked)
+ // protecting a private map (which might not be nil).
+ // So we build our own, using the Go 1.12 default
+ // values.
+ Transport: &http.Transport{
+ TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
+ },
}
overrideServices(kc, stderr)
// The ",Experiment" column is appended only when the summary is
// printed below, so per-interval lines and summary lines share this
// base header.
+ csvHeader = "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+
// Each run appends its final stats line to summary (inside
// countBeans); the experiment index is appended here.
+ for i := 0; i < *Repeat; i++ {
+ runExperiment(kc, stderr)
+ stderr.Printf("*************************** experiment %d complete ******************************\n", i)
+ summary += fmt.Sprintf(",%d\n", i)
+ }
+ stderr.Println("Summary:")
+ stderr.Println()
+ fmt.Println(csvHeader + ",Experiment")
+ fmt.Println(summary)
+}
- nextLocator := make(chan string, *ReadThreads+*WriteThreads)
+func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
+ var wg sync.WaitGroup
+ var nextLocator atomic.Value
- go countBeans(nextLocator, stderr)
+ wg.Add(1)
+ stopCh := make(chan struct{})
+ if *ReadThreads > 0 {
+ stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
+ }
for i := 0; i < *WriteThreads; i++ {
nextBuf := make(chan []byte, 1)
- go makeBufs(nextBuf, i, stderr)
- go doWrites(kc, nextBuf, nextLocator, stderr)
+ wg.Add(1)
+ go makeBufs(&wg, nextBuf, i, stopCh, stderr)
+ wg.Add(1)
+ go doWrites(&wg, kc, nextBuf, &nextLocator, stopCh, stderr)
+ }
+ if *ReadThreads > 0 {
+ for nextLocator.Load() == nil {
+ select {
+ case _ = <-bytesOutChan:
+ }
+ }
+ stderr.Printf("Warmup complete")
}
+ go countBeans(&wg, stopCh, stderr)
for i := 0; i < *ReadThreads; i++ {
- go doReads(kc, nextLocator, stderr)
+ wg.Add(1)
+ go doReads(&wg, kc, &nextLocator, stopCh, stderr)
}
- <-make(chan struct{})
+ wg.Wait()
}
// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
// Send struct{}{} to errorsChan when an error happens.
// errorsChan is unbuffered: a send blocks until the stats goroutine
// (countBeans) receives it.
var errorsChan = make(chan struct{})
// countBeans is the stats/control goroutine: it aggregates byte counts
// and errors reported by the workers, prints a CSV line per stats
// interval, and ends the experiment. Two distinct stop conditions:
// "exit" (RunTime expired) closes stopCh so the current experiment
// winds down cleanly; "abort" (SIGINT/SIGTERM) terminates the whole
// process via os.Exit.
-func countBeans(nextLocator chan string, stderr *log.Logger) {
+func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
+ defer wg.Done()
t0 := time.Now()
var tickChan <-chan time.Time
var endChan <-chan time.Time
// Buffered (size 1) so a signal delivered while this loop is busy
// printing is not dropped by signal.Notify.
- c := make(chan os.Signal)
+ c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
if *StatsInterval > 0 {
tickChan = time.NewTicker(*StatsInterval).C
// NOTE(review): context lines elided from this hunk (endChan setup,
// counter accumulation, rate computation).
var errors uint64
var rateIn, rateOut float64
var maxRateIn, maxRateOut float64
// exit = end this experiment; abort = kill the whole process.
- var abort, printCsv bool
+ var exit, abort, printCsv bool
csv := log.New(os.Stdout, "", 0)
// Header is now shared with the end-of-run summary (see csvHeader).
- csv.Println("Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime")
+ csv.Println(csvHeader)
for {
select {
case <-tickChan:
printCsv = true
case <-endChan:
// Run-time limit reached: print a final line, then exit cleanly.
printCsv = true
- abort = true
+ exit = true
case <-c:
// Interrupt/terminate: print a final line, then abort the process.
printCsv = true
abort = true
if rateOut > maxRateOut {
maxRateOut = rateOut
}
// Build the line in a variable (rather than printing directly) so
// the experiment's final line can also be appended to the summary.
- csv.Printf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s",
+ line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
time.Now().Format("2006-01-02 15:04:05"),
elapsed,
bytesIn, rateIn, maxRateIn,
*ServiceURL,
*ServiceUUID,
*RunTime,
+ *Repeat,
)
+ csv.Println(line)
+ if exit {
+ summary += line
+ }
printCsv = false
}
if abort {
os.Exit(0)
}
+ if exit {
// Tell every worker goroutine to stop; runExperiment's wg.Wait
// then returns once they have all exited.
+ close(stopCh)
+ break
+ }
}
}
// makeBufs produces data blocks of *BlockSize bytes and feeds them to
// one writer goroutine via nextBuf, until stopCh is closed; it then
// closes nextBuf (signaling the writer that no more buffers are
// coming) and exits.
-func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
+func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-chan struct{}, stderr *log.Logger) {
+ defer wg.Done()
buf := make([]byte, *BlockSize)
// Embed the thread ID so each thread's blocks hash to distinct
// locators when -vary-thread is set.
if *VaryThread {
binary.PutVarint(buf, int64(threadID))
}
// NOTE(review): this hunk elides the branch that defines rnd and
// randSize (per-request randomization) — confirm against full file.
buf = append(rnd, buf[randSize:]...)
}
// Blocking send that stays responsive to shutdown.
- nextBuf <- buf
+ select {
+ case <-stopCh:
+ close(nextBuf)
+ return
+ case nextBuf <- buf:
+ }
}
}
-func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string, stderr *log.Logger) {
- for buf := range nextBuf {
- locator, _, err := kc.PutB(buf)
- if err != nil {
- stderr.Print(err)
- errorsChan <- struct{}{}
- continue
- }
- bytesOutChan <- uint64(len(buf))
- for cap(nextLocator) > len(nextLocator)+*WriteThreads {
- // Give the readers something to do, unless
- // they have lots queued up already.
- nextLocator <- locator
+func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
+ defer wg.Done()
+
+ for {
+ select {
+ case <-stopCh:
+ return
+ case buf := <-nextBuf:
+ locator, _, err := kc.PutB(buf)
+ if err != nil {
+ stderr.Print(err)
+ errorsChan <- struct{}{}
+ continue
+ }
+ select {
+ case <-stopCh:
+ return
+ case bytesOutChan <- uint64(len(buf)):
+ }
+ nextLocator.Store(locator)
}
}
}
// doReads repeatedly fetches the most recently written block (published
// by the writers via nextLocator) and reports bytes read on
// bytesInChan, until stopCh is closed.
-func doReads(kc *keepclient.KeepClient, nextLocator <-chan string, stderr *log.Logger) {
- for locator := range nextLocator {
+func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
+ defer wg.Done()
+
+ var locator string
+ for {
// The type assertion is safe only because runExperiment's warmup
// phase guarantees a locator was stored before readers start.
+ locator = nextLocator.Load().(string)
rdr, size, url, err := kc.Get(locator)
if err != nil {
stderr.Print(err)
// partial/corrupt responses: we are measuring
// throughput, not resource consumption.
}
// NOTE(review): this hunk elides the body that consumes rdr and
// computes n (bytes actually read).
- bytesInChan <- uint64(n)
+ select {
+ case <-stopCh:
+ return
+ case bytesInChan <- uint64(n):
+ }
}
}