Merge branch '21703-collection-update-lock'
[arvados.git] / lib / lsf / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "context"
9         "crypto/hmac"
10         "crypto/sha256"
11         "errors"
12         "fmt"
13         "math"
14         "net/http"
15         "regexp"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/cmd"
21         "git.arvados.org/arvados.git/lib/controller/dblock"
22         "git.arvados.org/arvados.git/lib/ctrlctx"
23         "git.arvados.org/arvados.git/lib/dispatchcloud"
24         "git.arvados.org/arvados.git/lib/service"
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
27         "git.arvados.org/arvados.git/sdk/go/auth"
28         "git.arvados.org/arvados.git/sdk/go/ctxlog"
29         "git.arvados.org/arvados.git/sdk/go/dispatch"
30         "git.arvados.org/arvados.git/sdk/go/health"
31         "github.com/julienschmidt/httprouter"
32         "github.com/prometheus/client_golang/prometheus"
33         "github.com/prometheus/client_golang/prometheus/promhttp"
34         "github.com/sirupsen/logrus"
35 )
36
37 var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
38
39 func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
40         ac, err := arvados.NewClientFromConfig(cluster)
41         if err != nil {
42                 return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
43         }
44         d := &dispatcher{
45                 Cluster:   cluster,
46                 Context:   ctx,
47                 ArvClient: ac,
48                 AuthToken: token,
49                 Registry:  reg,
50         }
51         go d.Start()
52         return d
53 }
54
55 type dispatcher struct {
56         Cluster   *arvados.Cluster
57         Context   context.Context
58         ArvClient *arvados.Client
59         AuthToken string
60         Registry  *prometheus.Registry
61
62         logger        logrus.FieldLogger
63         dbConnector   ctrlctx.DBConnector
64         lsfcli        lsfcli
65         lsfqueue      lsfqueue
66         arvDispatcher *dispatch.Dispatcher
67         httpHandler   http.Handler
68
69         initOnce sync.Once
70         stop     chan struct{}
71         stopped  chan struct{}
72 }
73
74 // Start starts the dispatcher. Start can be called multiple times
75 // with no ill effect.
76 func (disp *dispatcher) Start() {
77         disp.initOnce.Do(func() {
78                 disp.init()
79                 dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
80                 go func() {
81                         defer dblock.Dispatch.Unlock()
82                         disp.checkLsfQueueForOrphans()
83                         err := disp.arvDispatcher.Run(disp.Context)
84                         if err != nil {
85                                 disp.logger.Error(err)
86                                 disp.Close()
87                         }
88                 }()
89         })
90 }
91
92 // ServeHTTP implements service.Handler.
93 func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
94         disp.Start()
95         disp.httpHandler.ServeHTTP(w, r)
96 }
97
98 // CheckHealth implements service.Handler.
99 func (disp *dispatcher) CheckHealth() error {
100         disp.Start()
101         select {
102         case <-disp.stopped:
103                 return errors.New("stopped")
104         default:
105                 return nil
106         }
107 }
108
109 // Done implements service.Handler.
110 func (disp *dispatcher) Done() <-chan struct{} {
111         return disp.stopped
112 }
113
114 // Stop dispatching containers and release resources. Used by tests.
115 func (disp *dispatcher) Close() {
116         disp.Start()
117         select {
118         case disp.stop <- struct{}{}:
119         default:
120         }
121         <-disp.stopped
122 }
123
124 func (disp *dispatcher) init() {
125         disp.logger = ctxlog.FromContext(disp.Context)
126         disp.lsfcli.logger = disp.logger
127         disp.lsfqueue = lsfqueue{
128                 logger: disp.logger,
129                 period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(),
130                 lsfcli: &disp.lsfcli,
131         }
132         disp.ArvClient.AuthToken = disp.AuthToken
133         disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
134         disp.stop = make(chan struct{}, 1)
135         disp.stopped = make(chan struct{})
136
137         arv, err := arvadosclient.New(disp.ArvClient)
138         if err != nil {
139                 disp.logger.Fatalf("Error making Arvados client: %v", err)
140         }
141         arv.Retries = 25
142         arv.ApiToken = disp.AuthToken
143         disp.arvDispatcher = &dispatch.Dispatcher{
144                 Arv:            arv,
145                 Logger:         disp.logger,
146                 BatchSize:      disp.Cluster.API.MaxItemsPerResponse,
147                 RunContainer:   disp.runContainer,
148                 PollPeriod:     time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
149                 MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
150         }
151
152         if disp.Cluster.ManagementToken == "" {
153                 disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
154                         http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
155                 })
156         } else {
157                 mux := httprouter.New()
158                 metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
159                         ErrorLog: disp.logger,
160                 })
161                 mux.Handler("GET", "/metrics", metricsH)
162                 mux.Handler("GET", "/metrics.json", metricsH)
163                 mux.Handler("GET", "/_health/:check", &health.Handler{
164                         Token:  disp.Cluster.ManagementToken,
165                         Prefix: "/_health/",
166                         Routes: health.Routes{"ping": disp.CheckHealth},
167                 })
168                 disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
169         }
170 }
171
172 func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
173         ctx, cancel := context.WithCancel(disp.Context)
174         defer cancel()
175
176         if ctr.State != dispatch.Locked {
177                 // already started by prior invocation
178         } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
179                 if _, err := dispatchcloud.ChooseInstanceType(disp.Cluster, &ctr); errors.As(err, &dispatchcloud.ConstraintsNotSatisfiableError{}) {
180                         err := disp.arvDispatcher.Arv.Update("containers", ctr.UUID, arvadosclient.Dict{
181                                 "container": map[string]interface{}{
182                                         "runtime_status": map[string]string{
183                                                 "error": err.Error(),
184                                         },
185                                 },
186                         }, nil)
187                         if err != nil {
188                                 return fmt.Errorf("error setting runtime_status on %s: %s", ctr.UUID, err)
189                         }
190                         return disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
191                 }
192                 disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
193                 cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
194                 cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
195                 cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
196                 err := disp.submit(ctr, cmd)
197                 if err != nil {
198                         return err
199                 }
200         }
201
202         disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
203         defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
204
205         go func(uuid string) {
206                 for ctx.Err() == nil {
207                         _, ok := disp.lsfqueue.Lookup(uuid)
208                         if !ok {
209                                 // If the container disappears from
210                                 // the lsf queue, there is no point in
211                                 // waiting for further dispatch
212                                 // updates: just clean up and return.
213                                 disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
214                                 cancel()
215                                 return
216                         }
217                 }
218         }(ctr.UUID)
219
220         for done := false; !done; {
221                 select {
222                 case <-ctx.Done():
223                         // Disappeared from lsf queue
224                         if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
225                                 disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
226                         }
227                         switch ctr.State {
228                         case dispatch.Running:
229                                 disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
230                         case dispatch.Locked:
231                                 disp.arvDispatcher.Unlock(ctr.UUID)
232                         }
233                         return nil
234                 case updated, ok := <-status:
235                         if !ok {
236                                 // status channel is closed, which is
237                                 // how arvDispatcher tells us to stop
238                                 // touching the container record, kill
239                                 // off any remaining LSF processes,
240                                 // etc.
241                                 done = true
242                                 break
243                         }
244                         if updated.State != ctr.State {
245                                 disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
246                         }
247                         ctr = updated
248                         if ctr.Priority < 1 {
249                                 disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
250                                 disp.bkill(ctr)
251                         } else {
252                                 disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority))
253                         }
254                 }
255         }
256         disp.logger.Printf("container %s is done", ctr.UUID)
257
258         // Try "bkill" every few seconds until the LSF job disappears
259         // from the queue.
260         ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2)
261         defer ticker.Stop()
262         for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
263                 err := disp.lsfcli.Bkill(qent.ID)
264                 if err != nil {
265                         disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
266                 }
267                 <-ticker.C
268         }
269         return nil
270 }
271
272 func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
273         // Start with an empty slice here to ensure append() doesn't
274         // modify crunchRunCommand's underlying array
275         var crArgs []string
276         crArgs = append(crArgs, crunchRunCommand...)
277         crArgs = append(crArgs, container.UUID)
278
279         h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken))
280         fmt.Fprint(h, container.UUID)
281         authsecret := fmt.Sprintf("%x", h.Sum(nil))
282
283         crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret})
284
285         bsubArgs, err := disp.bsubArgs(container)
286         if err != nil {
287                 return err
288         }
289         return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
290 }
291
292 func (disp *dispatcher) bkill(ctr arvados.Container) {
293         if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
294                 disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
295         } else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
296                 disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
297         }
298 }
299
300 func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
301         args := []string{"bsub"}
302
303         tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
304         vcpus := container.RuntimeConstraints.VCPUs
305         mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
306                 container.RuntimeConstraints.KeepCacheRAM+
307                 int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
308
309         maxruntime := time.Duration(container.SchedulingParameters.MaxRunTime) * time.Second
310         if maxruntime == 0 {
311                 maxruntime = disp.Cluster.Containers.LSF.MaxRunTimeDefault.Duration()
312         }
313         if maxruntime > 0 {
314                 maxruntime += disp.Cluster.Containers.LSF.MaxRunTimeOverhead.Duration()
315         }
316         maxrunminutes := int64(math.Ceil(float64(maxruntime.Seconds()) / 60))
317
318         repl := map[string]string{
319                 "%%": "%",
320                 "%C": fmt.Sprintf("%d", vcpus),
321                 "%M": fmt.Sprintf("%d", mem),
322                 "%T": fmt.Sprintf("%d", tmp),
323                 "%U": container.UUID,
324                 "%G": fmt.Sprintf("%d", container.RuntimeConstraints.CUDA.DeviceCount),
325                 "%W": fmt.Sprintf("%d", maxrunminutes),
326         }
327
328         re := regexp.MustCompile(`%.`)
329         var substitutionErrors string
330         argumentTemplate := disp.Cluster.Containers.LSF.BsubArgumentsList
331         if container.RuntimeConstraints.CUDA.DeviceCount > 0 {
332                 argumentTemplate = append(argumentTemplate, disp.Cluster.Containers.LSF.BsubCUDAArguments...)
333         }
334         for idx, a := range argumentTemplate {
335                 if idx > 0 && (argumentTemplate[idx-1] == "-W" || argumentTemplate[idx-1] == "-We") && a == "%W" && maxrunminutes == 0 {
336                         // LSF docs don't specify an argument to "-W"
337                         // or "-We" that indicates "unknown", so
338                         // instead we drop the "-W %W" part of the
339                         // command line entirely when max runtime is
340                         // unknown.
341                         args = args[:len(args)-1]
342                         continue
343                 }
344                 args = append(args, re.ReplaceAllStringFunc(a, func(s string) string {
345                         subst := repl[s]
346                         if len(subst) == 0 {
347                                 substitutionErrors += fmt.Sprintf("Unknown substitution parameter %s in BsubArgumentsList, ", s)
348                         }
349                         return subst
350                 }))
351         }
352         if len(substitutionErrors) != 0 {
353                 return nil, fmt.Errorf("%s", substitutionErrors[:len(substitutionErrors)-2])
354         }
355
356         if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
357                 args = append([]string{"sudo", "-E", "-u", u}, args...)
358         }
359         return args, nil
360 }
361
362 // Check the next bjobs report, and invoke TrackContainer for all the
363 // containers in the report. This gives us a chance to cancel existing
364 // Arvados LSF jobs (started by a previous dispatch process) that
365 // never released their LSF job allocations even though their
366 // container states are Cancelled or Complete. See
367 // https://dev.arvados.org/issues/10979
368 func (disp *dispatcher) checkLsfQueueForOrphans() {
369         containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
370         for _, uuid := range disp.lsfqueue.All() {
371                 if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
372                         continue
373                 }
374                 err := disp.arvDispatcher.TrackContainer(uuid)
375                 if err != nil {
376                         disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
377                 }
378         }
379 }
380
381 func execScript(args []string, env map[string]string) []byte {
382         s := "#!/bin/sh\n"
383         for k, v := range env {
384                 s += k + `='`
385                 s += strings.Replace(v, `'`, `'\''`, -1)
386                 s += `' `
387         }
388         s += `exec`
389         for _, w := range args {
390                 s += ` '`
391                 s += strings.Replace(w, `'`, `'\''`, -1)
392                 s += `'`
393         }
394         return []byte(s + "\n")
395 }