+ return conf
+}
+
+// Load cluster config file from given path. If an error occurs, log
+// the error to stderr and return nil.
+func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
+ ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info"))
+ ldr.Path = path
+ cfg, err := ldr.Load()
+ if err != nil {
+ fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, err)
+ return nil
+ }
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, err)
+ return nil
+ }
+ fmt.Fprintf(stderr, "loaded config file %s\n", path)
+ return cluster
+}
+
+func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
+ if configData.KeepBuffers < 1 {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
+ return nil, nil
+ }
+ if configData.Cluster == nil {
+ fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
+ return nil, nil
+ }
+ for uuid, vol := range configData.Cluster.Volumes {
+ if len(vol.AccessViaHosts) > 0 {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
+ return nil, nil
+ }
+ if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
+ return nil, nil
+ }
+ }
+
+ // Rather than have an alternate way to tell keepstore how
+ // many buffers to use when starting it this way, we just
+ // modify the cluster configuration that we feed it on stdin.
+ configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+
+ localaddr := localKeepstoreAddr()
+ ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0"))
+ if err != nil {
+ return nil, err
+ }
+ _, port, err := net.SplitHostPort(ln.Addr().String())
+ if err != nil {
+ ln.Close()
+ return nil, err
+ }
+ ln.Close()
+ url := "http://" + net.JoinHostPort(localaddr, port)
+
+ fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
+
+ var confJSON bytes.Buffer
+ err = json.NewEncoder(&confJSON).Encode(arvados.Config{
+ Clusters: map[string]arvados.Cluster{
+ configData.Cluster.ClusterID: *configData.Cluster,
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+ cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+ if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+ // If we're a 'go test' process, running
+ // /proc/self/exe would start the test suite in a
+ // child process, which is not what we want.
+ cmd.Path, _ = exec.LookPath("go")
+ cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+ cmd.Env = os.Environ()
+ }
+ cmd.Stdin = &confJSON
+ cmd.Stdout = logbuf
+ cmd.Stderr = logbuf
+ cmd.Env = append(cmd.Env,
+ "GOGC=10",
+ "ARVADOS_SERVICE_INTERNAL_URL="+url)
+ err = cmd.Start()
+ if err != nil {
+ return nil, fmt.Errorf("error starting keepstore process: %w", err)
+ }
+ cmdExited := false
+ go func() {
+ cmd.Wait()
+ cmdExited = true
+ }()
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
+ defer cancel()
+ poll := time.NewTicker(time.Second / 10)
+ defer poll.Stop()
+ client := http.Client{}
+ for range poll.C {
+ testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
+ testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)