X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3da10f0c0f5a0c0c91d49436a5995c890b03d228..09a2e88c51e5432e607f2a38466e55b4ba15e887:/tools/keep-exercise/keep-exercise.go diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go index a94c01e55b..6d791bf987 100644 --- a/tools/keep-exercise/keep-exercise.go +++ b/tools/keep-exercise/keep-exercise.go @@ -36,6 +36,8 @@ var ( VaryThread = flag.Bool("vary-thread", false, "use -wthreads different data blocks") Replicas = flag.Int("replicas", 1, "replication level for writing") StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable") + ServiceURL = flag.String("url", "", "specify scheme://host of a single keep service to exercise (instead of using all advertised services like normal clients)") + ServiceUUID = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise") ) func main() { @@ -45,18 +47,20 @@ func main() { if err != nil { log.Fatal(err) } - kc, err := keepclient.MakeKeepClient(&arv) + kc, err := keepclient.MakeKeepClient(arv) if err != nil { log.Fatal(err) } kc.Want_replicas = *Replicas kc.Client.Timeout = 10 * time.Minute - nextBuf := make(chan []byte, *WriteThreads) + overrideServices(kc) + nextLocator := make(chan string, *ReadThreads+*WriteThreads) go countBeans(nextLocator) for i := 0; i < *WriteThreads; i++ { + nextBuf := make(chan []byte, 1) go makeBufs(nextBuf, i) go doWrites(kc, nextBuf, nextLocator) } @@ -102,22 +106,28 @@ func countBeans(nextLocator chan string) { } } -func makeBufs(nextBuf chan []byte, threadID int) { +func makeBufs(nextBuf chan<- []byte, threadID int) { buf := make([]byte, *BlockSize) if *VaryThread { binary.PutVarint(buf, int64(threadID)) } + randSize := 524288 + if randSize > *BlockSize { + randSize = *BlockSize + } for { if *VaryRequest { - if _, err := io.ReadFull(rand.Reader, buf); err != nil { + rnd := make([]byte, randSize) + if _, err := io.ReadFull(rand.Reader, rnd); err != nil { log.Fatal(err) } + buf = append(rnd, buf[randSize:]...) } nextBuf <- buf } } -func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) { +func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string) { for buf := range nextBuf { locator, _, err := kc.PutB(buf) if err != nil { @@ -134,7 +144,7 @@ func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan s } } -func doReads(kc *keepclient.KeepClient, nextLocator chan string) { +func doReads(kc *keepclient.KeepClient, nextLocator <-chan string) { for locator := range nextLocator { rdr, size, url, err := kc.Get(locator) if err != nil { @@ -155,3 +165,23 @@ func doReads(kc *keepclient.KeepClient, nextLocator chan string) { bytesInChan <- uint64(n) } } + +func overrideServices(kc *keepclient.KeepClient) { + roots := make(map[string]string) + if *ServiceURL != "" { + roots["zzzzz-bi6l4-000000000000000"] = *ServiceURL + } else if *ServiceUUID != "" { + for uuid, url := range kc.GatewayRoots() { + if uuid == *ServiceUUID { + roots[uuid] = url + break + } + } + if len(roots) == 0 { + log.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots()) + } + } else { + return + } + kc.SetServiceRoots(roots, roots, roots) +}