+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
// Testing tool for Keep services.
//
// keepexercise helps measure throughput and test reliability under
"io"
"io/ioutil"
"log"
+ "net/http"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
VaryThread = flag.Bool("vary-thread", false, "use -wthreads different data blocks")
Replicas = flag.Int("replicas", 1, "replication level for writing")
StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable")
+ ServiceURL = flag.String("url", "", "specify scheme://host of a single keep service to exercise (instead of using all advertised services like normal clients)")
+ ServiceUUID = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise")
)
// main sets up a Keep client and starts the exerciser goroutines. Only part
// of the function is visible in this hunk; the comments below describe the
// visible statements only.
func main() {
if err != nil {
log.Fatal(err)
}
- kc, err := keepclient.MakeKeepClient(&arv)
+ kc, err := keepclient.MakeKeepClient(arv)
if err != nil {
log.Fatal(err)
}
kc.Want_replicas = *Replicas
- kc.Client.Timeout = 10 * time.Minute
- nextBuf := make(chan []byte, *WriteThreads)
// Work on a copy of the default transport so that the TLS configuration
// assigned below is not applied to http.DefaultTransport itself.
// NOTE(review): this copies the http.Transport struct by value, including
// whatever internal state it holds; confirm `go vet` (copylocks) accepts
// this, or switch to (*http.Transport).Clone if the minimum Go version
// provides it.
+ transport := *(http.DefaultTransport.(*http.Transport))
+ transport.TLSClientConfig = arvadosclient.MakeTLSConfig(arv.ApiInsecure)
// Give kc a dedicated HTTP client with a 10-minute timeout.
+ kc.HTTPClient = &http.Client{
+ Timeout: 10 * time.Minute,
+ Transport: &transport,
+ }
+
// Apply the -url / -uuid flags, if either was given.
+ overrideServices(kc)
+
nextLocator := make(chan string, *ReadThreads+*WriteThreads)
go countBeans(nextLocator)
for i := 0; i < *WriteThreads; i++ {
// Each write thread gets its own one-slot buffer channel, fed by its own
// makeBufs goroutine and drained by its own doWrites goroutine.
+ nextBuf := make(chan []byte, 1)
go makeBufs(nextBuf, i)
go doWrites(kc, nextBuf, nextLocator)
}
}
}
// makeBufs sends data blocks of *BlockSize bytes on nextBuf forever; it
// never returns and never closes the channel.
//
// If *VaryThread is set, threadID is written as a varint at the start of the
// initial block. If *VaryRequest is set, every iteration builds a new block
// whose first randSize bytes are freshly generated random data and whose
// remainder is copied from the previous block; otherwise the same block is
// sent repeatedly.
-func makeBufs(nextBuf chan []byte, threadID int) {
+func makeBufs(nextBuf chan<- []byte, threadID int) {
buf := make([]byte, *BlockSize)
if *VaryThread {
binary.PutVarint(buf, int64(threadID))
}
// Only the first randSize bytes of a block are randomized per request:
// 524288 bytes, or the whole block when it is smaller than that.
+ randSize := 524288
+ if randSize > *BlockSize {
+ randSize = *BlockSize
+ }
for {
if *VaryRequest {
- if _, err := io.ReadFull(rand.Reader, buf); err != nil {
+ rnd := make([]byte, randSize)
+ if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
log.Fatal(err)
}
// rnd has no spare capacity, so this yields a slice that does not share
// memory with any block already sent on nextBuf.
// NOTE(review): the random prefix also replaces the leading bytes where
// the -vary-thread varint was written — confirm that is intended.
+ buf = append(rnd, buf[randSize:]...)
}
nextBuf <- buf
}
}
// doWrites passes each buffer received from nextBuf to kc.PutB, until
// nextBuf is closed. nextLocator is a send-only channel of locator strings.
// Only part of the loop body is visible in this hunk.
-func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) {
+func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string) {
for buf := range nextBuf {
locator, _, err := kc.PutB(buf)
if err != nil {
}
}
// doReads calls kc.Get for each locator received from nextLocator, until
// nextLocator is closed. Only part of the loop body is visible in this hunk.
-func doReads(kc *keepclient.KeepClient, nextLocator chan string) {
+func doReads(kc *keepclient.KeepClient, nextLocator <-chan string) {
for locator := range nextLocator {
rdr, size, url, err := kc.Get(locator)
if err != nil {
// Report a byte count on bytesInChan.
bytesInChan <- uint64(n)
}
}
+
+// overrideServices restricts kc to a single Keep service when the -url or
+// -uuid flag is set, and leaves kc untouched when neither is set.
+//
+// -url takes precedence over -uuid if both are given. With -uuid, the UUID
+// must be a key of kc.GatewayRoots(); otherwise the program exits via
+// log.Fatalf.
+func overrideServices(kc *keepclient.KeepClient) {
+	roots := make(map[string]string)
+	switch {
+	case *ServiceURL != "":
+		// The URL is registered under a fixed UUID (presumably a placeholder,
+		// since no advertised service is consulted in this branch).
+		roots["zzzzz-bi6l4-000000000000000"] = *ServiceURL
+	case *ServiceUUID != "":
+		// Look the UUID up directly instead of scanning every entry, and reuse
+		// the same map for the error message.
+		advertised := kc.GatewayRoots()
+		url, ok := advertised[*ServiceUUID]
+		if !ok {
+			log.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, advertised)
+		}
+		roots[*ServiceUUID] = url
+	default:
+		return
+	}
+	// The same single-entry map is passed for all three arguments.
+	kc.SetServiceRoots(roots, roots, roots)
+}