16585: Add --repeat CLI argument, which automatically repeats an experiment.
[arvados.git] / tools / keep-exercise / keep-exercise.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Testing tool for Keep services.
6 //
7 // keepexercise helps measure throughput and test reliability under
8 // various usage patterns.
9 //
10 // By default, it reads and writes blocks containing 2^26 NUL
11 // bytes. This generates network traffic without consuming much disk
12 // space.
13 //
14 // For a more realistic test, enable -vary-request. Warning: this will
15 // fill your storage volumes with random data if you leave it running,
16 // which can cost you money or leave you with too little room for
17 // useful data.
18 //
19 package main
20
21 import (
22         "crypto/rand"
23         "encoding/binary"
24         "flag"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "net/http"
30         "os"
31         "os/signal"
32         "sync"
33         "sync/atomic"
34         "syscall"
35         "time"
36
37         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
38         "git.arvados.org/arvados.git/sdk/go/keepclient"
39 )
40
41 var version = "dev"
42
43 // Command line config knobs
44 var (
45         BlockSize     = flag.Int("block-size", keepclient.BLOCKSIZE, "bytes per read/write op")
46         ReadThreads   = flag.Int("rthreads", 1, "number of concurrent readers")
47         WriteThreads  = flag.Int("wthreads", 1, "number of concurrent writers")
48         VaryRequest   = flag.Bool("vary-request", false, "vary the data for each request: consumes disk space, exercises write behavior")
49         VaryThread    = flag.Bool("vary-thread", false, "use -wthreads different data blocks")
50         Replicas      = flag.Int("replicas", 1, "replication level for writing")
51         StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable")
52         ServiceURL    = flag.String("url", "", "specify scheme://host of a single keep service to exercise (instead of using all advertised services like normal clients)")
53         ServiceUUID   = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise")
54         getVersion    = flag.Bool("version", false, "Print version information and exit.")
55         RunTime       = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
56         Repeat        = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
57 )
58
59 var summary string
60 var csvHeader string
61
62 func main() {
63         flag.Parse()
64
65         // Print version information if requested
66         if *getVersion {
67                 fmt.Printf("keep-exercise %s\n", version)
68                 os.Exit(0)
69         }
70
71         stderr := log.New(os.Stderr, "", log.LstdFlags)
72
73         if *ReadThreads > 0 && *WriteThreads == 0 {
74                 stderr.Fatal("At least one write thread is required if rthreads is non-zero")
75         }
76
77         if *ReadThreads == 0 && *WriteThreads == 0 {
78                 stderr.Fatal("Nothing to do!")
79         }
80
81         arv, err := arvadosclient.MakeArvadosClient()
82         if err != nil {
83                 stderr.Fatal(err)
84         }
85         kc, err := keepclient.MakeKeepClient(arv)
86         if err != nil {
87                 stderr.Fatal(err)
88         }
89         kc.Want_replicas = *Replicas
90
91         kc.HTTPClient = &http.Client{
92                 Timeout: 10 * time.Minute,
93                 // It's not safe to copy *http.DefaultTransport
94                 // because it has a mutex (which might be locked)
95                 // protecting a private map (which might not be nil).
96                 // So we build our own, using the Go 1.12 default
97                 // values.
98                 Transport: &http.Transport{
99                         TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
100                 },
101         }
102
103         overrideServices(kc, stderr)
104         csvHeader = "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
105
106         for i := 0; i < *Repeat; i++ {
107                 runExperiment(kc, stderr)
108                 stderr.Printf("*************************** experiment %d complete ******************************\n", i)
109                 summary += fmt.Sprintf(",%d\n", i)
110         }
111         stderr.Println("Summary:")
112         stderr.Println()
113         fmt.Println(csvHeader + ",Experiment")
114         fmt.Println(summary)
115 }
116
117 func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
118         var wg sync.WaitGroup
119         var nextLocator atomic.Value
120
121         wg.Add(1)
122         stopCh := make(chan struct{})
123         if *ReadThreads > 0 {
124                 stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
125         }
126         for i := 0; i < *WriteThreads; i++ {
127                 nextBuf := make(chan []byte, 1)
128                 wg.Add(1)
129                 go makeBufs(&wg, nextBuf, i, stopCh, stderr)
130                 wg.Add(1)
131                 go doWrites(&wg, kc, nextBuf, &nextLocator, stopCh, stderr)
132         }
133         if *ReadThreads > 0 {
134                 for nextLocator.Load() == nil {
135                         select {
136                         case _ = <-bytesOutChan:
137                         }
138                 }
139                 stderr.Printf("Warmup complete")
140         }
141         go countBeans(&wg, stopCh, stderr)
142         for i := 0; i < *ReadThreads; i++ {
143                 wg.Add(1)
144                 go doReads(&wg, kc, &nextLocator, stopCh, stderr)
145         }
146         wg.Wait()
147 }
148
// Channels through which the reader and writer goroutines report activity.
// countBeans consumes them while an experiment runs; runExperiment consumes
// the writers' reports during warmup. All are unbuffered, so a send blocks
// until someone receives it.
var (
	// bytesInChan: send 1234 after receiving a complete 1234-byte block
	// from keepstore.
	bytesInChan = make(chan uint64)

	// bytesOutChan: send the size of each block successfully written.
	bytesOutChan = make(chan uint64)

	// errorsChan: send struct{}{} every time a read or write fails.
	errorsChan = make(chan struct{})
)
155
156 func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
157         defer wg.Done()
158         t0 := time.Now()
159         var tickChan <-chan time.Time
160         var endChan <-chan time.Time
161         c := make(chan os.Signal, 1)
162         signal.Notify(c, os.Interrupt, syscall.SIGTERM)
163         if *StatsInterval > 0 {
164                 tickChan = time.NewTicker(*StatsInterval).C
165         }
166         if *RunTime > 0 {
167                 endChan = time.NewTicker(*RunTime).C
168         }
169         var bytesIn uint64
170         var bytesOut uint64
171         var errors uint64
172         var rateIn, rateOut float64
173         var maxRateIn, maxRateOut float64
174         var exit, abort, printCsv bool
175         csv := log.New(os.Stdout, "", 0)
176         csv.Println(csvHeader)
177         for {
178                 select {
179                 case <-tickChan:
180                         printCsv = true
181                 case <-endChan:
182                         printCsv = true
183                         exit = true
184                 case <-c:
185                         printCsv = true
186                         abort = true
187                         fmt.Print("\r") // Suppress the ^C print
188                 case i := <-bytesInChan:
189                         bytesIn += i
190                 case o := <-bytesOutChan:
191                         bytesOut += o
192                 case <-errorsChan:
193                         errors++
194                 }
195                 if printCsv {
196                         elapsed := time.Since(t0)
197                         rateIn = float64(bytesIn) / elapsed.Seconds() / 1048576
198                         if rateIn > maxRateIn {
199                                 maxRateIn = rateIn
200                         }
201                         rateOut = float64(bytesOut) / elapsed.Seconds() / 1048576
202                         if rateOut > maxRateOut {
203                                 maxRateOut = rateOut
204                         }
205                         line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
206                                 time.Now().Format("2006-01-02 15:04:05"),
207                                 elapsed,
208                                 bytesIn, rateIn, maxRateIn,
209                                 bytesOut, rateOut, maxRateOut,
210                                 errors,
211                                 *ReadThreads,
212                                 *WriteThreads,
213                                 *VaryRequest,
214                                 *VaryThread,
215                                 *BlockSize,
216                                 *Replicas,
217                                 *StatsInterval,
218                                 *ServiceURL,
219                                 *ServiceUUID,
220                                 *RunTime,
221                                 *Repeat,
222                         )
223                         csv.Println(line)
224                         if exit {
225                                 summary += line
226                         }
227                         printCsv = false
228                 }
229                 if abort {
230                         os.Exit(0)
231                 }
232                 if exit {
233                         close(stopCh)
234                         break
235                 }
236         }
237 }
238
239 func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-chan struct{}, stderr *log.Logger) {
240         defer wg.Done()
241         buf := make([]byte, *BlockSize)
242         if *VaryThread {
243                 binary.PutVarint(buf, int64(threadID))
244         }
245         randSize := 524288
246         if randSize > *BlockSize {
247                 randSize = *BlockSize
248         }
249         for {
250                 if *VaryRequest {
251                         rnd := make([]byte, randSize)
252                         if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
253                                 stderr.Fatal(err)
254                         }
255                         buf = append(rnd, buf[randSize:]...)
256                 }
257                 select {
258                 case <-stopCh:
259                         close(nextBuf)
260                         return
261                 case nextBuf <- buf:
262                 }
263         }
264 }
265
266 func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
267         defer wg.Done()
268
269         for {
270                 select {
271                 case <-stopCh:
272                         return
273                 case buf := <-nextBuf:
274                         locator, _, err := kc.PutB(buf)
275                         if err != nil {
276                                 stderr.Print(err)
277                                 errorsChan <- struct{}{}
278                                 continue
279                         }
280                         select {
281                         case <-stopCh:
282                                 return
283                         case bytesOutChan <- uint64(len(buf)):
284                         }
285                         nextLocator.Store(locator)
286                 }
287         }
288 }
289
290 func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
291         defer wg.Done()
292
293         var locator string
294         for {
295                 locator = nextLocator.Load().(string)
296                 rdr, size, url, err := kc.Get(locator)
297                 if err != nil {
298                         stderr.Print(err)
299                         errorsChan <- struct{}{}
300                         continue
301                 }
302                 n, err := io.Copy(ioutil.Discard, rdr)
303                 rdr.Close()
304                 if n != size || err != nil {
305                         stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
306                         errorsChan <- struct{}{}
307                         continue
308                         // Note we don't count the bytes received in
309                         // partial/corrupt responses: we are measuring
310                         // throughput, not resource consumption.
311                 }
312                 select {
313                 case <-stopCh:
314                         return
315                 case bytesInChan <- uint64(n):
316                 }
317         }
318 }
319
320 func overrideServices(kc *keepclient.KeepClient, stderr *log.Logger) {
321         roots := make(map[string]string)
322         if *ServiceURL != "" {
323                 roots["zzzzz-bi6l4-000000000000000"] = *ServiceURL
324         } else if *ServiceUUID != "" {
325                 for uuid, url := range kc.GatewayRoots() {
326                         if uuid == *ServiceUUID {
327                                 roots[uuid] = url
328                                 break
329                         }
330                 }
331                 if len(roots) == 0 {
332                         stderr.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots())
333                 }
334         } else {
335                 return
336         }
337         kc.SetServiceRoots(roots, roots, roots)
338 }