From 098a6b73cd5b7ad391ed30e95b825817d22aec24 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 25 Mar 2020 11:28:25 -0400 Subject: [PATCH] 16217: Exit service command if the service handler fails. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/controller/handler.go | 4 ++++ lib/dispatchcloud/dispatcher.go | 5 +++++ lib/service/cmd.go | 7 +++++++ lib/service/error.go | 12 ++++++++++++ sdk/go/health/aggregator.go | 4 ++++ services/keep-balance/server.go | 5 +++++ services/keepstore/command.go | 4 ++++ services/ws/router.go | 5 +++++ services/ws/service.go | 7 +++---- 9 files changed, 49 insertions(+), 4 deletions(-) diff --git a/lib/controller/handler.go b/lib/controller/handler.go index e3869261a1..3929a1103f 100644 --- a/lib/controller/handler.go +++ b/lib/controller/handler.go @@ -67,6 +67,10 @@ func (h *Handler) CheckHealth() error { return err } +func (h *Handler) Done() <-chan struct{} { + return nil +} + func neverRedirect(*http.Request, []*http.Request) error { return http.ErrUseLastResponse } func (h *Handler) setup() { diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go index 4023896f79..02b6c976ae 100644 --- a/lib/dispatchcloud/dispatcher.go +++ b/lib/dispatchcloud/dispatcher.go @@ -82,6 +82,11 @@ func (disp *dispatcher) CheckHealth() error { return disp.pool.CheckHealth() } +// Done implements service.Handler. +func (disp *dispatcher) Done() <-chan struct{} { + return disp.stopped +} + // Stop dispatching containers and release resources. Typically used // in tests. func (disp *dispatcher) Close() { diff --git a/lib/service/cmd.go b/lib/service/cmd.go index 7f2f78ee9a..1e7a9a36ed 100644 --- a/lib/service/cmd.go +++ b/lib/service/cmd.go @@ -29,6 +29,7 @@ import ( type Handler interface { http.Handler CheckHealth() error + Done() <-chan struct{} } type NewHandlerFunc func(_ context.Context, _ *arvados.Cluster, token string, registry *prometheus.Registry) Handler @@ -148,9 +149,15 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout logger.WithError(err).Errorf("error notifying init daemon") } go func() { + // Shut down server if caller cancels context <-ctx.Done() srv.Close() }() + go func() { + // Shut down server if handler dies + <-handler.Done() + srv.Close() + }() err = srv.Wait() if err != nil { return 1 diff --git a/lib/service/error.go b/lib/service/error.go index c4049f7064..a4d7370d1b 100644 --- a/lib/service/error.go +++ b/lib/service/error.go @@ -36,3 +36,15 @@ func (eh errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (eh errorHandler) CheckHealth() error { return eh.err } + +// Done returns a closed channel to indicate the service has +// stopped/failed. +func (eh errorHandler) Done() <-chan struct{} { + return doneChannel +} + +var doneChannel = func() <-chan struct{} { + done := make(chan struct{}) + close(done) + return done +}() diff --git a/sdk/go/health/aggregator.go b/sdk/go/health/aggregator.go index a0284e8f24..794adabdd3 100644 --- a/sdk/go/health/aggregator.go +++ b/sdk/go/health/aggregator.go @@ -46,6 +46,10 @@ func (agg *Aggregator) CheckHealth() error { return nil } +func (agg *Aggregator) Done() <-chan struct{} { + return nil +} + func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) { agg.setupOnce.Do(agg.setup) sendErr := func(statusCode int, err error) { diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go index 05658b5e5d..9801a3fd45 100644 --- a/services/keep-balance/server.go +++ b/services/keep-balance/server.go @@ -53,6 +53,11 @@ func (srv *Server) CheckHealth() error { return nil } +// Done implements service.Handler. +func (srv *Server) Done() <-chan struct{} { + return nil +} + func (srv *Server) run() { var err error if srv.RunOptions.Once { diff --git a/services/keepstore/command.go b/services/keepstore/command.go index e0509393cf..be26397736 100644 --- a/services/keepstore/command.go +++ b/services/keepstore/command.go @@ -132,6 +132,10 @@ func (h *handler) CheckHealth() error { return h.err } +func (h *handler) Done() <-chan struct{} { + return nil +} + func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler { var h handler serviceURL, ok := service.URLFromContext(ctx) diff --git a/services/ws/router.go b/services/ws/router.go index b1764c156c..b3403dabd0 100644 --- a/services/ws/router.go +++ b/services/ws/router.go @@ -37,6 +37,7 @@ type router struct { handler *handler mux *http.ServeMux setupOnce sync.Once + done chan struct{} lastReqID int64 lastReqMtx sync.Mutex @@ -165,3 +166,7 @@ func (rtr *router) CheckHealth() error { rtr.setupOnce.Do(rtr.setup) return rtr.eventSource.DBHealth() } + +func (rtr *router) Done() <-chan struct{} { + return rtr.done +} diff --git a/services/ws/service.go b/services/ws/service.go index fb313bb799..c38dcf59e6 100644 --- a/services/ws/service.go +++ b/services/ws/service.go @@ -7,7 +7,6 @@ package ws import ( "context" "fmt" - "os" "git.arvados.org/arvados.git/lib/cmd" "git.arvados.org/arvados.git/lib/service" @@ -31,12 +30,11 @@ func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg QueueSize: cluster.API.WebsocketServerEventQueue, Logger: ctxlog.FromContext(ctx), } + done := make(chan struct{}) go func() { eventSource.Run() ctxlog.FromContext(ctx).Error("event source stopped") - if !testMode { - os.Exit(1) - } + close(done) }() eventSource.WaitReady() if err := eventSource.DBHealth(); err != nil { @@ -47,6 +45,7 @@ func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg client: client, eventSource: eventSource, newPermChecker: func() permChecker { return newPermChecker(*client) }, + done: done, } return rtr } -- 2.30.2