return err
}
+func (h *Handler) Done() <-chan struct{} { // Done always returns nil: this handler never signals that it has stopped.
+ return nil // a receive on a nil channel blocks forever
+}
+
// neverRedirect has the signature of an http.Client CheckRedirect
// function. It ignores both arguments and always returns
// http.ErrUseLastResponse, the sentinel that tells the client not to
// follow the redirect and to return the most recent response instead.
func neverRedirect(_ *http.Request, _ []*http.Request) error {
	return http.ErrUseLastResponse
}
func (h *Handler) setup() {
return disp.pool.CheckHealth()
}
+// Done implements service.Handler by returning the dispatcher's stopped channel.
+func (disp *dispatcher) Done() <-chan struct{} {
+ return disp.stopped // NOTE(review): presumably closed when the dispatcher stops — confirm where disp.stopped is closed
+}
+
// Close stops dispatching containers and releases resources. Typically
// used in tests.
func (disp *dispatcher) Close() {
type Handler interface { // an http.Handler that also exposes a health check and a done channel
http.Handler
CheckHealth() error
+ Done() <-chan struct{} // NOTE(review): the server loop in this change appears to receive from this and shut down "if handler dies"; a nil channel never signals — confirm
}
// NewHandlerFunc is the signature of a function that builds a Handler
// from a context, an *arvados.Cluster, a token, and a Prometheus
// registry.
type NewHandlerFunc func(ctx context.Context, cluster *arvados.Cluster, token string, registry *prometheus.Registry) Handler
logger.WithError(err).Errorf("error notifying init daemon")
}
go func() {
+ // Shut down server if caller cancels context
<-ctx.Done()
srv.Close()
}()
+ go func() {
+ // Shut down server if handler dies
+ <-handler.Done()
+ srv.Close()
+ }()
err = srv.Wait()
if err != nil {
return 1
func (eh errorHandler) CheckHealth() error {
return eh.err // the health result is simply the error this errorHandler carries
}
+
+// Done returns an already-closed channel, so any receive on it completes
+// immediately, indicating the service has stopped/failed.
+func (eh errorHandler) Done() <-chan struct{} {
+ return doneChannel // shared package-level channel; it is closed when it is initialized
+}
+
+var doneChannel = func() <-chan struct{} { // evaluated once, when this variable is initialized
+ done := make(chan struct{})
+ close(done) // closed up front: every receive on doneChannel returns immediately
+ return done // returned as receive-only, so users of doneChannel cannot send on it or close it again
+}()
return nil
}
+func (agg *Aggregator) Done() <-chan struct{} { // Done always returns nil: the aggregator never signals that it has stopped.
+ return nil // a receive on a nil channel blocks forever
+}
+
func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
agg.setupOnce.Do(agg.setup)
sendErr := func(statusCode int, err error) {
return nil
}
+// Done implements service.Handler. It always returns nil, so it never signals that the server has stopped.
+func (srv *Server) Done() <-chan struct{} {
+ return nil // a receive on a nil channel blocks forever
+}
+
func (srv *Server) run() {
var err error
if srv.RunOptions.Once {
return h.err
}
+func (h *handler) Done() <-chan struct{} { // Done always returns nil: this handler never signals that it has stopped.
+ return nil // a receive on a nil channel blocks forever
+}
+
func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
var h handler
serviceURL, ok := service.URLFromContext(ctx)
handler *handler
mux *http.ServeMux
setupOnce sync.Once
+ done chan struct{}
lastReqID int64
lastReqMtx sync.Mutex
rtr.setupOnce.Do(rtr.setup)
return rtr.eventSource.DBHealth()
}
+
+func (rtr *router) Done() <-chan struct{} { // Done exposes the router's done channel.
+ return rtr.done // NOTE(review): appears to be the channel the constructor closes after eventSource.Run() returns — confirm
+}
import (
"context"
"fmt"
- "os"
"git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/service"
QueueSize: cluster.API.WebsocketServerEventQueue,
Logger: ctxlog.FromContext(ctx),
}
+ done := make(chan struct{})
go func() {
eventSource.Run()
ctxlog.FromContext(ctx).Error("event source stopped")
- if !testMode {
- os.Exit(1)
- }
+ close(done)
}()
eventSource.WaitReady()
if err := eventSource.DBHealth(); err != nil {
client: client,
eventSource: eventSource,
newPermChecker: func() permChecker { return newPermChecker(*client) },
+ done: done,
}
return rtr
}