Fix 2.4.2 upgrade notes formatting refs #19330
[arvados.git] / lib / lsf / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "context"
9         "crypto/hmac"
10         "crypto/sha256"
11         "errors"
12         "fmt"
13         "math"
14         "net/http"
15         "regexp"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/cmd"
21         "git.arvados.org/arvados.git/lib/dispatchcloud"
22         "git.arvados.org/arvados.git/lib/service"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25         "git.arvados.org/arvados.git/sdk/go/auth"
26         "git.arvados.org/arvados.git/sdk/go/ctxlog"
27         "git.arvados.org/arvados.git/sdk/go/dispatch"
28         "git.arvados.org/arvados.git/sdk/go/health"
29         "github.com/julienschmidt/httprouter"
30         "github.com/prometheus/client_golang/prometheus"
31         "github.com/prometheus/client_golang/prometheus/promhttp"
32         "github.com/sirupsen/logrus"
33 )
34
35 var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
36
37 func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
38         ac, err := arvados.NewClientFromConfig(cluster)
39         if err != nil {
40                 return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
41         }
42         d := &dispatcher{
43                 Cluster:   cluster,
44                 Context:   ctx,
45                 ArvClient: ac,
46                 AuthToken: token,
47                 Registry:  reg,
48         }
49         go d.Start()
50         return d
51 }
52
53 type dispatcher struct {
54         Cluster   *arvados.Cluster
55         Context   context.Context
56         ArvClient *arvados.Client
57         AuthToken string
58         Registry  *prometheus.Registry
59
60         logger        logrus.FieldLogger
61         lsfcli        lsfcli
62         lsfqueue      lsfqueue
63         arvDispatcher *dispatch.Dispatcher
64         httpHandler   http.Handler
65
66         initOnce sync.Once
67         stop     chan struct{}
68         stopped  chan struct{}
69 }
70
71 // Start starts the dispatcher. Start can be called multiple times
72 // with no ill effect.
73 func (disp *dispatcher) Start() {
74         disp.initOnce.Do(func() {
75                 disp.init()
76                 go func() {
77                         disp.checkLsfQueueForOrphans()
78                         err := disp.arvDispatcher.Run(disp.Context)
79                         if err != nil {
80                                 disp.logger.Error(err)
81                                 disp.Close()
82                         }
83                 }()
84         })
85 }
86
87 // ServeHTTP implements service.Handler.
88 func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
89         disp.Start()
90         disp.httpHandler.ServeHTTP(w, r)
91 }
92
93 // CheckHealth implements service.Handler.
94 func (disp *dispatcher) CheckHealth() error {
95         disp.Start()
96         select {
97         case <-disp.stopped:
98                 return errors.New("stopped")
99         default:
100                 return nil
101         }
102 }
103
104 // Done implements service.Handler.
105 func (disp *dispatcher) Done() <-chan struct{} {
106         return disp.stopped
107 }
108
109 // Stop dispatching containers and release resources. Used by tests.
110 func (disp *dispatcher) Close() {
111         disp.Start()
112         select {
113         case disp.stop <- struct{}{}:
114         default:
115         }
116         <-disp.stopped
117 }
118
119 func (disp *dispatcher) init() {
120         disp.logger = ctxlog.FromContext(disp.Context)
121         disp.lsfcli.logger = disp.logger
122         disp.lsfqueue = lsfqueue{
123                 logger: disp.logger,
124                 period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(),
125                 lsfcli: &disp.lsfcli,
126         }
127         disp.ArvClient.AuthToken = disp.AuthToken
128         disp.stop = make(chan struct{}, 1)
129         disp.stopped = make(chan struct{})
130
131         arv, err := arvadosclient.New(disp.ArvClient)
132         if err != nil {
133                 disp.logger.Fatalf("Error making Arvados client: %v", err)
134         }
135         arv.Retries = 25
136         arv.ApiToken = disp.AuthToken
137         disp.arvDispatcher = &dispatch.Dispatcher{
138                 Arv:            arv,
139                 Logger:         disp.logger,
140                 BatchSize:      disp.Cluster.API.MaxItemsPerResponse,
141                 RunContainer:   disp.runContainer,
142                 PollPeriod:     time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
143                 MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
144         }
145
146         if disp.Cluster.ManagementToken == "" {
147                 disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
148                         http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
149                 })
150         } else {
151                 mux := httprouter.New()
152                 metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
153                         ErrorLog: disp.logger,
154                 })
155                 mux.Handler("GET", "/metrics", metricsH)
156                 mux.Handler("GET", "/metrics.json", metricsH)
157                 mux.Handler("GET", "/_health/:check", &health.Handler{
158                         Token:  disp.Cluster.ManagementToken,
159                         Prefix: "/_health/",
160                         Routes: health.Routes{"ping": disp.CheckHealth},
161                 })
162                 disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
163         }
164 }
165
166 func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
167         ctx, cancel := context.WithCancel(disp.Context)
168         defer cancel()
169
170         if ctr.State != dispatch.Locked {
171                 // already started by prior invocation
172         } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
173                 disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
174                 cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
175                 cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
176                 cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
177                 err := disp.submit(ctr, cmd)
178                 if err != nil {
179                         return err
180                 }
181         }
182
183         disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
184         defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
185
186         go func(uuid string) {
187                 cancelled := false
188                 for ctx.Err() == nil {
189                         qent, ok := disp.lsfqueue.Lookup(uuid)
190                         if !ok {
191                                 // If the container disappears from
192                                 // the lsf queue, there is no point in
193                                 // waiting for further dispatch
194                                 // updates: just clean up and return.
195                                 disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
196                                 cancel()
197                                 return
198                         }
199                         if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
200                                 disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
201                                 err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
202                                         "container": map[string]interface{}{
203                                                 "runtime_status": map[string]string{
204                                                         "error": qent.PendReason,
205                                                 },
206                                         },
207                                 }, nil)
208                                 if err != nil {
209                                         disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
210                                         continue // retry
211                                 }
212                                 err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
213                                 if err != nil {
214                                         continue // retry (UpdateState() already logged the error)
215                                 }
216                                 cancelled = true
217                         }
218                 }
219         }(ctr.UUID)
220
221         for done := false; !done; {
222                 select {
223                 case <-ctx.Done():
224                         // Disappeared from lsf queue
225                         if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
226                                 disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
227                         }
228                         switch ctr.State {
229                         case dispatch.Running:
230                                 disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
231                         case dispatch.Locked:
232                                 disp.arvDispatcher.Unlock(ctr.UUID)
233                         }
234                         return nil
235                 case updated, ok := <-status:
236                         if !ok {
237                                 // status channel is closed, which is
238                                 // how arvDispatcher tells us to stop
239                                 // touching the container record, kill
240                                 // off any remaining LSF processes,
241                                 // etc.
242                                 done = true
243                                 break
244                         }
245                         if updated.State != ctr.State {
246                                 disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
247                         }
248                         ctr = updated
249                         if ctr.Priority < 1 {
250                                 disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
251                                 disp.bkill(ctr)
252                         } else {
253                                 disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority))
254                         }
255                 }
256         }
257         disp.logger.Printf("container %s is done", ctr.UUID)
258
259         // Try "bkill" every few seconds until the LSF job disappears
260         // from the queue.
261         ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2)
262         defer ticker.Stop()
263         for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
264                 err := disp.lsfcli.Bkill(qent.ID)
265                 if err != nil {
266                         disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
267                 }
268                 <-ticker.C
269         }
270         return nil
271 }
272
273 func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
274         // Start with an empty slice here to ensure append() doesn't
275         // modify crunchRunCommand's underlying array
276         var crArgs []string
277         crArgs = append(crArgs, crunchRunCommand...)
278         crArgs = append(crArgs, container.UUID)
279
280         h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken))
281         fmt.Fprint(h, container.UUID)
282         authsecret := fmt.Sprintf("%x", h.Sum(nil))
283
284         crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret})
285
286         bsubArgs, err := disp.bsubArgs(container)
287         if err != nil {
288                 return err
289         }
290         return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
291 }
292
293 func (disp *dispatcher) bkill(ctr arvados.Container) {
294         if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
295                 disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
296         } else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
297                 disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
298         }
299 }
300
301 func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
302         args := []string{"bsub"}
303
304         tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
305         vcpus := container.RuntimeConstraints.VCPUs
306         mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
307                 container.RuntimeConstraints.KeepCacheRAM+
308                 int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
309
310         repl := map[string]string{
311                 "%%": "%",
312                 "%C": fmt.Sprintf("%d", vcpus),
313                 "%M": fmt.Sprintf("%d", mem),
314                 "%T": fmt.Sprintf("%d", tmp),
315                 "%U": container.UUID,
316                 "%G": fmt.Sprintf("%d", container.RuntimeConstraints.CUDA.DeviceCount),
317         }
318
319         re := regexp.MustCompile(`%.`)
320         var substitutionErrors string
321         argumentTemplate := disp.Cluster.Containers.LSF.BsubArgumentsList
322         if container.RuntimeConstraints.CUDA.DeviceCount > 0 {
323                 argumentTemplate = append(argumentTemplate, disp.Cluster.Containers.LSF.BsubCUDAArguments...)
324         }
325         for _, a := range argumentTemplate {
326                 args = append(args, re.ReplaceAllStringFunc(a, func(s string) string {
327                         subst := repl[s]
328                         if len(subst) == 0 {
329                                 substitutionErrors += fmt.Sprintf("Unknown substitution parameter %s in BsubArgumentsList, ", s)
330                         }
331                         return subst
332                 }))
333         }
334         if len(substitutionErrors) != 0 {
335                 return nil, fmt.Errorf("%s", substitutionErrors[:len(substitutionErrors)-2])
336         }
337
338         if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
339                 args = append([]string{"sudo", "-E", "-u", u}, args...)
340         }
341         return args, nil
342 }
343
344 // Check the next bjobs report, and invoke TrackContainer for all the
345 // containers in the report. This gives us a chance to cancel existing
346 // Arvados LSF jobs (started by a previous dispatch process) that
347 // never released their LSF job allocations even though their
348 // container states are Cancelled or Complete. See
349 // https://dev.arvados.org/issues/10979
350 func (disp *dispatcher) checkLsfQueueForOrphans() {
351         containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
352         for _, uuid := range disp.lsfqueue.All() {
353                 if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
354                         continue
355                 }
356                 err := disp.arvDispatcher.TrackContainer(uuid)
357                 if err != nil {
358                         disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
359                 }
360         }
361 }
362
363 func execScript(args []string, env map[string]string) []byte {
364         s := "#!/bin/sh\n"
365         for k, v := range env {
366                 s += k + `='`
367                 s += strings.Replace(v, `'`, `'\''`, -1)
368                 s += `' `
369         }
370         s += `exec`
371         for _, w := range args {
372                 s += ` '`
373                 s += strings.Replace(w, `'`, `'\''`, -1)
374                 s += `'`
375         }
376         return []byte(s + "\n")
377 }