1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Testing tool for Keep services.
7 // keepexercise helps measure throughput and test reliability under
8 // various usage patterns.
10 // By default, it reads and writes blocks containing 2^26 NUL
11 // bytes. This generates network traffic without consuming much disk
14 // For a more realistic test, enable -vary-request. Warning: this will
15 // fill your storage volumes with random data if you leave it running,
16 // which can cost you money or leave you with too little room for
35 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
36 "git.arvados.org/arvados.git/sdk/go/keepclient"
41 // Command line config knobs
43 BlockSize = flag.Int("block-size", keepclient.BLOCKSIZE, "bytes per read/write op")
44 ReadThreads = flag.Int("rthreads", 1, "number of concurrent readers")
45 WriteThreads = flag.Int("wthreads", 1, "number of concurrent writers")
46 VaryRequest = flag.Bool("vary-request", false, "vary the data for each request: consumes disk space, exercises write behavior")
47 VaryThread = flag.Bool("vary-thread", false, "use -wthreads different data blocks")
48 Replicas = flag.Int("replicas", 1, "replication level for writing")
49 StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable")
50 ServiceURL = flag.String("url", "", "specify scheme://host of a single keep service to exercise (instead of using all advertised services like normal clients)")
51 ServiceUUID = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise")
52 getVersion = flag.Bool("version", false, "Print version information and exit.")
53 RunTime = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
59 // Print version information if requested
61 fmt.Printf("keep-exercise %s\n", version)
65 stderr := log.New(os.Stderr, "", log.LstdFlags)
67 arv, err := arvadosclient.MakeArvadosClient()
71 kc, err := keepclient.MakeKeepClient(arv)
75 kc.Want_replicas = *Replicas
77 transport := *(http.DefaultTransport.(*http.Transport))
78 transport.TLSClientConfig = arvadosclient.MakeTLSConfig(arv.ApiInsecure)
79 kc.HTTPClient = &http.Client{
80 Timeout: 10 * time.Minute,
81 Transport: &transport,
84 overrideServices(kc, stderr)
86 nextLocator := make(chan string, *ReadThreads+*WriteThreads)
88 go countBeans(nextLocator, stderr)
89 for i := 0; i < *WriteThreads; i++ {
90 nextBuf := make(chan []byte, 1)
91 go makeBufs(nextBuf, i, stderr)
92 go doWrites(kc, nextBuf, nextLocator, stderr)
94 for i := 0; i < *ReadThreads; i++ {
95 go doReads(kc, nextLocator, stderr)
// Event channels that the reader/writer goroutines use to report
// progress. All three are created without a buffer size, so every
// send blocks until a receiver takes the value.
var (
	// bytesInChan carries byte counts read from keepstore: send 1234
	// after receiving 1234 bytes. countBeans receives from it.
	bytesInChan = make(chan uint64)

	// bytesOutChan carries byte counts written to keepstore (the
	// length of each block handed to the client for writing).
	// countBeans receives from it.
	bytesOutChan = make(chan uint64)

	// errorsChan gets one struct{}{} whenever an error happens.
	errorsChan = make(chan struct{})
)
107 func countBeans(nextLocator chan string, stderr *log.Logger) {
109 var tickChan <-chan time.Time
110 var endChan <-chan time.Time
111 c := make(chan os.Signal)
112 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
113 if *StatsInterval > 0 {
114 tickChan = time.NewTicker(*StatsInterval).C
117 endChan = time.NewTicker(*RunTime).C
122 var rateIn, rateOut float64
123 var maxRateIn, maxRateOut float64
124 var abort, printCsv bool
125 csv := log.New(os.Stdout, "", 0)
126 csv.Println("Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime")
137 fmt.Print("\r") // Suppress the ^C print
138 case i := <-bytesInChan:
140 case o := <-bytesOutChan:
146 elapsed := time.Since(t0)
147 rateIn = float64(bytesIn) / elapsed.Seconds() / 1048576
148 if rateIn > maxRateIn {
151 rateOut = float64(bytesOut) / elapsed.Seconds() / 1048576
152 if rateOut > maxRateOut {
155 csv.Printf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s",
156 time.Now().Format("2006-01-02 15:04:05"),
158 bytesIn, rateIn, maxRateIn,
159 bytesOut, rateOut, maxRateOut,
180 func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
181 buf := make([]byte, *BlockSize)
183 binary.PutVarint(buf, int64(threadID))
186 if randSize > *BlockSize {
187 randSize = *BlockSize
191 rnd := make([]byte, randSize)
192 if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
195 buf = append(rnd, buf[randSize:]...)
201 func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string, stderr *log.Logger) {
202 for buf := range nextBuf {
203 locator, _, err := kc.PutB(buf)
206 errorsChan <- struct{}{}
209 bytesOutChan <- uint64(len(buf))
210 for cap(nextLocator) > len(nextLocator)+*WriteThreads {
211 // Give the readers something to do, unless
212 // they have lots queued up already.
213 nextLocator <- locator
218 func doReads(kc *keepclient.KeepClient, nextLocator <-chan string, stderr *log.Logger) {
219 for locator := range nextLocator {
220 rdr, size, url, err := kc.Get(locator)
223 errorsChan <- struct{}{}
226 n, err := io.Copy(ioutil.Discard, rdr)
228 if n != size || err != nil {
229 stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
230 errorsChan <- struct{}{}
232 // Note we don't count the bytes received in
233 // partial/corrupt responses: we are measuring
234 // throughput, not resource consumption.
236 bytesInChan <- uint64(n)
240 func overrideServices(kc *keepclient.KeepClient, stderr *log.Logger) {
241 roots := make(map[string]string)
242 if *ServiceURL != "" {
243 roots["zzzzz-bi6l4-000000000000000"] = *ServiceURL
244 } else if *ServiceUUID != "" {
245 for uuid, url := range kc.GatewayRoots() {
246 if uuid == *ServiceUUID {
252 stderr.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots())
257 kc.SetServiceRoots(roots, roots, roots)