From 8989c3ca9c63dc05975b2c02e56c2031246beb96 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Fri, 1 Dec 2023 15:48:13 -0500 Subject: [PATCH] 21227: Fix copied mutexes. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/boot/supervisor.go | 21 +++++++++---- sdk/go/keepclient/keepclient.go | 21 +++++++++++++ services/keepproxy/keepproxy.go | 50 +++++++++++++++++++----------- services/keepstore/proxy_remote.go | 4 +-- services/keepstore/pull_worker.go | 4 +-- 5 files changed, 72 insertions(+), 28 deletions(-) diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go index e25fb8cdba..ac269b933a 100644 --- a/lib/boot/supervisor.go +++ b/lib/boot/supervisor.go @@ -204,15 +204,24 @@ func (super *Supervisor) Wait() error { func (super *Supervisor) startFederation(cfg *arvados.Config) { super.children = map[string]*Supervisor{} for id, cc := range cfg.Clusters { - super2 := *super yaml, err := json.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{id: cc}}) if err != nil { panic(fmt.Sprintf("json.Marshal partial config: %s", err)) } - super2.ConfigYAML = string(yaml) - super2.ConfigPath = "-" - super2.children = nil - + super2 := &Supervisor{ + ConfigPath: "-", + ConfigYAML: string(yaml), + SourcePath: super.SourcePath, + SourceVersion: super.SourceVersion, + ClusterType: super.ClusterType, + ListenHost: super.ListenHost, + ControllerAddr: super.ControllerAddr, + NoWorkbench1: super.NoWorkbench1, + NoWorkbench2: super.NoWorkbench2, + OwnTemporaryDatabase: super.OwnTemporaryDatabase, + Stdin: super.Stdin, + Stderr: super.Stderr, + } if super2.ClusterType == "test" { super2.Stderr = &service.LogPrefixer{ Writer: super.Stderr, @@ -220,7 +229,7 @@ func (super *Supervisor) startFederation(cfg *arvados.Config) { } } super2.Start(super.ctx) - super.children[id] = &super2 + super.children[id] = super2 } } diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index 68ac886ddd..86001c01e0 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -120,6 +120,27 @@ type KeepClient struct { disableDiscovery bool } +func (kc *KeepClient) Clone() *KeepClient { + kc.lock.Lock() + defer kc.lock.Unlock() + return &KeepClient{ + Arvados: kc.Arvados, + Want_replicas: kc.Want_replicas, + localRoots: kc.localRoots, + writableLocalRoots: kc.writableLocalRoots, + gatewayRoots: kc.gatewayRoots, + HTTPClient: kc.HTTPClient, + Retries: kc.Retries, + BlockCache: kc.BlockCache, + RequestID: kc.RequestID, + StorageClasses: kc.StorageClasses, + DefaultStorageClasses: kc.DefaultStorageClasses, + replicasPerService: kc.replicasPerService, + foundNonDiskSvc: kc.foundNonDiskSvc, + disableDiscovery: kc.disableDiscovery, + } +} + func (kc *KeepClient) loadDefaultClasses() error { scData, err := kc.Arvados.ClusterConfig("StorageClasses") if err != nil { diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go index 2090c50686..a79883147b 100644 --- a/services/keepproxy/keepproxy.go +++ b/services/keepproxy/keepproxy.go @@ -175,13 +175,18 @@ func (h *proxyHandler) checkAuthorizationHeader(req *http.Request) (pass bool, t return true, tok, user } -// We need to make a private copy of the default http transport early -// in initialization, then make copies of our private copy later. It -// won't be safe to copy http.DefaultTransport itself later, because -// its private mutexes might have already been used. (Without this, -// the test suite sometimes panics "concurrent map writes" in -// net/http.(*Transport).removeIdleConnLocked().) -var defaultTransport = *(http.DefaultTransport.(*http.Transport)) +// We can't copy the default http transport because http.Transport has +// a mutex field, so we make our own using the values of the exported +// fields. +var defaultTransport = http.Transport{ + Proxy: http.DefaultTransport.(*http.Transport).Proxy, + DialContext: http.DefaultTransport.(*http.Transport).DialContext, + ForceAttemptHTTP2: http.DefaultTransport.(*http.Transport).ForceAttemptHTTP2, + MaxIdleConns: http.DefaultTransport.(*http.Transport).MaxIdleConns, + IdleConnTimeout: http.DefaultTransport.(*http.Transport).IdleConnTimeout, + TLSHandshakeTimeout: http.DefaultTransport.(*http.Transport).TLSHandshakeTimeout, + ExpectContinueTimeout: http.DefaultTransport.(*http.Transport).ExpectContinueTimeout, +} type proxyHandler struct { http.Handler @@ -195,14 +200,23 @@ type proxyHandler struct { func newHandler(ctx context.Context, kc *keepclient.KeepClient, timeout time.Duration, cluster *arvados.Cluster) (service.Handler, error) { rest := mux.NewRouter() - transport := defaultTransport - transport.DialContext = (&net.Dialer{ - Timeout: keepclient.DefaultConnectTimeout, - KeepAlive: keepclient.DefaultKeepAlive, - DualStack: true, - }).DialContext - transport.TLSClientConfig = arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure) - transport.TLSHandshakeTimeout = keepclient.DefaultTLSHandshakeTimeout + // We can't copy the default http transport because + // http.Transport has a mutex field, so we copy the fields + // that we know have non-zero values in http.DefaultTransport. + transport := &http.Transport{ + Proxy: http.DefaultTransport.(*http.Transport).Proxy, + ForceAttemptHTTP2: http.DefaultTransport.(*http.Transport).ForceAttemptHTTP2, + MaxIdleConns: http.DefaultTransport.(*http.Transport).MaxIdleConns, + IdleConnTimeout: http.DefaultTransport.(*http.Transport).IdleConnTimeout, + ExpectContinueTimeout: http.DefaultTransport.(*http.Transport).ExpectContinueTimeout, + DialContext: (&net.Dialer{ + Timeout: keepclient.DefaultConnectTimeout, + KeepAlive: keepclient.DefaultKeepAlive, + DualStack: true, + }).DialContext, + TLSClientConfig: arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure), + TLSHandshakeTimeout: keepclient.DefaultTLSHandshakeTimeout, + } cacheQ, err := lru.New2Q(500) if err != nil { @@ -213,7 +227,7 @@ func newHandler(ctx context.Context, kc *keepclient.KeepClient, timeout time.Dur Handler: rest, KeepClient: kc, timeout: timeout, - transport: &transport, + transport: transport, apiTokenCache: &apiTokenCache{ tokens: cacheQ, expireTime: 300, @@ -566,7 +580,7 @@ func (h *proxyHandler) Index(resp http.ResponseWriter, req *http.Request) { } func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient { - kc := *h.KeepClient + kc := h.KeepClient.Clone() kc.RequestID = req.Header.Get("X-Request-Id") kc.HTTPClient = &proxyClient{ client: &http.Client{ @@ -575,5 +589,5 @@ func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient }, proto: req.Proto, } - return &kc + return kc } diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go index 526bc25299..66a7b43751 100644 --- a/services/keepstore/proxy_remote.go +++ b/services/keepstore/proxy_remote.go @@ -130,14 +130,14 @@ func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.Remot } accopy := *kc.Arvados accopy.ApiToken = token - kccopy := *kc + kccopy := kc.Clone() kccopy.Arvados = &accopy token, err := auth.SaltToken(token, remoteID) if err != nil { return nil, err } kccopy.Arvados.ApiToken = token - return &kccopy, nil + return kccopy, nil } var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^\+]*`) diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go index abe3dc3857..b9194fe6f6 100644 --- a/services/keepstore/pull_worker.go +++ b/services/keepstore/pull_worker.go @@ -50,7 +50,7 @@ func (h *handler) pullItemAndProcess(pullRequest PullRequest) error { // Make a private copy of keepClient so we can set // ServiceRoots to the source servers specified in the pull // request. - keepClient := *h.keepClient + keepClient := h.keepClient.Clone() serviceRoots := make(map[string]string) for _, addr := range pullRequest.Servers { serviceRoots[addr] = addr @@ -59,7 +59,7 @@ func (h *handler) pullItemAndProcess(pullRequest PullRequest) error { signedLocator := SignLocator(h.Cluster, pullRequest.Locator, keepClient.Arvados.ApiToken, time.Now().Add(time.Minute)) - reader, contentLen, _, err := GetContent(signedLocator, &keepClient) + reader, contentLen, _, err := GetContent(signedLocator, keepClient) if err != nil { return err } -- 2.30.2