// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package lsf

import (
	"context"
	"crypto/hmac"
	"crypto/sha256"
	"errors"
	"fmt"
	"math"
	"net/http"
	"regexp"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cmd"
	"git.arvados.org/arvados.git/lib/controller/dblock"
	"git.arvados.org/arvados.git/lib/ctrlctx"
	"git.arvados.org/arvados.git/lib/dispatchcloud"
	"git.arvados.org/arvados.git/lib/service"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
	"git.arvados.org/arvados.git/sdk/go/auth"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"git.arvados.org/arvados.git/sdk/go/dispatch"
	"git.arvados.org/arvados.git/sdk/go/health"
	"github.com/julienschmidt/httprouter"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/sirupsen/logrus"
)

var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)

func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
	ac, err := arvados.NewClientFromConfig(cluster)
	if err != nil {
		return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
	}
	d := &dispatcher{
		Cluster:   cluster,
		Context:   ctx,
		ArvClient: ac,
		AuthToken: token,
		Registry:  reg,
	}
	go d.Start()
	return d
}

type dispatcher struct {
	Cluster   *arvados.Cluster
	Context   context.Context
	ArvClient *arvados.Client
	AuthToken string
	Registry  *prometheus.Registry

	logger        logrus.FieldLogger
	dbConnector   ctrlctx.DBConnector
	lsfcli        lsfcli
	lsfqueue      lsfqueue
	arvDispatcher *dispatch.Dispatcher
	httpHandler   http.Handler

	initOnce sync.Once
	stop     chan struct{}
	stopped  chan struct{}
}

// Start starts the dispatcher. Start can be called multiple times
// with no ill effect.
func (disp *dispatcher) Start() {
	disp.initOnce.Do(func() {
		disp.init()
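		// dblock.Dispatch takes a lock through the PostgreSQL
		// database, so at most one dispatcher process per
		// cluster runs the dispatch loop at a time; the
		// deferred Unlock below releases it when
		// arvDispatcher.Run returns.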
		dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
		go func() {
			defer dblock.Dispatch.Unlock()
			disp.checkLsfQueueForOrphans()
			err := disp.arvDispatcher.Run(disp.Context)
			if err != nil {
				disp.logger.Error(err)
				disp.Close()
			}
		}()
	})
}

// ServeHTTP implements service.Handler.
func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	disp.Start()
	disp.httpHandler.ServeHTTP(w, r)
}

// CheckHealth implements service.Handler.
func (disp *dispatcher) CheckHealth() error {
	disp.Start()
	select {
	case <-disp.stopped:
		return errors.New("stopped")
	default:
		return nil
	}
}

// Done implements service.Handler.
func (disp *dispatcher) Done() <-chan struct{} {
	return disp.stopped
}

// Stop dispatching containers and release resources. Used by tests.
func (disp *dispatcher) Close() {
	disp.Start()
	select {
	case disp.stop <- struct{}{}:
	default:
	}
	<-disp.stopped
}

func (disp *dispatcher) init() {
	disp.logger = ctxlog.FromContext(disp.Context)
	disp.lsfcli.logger = disp.logger
	disp.lsfqueue = lsfqueue{
		logger: disp.logger,
		period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(),
		lsfcli: &disp.lsfcli,
	}
	disp.ArvClient.AuthToken = disp.AuthToken
	disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
	disp.stop = make(chan struct{}, 1)
	disp.stopped = make(chan struct{})

	arv, err := arvadosclient.New(disp.ArvClient)
	if err != nil {
		disp.logger.Fatalf("Error making Arvados client: %v", err)
	}
	arv.Retries = 25
	arv.ApiToken = disp.AuthToken
	disp.arvDispatcher = &dispatch.Dispatcher{
		Arv:            arv,
		Logger:         disp.logger,
		BatchSize:      disp.Cluster.API.MaxItemsPerResponse,
		RunContainer:   disp.runContainer,
		PollPeriod:     time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
		MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
	}

	if disp.Cluster.ManagementToken == "" {
		disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
		})
	} else {
		mux := httprouter.New()
		metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
			ErrorLog: disp.logger,
		})
		mux.Handler("GET", "/metrics", metricsH)
		mux.Handler("GET", "/metrics.json", metricsH)
		mux.Handler("GET", "/_health/:check", &health.Handler{
			Token:  disp.Cluster.ManagementToken,
			Prefix: "/_health/",
			Routes: health.Routes{"ping": disp.CheckHealth},
		})
		disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
	}
}
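
// For illustration only (hypothetical host and port): with
// ManagementToken configured, the handler assembled in init() serves
// the monitoring endpoints, e.g.
//
//	curl -H "Authorization: Bearer $ManagementToken" http://dispatcher.example:9007/metrics
//	curl -H "Authorization: Bearer $ManagementToken" http://dispatcher.example:9007/_health/ping
//
// Requests without the token are rejected by auth.RequireLiteralToken.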

func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
	ctx, cancel := context.WithCancel(disp.Context)
	defer cancel()

	if ctr.State != dispatch.Locked {
		// already started by prior invocation
	} else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
		if _, err := dispatchcloud.ChooseInstanceType(disp.Cluster, &ctr); errors.As(err, &dispatchcloud.ConstraintsNotSatisfiableError{}) {
			err := disp.arvDispatcher.Arv.Update("containers", ctr.UUID, arvadosclient.Dict{
				"container": map[string]interface{}{
					"runtime_status": map[string]string{
						"error": err.Error(),
					},
				},
			}, nil)
			if err != nil {
				return fmt.Errorf("error setting runtime_status on %s: %s", ctr.UUID, err)
			}
			return disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
		}
		disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
		cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
		cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
		cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
		err := disp.submit(ctr, cmd)
		if err != nil {
			return err
		}
	}

	disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
	defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)

	go func(uuid string) {
		for ctx.Err() == nil {
			_, ok := disp.lsfqueue.Lookup(uuid)
			if !ok {
				// If the container disappears from
				// the lsf queue, there is no point in
				// waiting for further dispatch
				// updates: just clean up and return.
				disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
				cancel()
				return
			}
		}
	}(ctr.UUID)
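	// When the goroutine above notices the job has left the LSF
	// queue it calls cancel(), which makes the <-ctx.Done() case in
	// the loop below fire and finalize the container record.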

	for done := false; !done; {
		select {
		case <-ctx.Done():
			// Container job disappeared from the LSF queue:
			// fetch the final container state and clean up.
			if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
				disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
			}
			switch ctr.State {
			case dispatch.Running:
				disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
			case dispatch.Locked:
				disp.arvDispatcher.Unlock(ctr.UUID)
			}
			done = true
		case updated, ok := <-status:
			if !ok {
				// status channel is closed, which is
				// how arvDispatcher tells us to stop
				// touching the container record, kill
				// off any remaining LSF processes,
				// etc.
				done = true
				break
			}
			if updated.State != ctr.State {
				disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
			}
			ctr = updated
			if ctr.Priority < 1 {
				disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
				disp.bkill(ctr)
			} else {
				disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority))
			}
		}
	}
	disp.logger.Printf("container %s is done", ctr.UUID)

	// Try "bkill" every few seconds until the LSF job disappears
	// from the queue.
	ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2)
	defer ticker.Stop()
	for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
		err := disp.lsfcli.Bkill(qent.ID)
		if err != nil {
			disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
		}
		<-ticker.C
	}
	return nil
}

func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
	// Start with an empty slice here to ensure append() doesn't
	// modify crunchRunCommand's underlying array
	var crArgs []string
	crArgs = append(crArgs, crunchRunCommand...)
	crArgs = append(crArgs, container.UUID)

	h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken))
	fmt.Fprint(h, container.UUID)
	authsecret := fmt.Sprintf("%x", h.Sum(nil))
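	// The GatewayAuthSecret computed above is an HMAC-SHA256 digest
	// of the container UUID keyed with the cluster's
	// SystemRootToken, so any service holding SystemRootToken can
	// recompute it from the container UUID alone. It is passed to
	// crunch-run via the wrapper script's environment.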

	crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret})

	bsubArgs, err := disp.bsubArgs(container)
	if err != nil {
		return err
	}
	return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
}

func (disp *dispatcher) bkill(ctr arvados.Container) {
	if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
		disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
	} else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
		disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
	}
}

func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
	args := []string{"bsub"}

	tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
	vcpus := container.RuntimeConstraints.VCPUs
	mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
		container.RuntimeConstraints.KeepCacheRAM+
		int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))

	maxruntime := time.Duration(container.SchedulingParameters.MaxRunTime) * time.Second
	if maxruntime == 0 {
		maxruntime = disp.Cluster.Containers.LSF.MaxRunTimeDefault.Duration()
	}
	if maxruntime > 0 {
		maxruntime += disp.Cluster.Containers.LSF.MaxRunTimeOverhead.Duration()
	}
	maxrunminutes := int64(math.Ceil(float64(maxruntime.Seconds()) / 60))

	repl := map[string]string{
		"%%": "%",
		"%C": fmt.Sprintf("%d", vcpus),
		"%M": fmt.Sprintf("%d", mem),
		"%T": fmt.Sprintf("%d", tmp),
		"%U": container.UUID,
		"%G": fmt.Sprintf("%d", container.RuntimeConstraints.CUDA.DeviceCount),
		"%W": fmt.Sprintf("%d", maxrunminutes),
	}

	re := regexp.MustCompile(`%.`)
	var substitutionErrors string
	argumentTemplate := disp.Cluster.Containers.LSF.BsubArgumentsList
	if container.RuntimeConstraints.CUDA.DeviceCount > 0 {
		argumentTemplate = append(argumentTemplate, disp.Cluster.Containers.LSF.BsubCUDAArguments...)
	}
	for idx, a := range argumentTemplate {
		if idx > 0 && (argumentTemplate[idx-1] == "-W" || argumentTemplate[idx-1] == "-We") && a == "%W" && maxrunminutes == 0 {
			// LSF docs don't specify an argument to "-W"
			// or "-We" that indicates "unknown", so
			// instead we drop the "-W %W" part of the
			// command line entirely when max runtime is
			// unknown.
			args = args[:len(args)-1]
			continue
		}
		args = append(args, re.ReplaceAllStringFunc(a, func(s string) string {
			subst := repl[s]
			if len(subst) == 0 {
				substitutionErrors += fmt.Sprintf("Unknown substitution parameter %s in BsubArgumentsList, ", s)
			}
			return subst
		}))
	}
	if len(substitutionErrors) != 0 {
		return nil, fmt.Errorf("%s", substitutionErrors[:len(substitutionErrors)-2])
	}

	if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
		args = append([]string{"sudo", "-E", "-u", u}, args...)
	}

	return args, nil
}
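
// For illustration only (hypothetical values): with BsubSudoUser set
// to "crunch", bsubArgs returns something like
//
//	sudo -E -u crunch bsub -J zzzzz-dz642-... -n 4 ...
//
// so the LSF job is submitted as that user rather than as the
// dispatcher's own user, while -E preserves the environment.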

// Check the next bjobs report, and invoke TrackContainer for all the
// containers in the report. This gives us a chance to cancel existing
// Arvados LSF jobs (started by a previous dispatch process) that
// never released their LSF job allocations even though their
// container states are Cancelled or Complete. See
// https://dev.arvados.org/issues/10979
func (disp *dispatcher) checkLsfQueueForOrphans() {
	containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
	for _, uuid := range disp.lsfqueue.All() {
		if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
			continue
		}
		err := disp.arvDispatcher.TrackContainer(uuid)
		if err != nil {
			disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
		}
	}
}

func execScript(args []string, env map[string]string) []byte {
	s := "#!/bin/sh\n"
	for k, v := range env {
		s += k + `='`
		s += strings.Replace(v, `'`, `'\''`, -1)
		s += `' `
	}
	s += "exec"
	for _, w := range args {
		s += ` '`
		s += strings.Replace(w, `'`, `'\''`, -1)
		s += `'`
	}
	return []byte(s + "\n")
}
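
// For illustration only (hypothetical values): given
// args = []string{"crunch-run", "--runtime-engine=docker", "zzzzz-dz642-..."}
// and env = map[string]string{"GatewayAuthSecret": "abc123"},
// execScript returns a script along the lines of
//
//	#!/bin/sh
//	GatewayAuthSecret='abc123' exec 'crunch-run' '--runtime-engine=docker' 'zzzzz-dz642-...'
//
// Single quotes inside values are escaped as '\'' so the shell passes
// them through verbatim.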