17756: Review feedback.
[arvados.git] / lib / lsf / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "fmt"
12         "math"
13         "net/http"
14         "regexp"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cmd"
20         "git.arvados.org/arvados.git/lib/dispatchcloud"
21         "git.arvados.org/arvados.git/lib/service"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
24         "git.arvados.org/arvados.git/sdk/go/auth"
25         "git.arvados.org/arvados.git/sdk/go/ctxlog"
26         "git.arvados.org/arvados.git/sdk/go/dispatch"
27         "git.arvados.org/arvados.git/sdk/go/health"
28         "github.com/julienschmidt/httprouter"
29         "github.com/prometheus/client_golang/prometheus"
30         "github.com/prometheus/client_golang/prometheus/promhttp"
31         "github.com/sirupsen/logrus"
32 )
33
34 var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
35
36 func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
37         ac, err := arvados.NewClientFromConfig(cluster)
38         if err != nil {
39                 return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
40         }
41         d := &dispatcher{
42                 Cluster:   cluster,
43                 Context:   ctx,
44                 ArvClient: ac,
45                 AuthToken: token,
46                 Registry:  reg,
47         }
48         go d.Start()
49         return d
50 }
51
52 type dispatcher struct {
53         Cluster   *arvados.Cluster
54         Context   context.Context
55         ArvClient *arvados.Client
56         AuthToken string
57         Registry  *prometheus.Registry
58
59         logger        logrus.FieldLogger
60         lsfcli        lsfcli
61         lsfqueue      lsfqueue
62         arvDispatcher *dispatch.Dispatcher
63         httpHandler   http.Handler
64
65         initOnce sync.Once
66         stop     chan struct{}
67         stopped  chan struct{}
68 }
69
70 // Start starts the dispatcher. Start can be called multiple times
71 // with no ill effect.
72 func (disp *dispatcher) Start() {
73         disp.initOnce.Do(func() {
74                 disp.init()
75                 go func() {
76                         disp.checkLsfQueueForOrphans()
77                         err := disp.arvDispatcher.Run(disp.Context)
78                         if err != nil {
79                                 disp.logger.Error(err)
80                                 disp.Close()
81                         }
82                 }()
83         })
84 }
85
86 // ServeHTTP implements service.Handler.
87 func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
88         disp.Start()
89         disp.httpHandler.ServeHTTP(w, r)
90 }
91
92 // CheckHealth implements service.Handler.
93 func (disp *dispatcher) CheckHealth() error {
94         disp.Start()
95         select {
96         case <-disp.stopped:
97                 return errors.New("stopped")
98         default:
99                 return nil
100         }
101 }
102
103 // Done implements service.Handler.
104 func (disp *dispatcher) Done() <-chan struct{} {
105         return disp.stopped
106 }
107
108 // Stop dispatching containers and release resources. Used by tests.
109 func (disp *dispatcher) Close() {
110         disp.Start()
111         select {
112         case disp.stop <- struct{}{}:
113         default:
114         }
115         <-disp.stopped
116 }
117
118 func (disp *dispatcher) init() {
119         disp.logger = ctxlog.FromContext(disp.Context)
120         disp.lsfcli.logger = disp.logger
121         disp.lsfqueue = lsfqueue{
122                 logger: disp.logger,
123                 period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
124                 lsfcli: &disp.lsfcli,
125         }
126         disp.ArvClient.AuthToken = disp.AuthToken
127         disp.stop = make(chan struct{}, 1)
128         disp.stopped = make(chan struct{})
129
130         arv, err := arvadosclient.New(disp.ArvClient)
131         if err != nil {
132                 disp.logger.Fatalf("Error making Arvados client: %v", err)
133         }
134         arv.Retries = 25
135         arv.ApiToken = disp.AuthToken
136         disp.arvDispatcher = &dispatch.Dispatcher{
137                 Arv:            arv,
138                 Logger:         disp.logger,
139                 BatchSize:      disp.Cluster.API.MaxItemsPerResponse,
140                 RunContainer:   disp.runContainer,
141                 PollPeriod:     time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
142                 MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
143         }
144
145         if disp.Cluster.ManagementToken == "" {
146                 disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
147                         http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
148                 })
149         } else {
150                 mux := httprouter.New()
151                 metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
152                         ErrorLog: disp.logger,
153                 })
154                 mux.Handler("GET", "/metrics", metricsH)
155                 mux.Handler("GET", "/metrics.json", metricsH)
156                 mux.Handler("GET", "/_health/:check", &health.Handler{
157                         Token:  disp.Cluster.ManagementToken,
158                         Prefix: "/_health/",
159                         Routes: health.Routes{"ping": disp.CheckHealth},
160                 })
161                 disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
162         }
163 }
164
165 func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
166         ctx, cancel := context.WithCancel(disp.Context)
167         defer cancel()
168
169         if ctr.State != dispatch.Locked {
170                 // already started by prior invocation
171         } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
172                 disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
173                 cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
174                 cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
175                 cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
176                 if err := disp.submit(ctr, cmd); err != nil {
177                         var text string
178                         switch err := err.(type) {
179                         case dispatchcloud.ConstraintsNotSatisfiableError:
180                                 var logBuf bytes.Buffer
181                                 fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
182                                 if len(err.AvailableTypes) == 0 {
183                                         fmt.Fprint(&logBuf, "No instance types are configured.\n")
184                                 } else {
185                                         fmt.Fprint(&logBuf, "Available instance types:\n")
186                                         for _, t := range err.AvailableTypes {
187                                                 fmt.Fprintf(&logBuf,
188                                                         "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
189                                                         t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
190                                                 )
191                                         }
192                                 }
193                                 text = logBuf.String()
194                                 disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
195                         default:
196                                 text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
197                         }
198                         disp.logger.Print(text)
199
200                         lr := arvadosclient.Dict{"log": arvadosclient.Dict{
201                                 "object_uuid": ctr.UUID,
202                                 "event_type":  "dispatch",
203                                 "properties":  map[string]string{"text": text}}}
204                         disp.arvDispatcher.Arv.Create("logs", lr, nil)
205
206                         disp.arvDispatcher.Unlock(ctr.UUID)
207                         return
208                 }
209         }
210
211         disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
212         defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
213
214         // If the container disappears from the lsf queue, there is
215         // no point in waiting for further dispatch updates: just
216         // clean up and return.
217         go func(uuid string) {
218                 for ctx.Err() == nil {
219                         if _, ok := disp.lsfqueue.JobID(uuid); !ok {
220                                 disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
221                                 cancel()
222                                 return
223                         }
224                 }
225         }(ctr.UUID)
226
227         for done := false; !done; {
228                 select {
229                 case <-ctx.Done():
230                         // Disappeared from lsf queue
231                         if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
232                                 disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
233                         }
234                         switch ctr.State {
235                         case dispatch.Running:
236                                 disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
237                         case dispatch.Locked:
238                                 disp.arvDispatcher.Unlock(ctr.UUID)
239                         }
240                         return
241                 case updated, ok := <-status:
242                         if !ok {
243                                 // status channel is closed, which is
244                                 // how arvDispatcher tells us to stop
245                                 // touching the container record, kill
246                                 // off any remaining LSF processes,
247                                 // etc.
248                                 done = true
249                                 break
250                         }
251                         if updated.State != ctr.State {
252                                 disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
253                         }
254                         ctr = updated
255                         if ctr.Priority < 1 {
256                                 disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
257                                 disp.bkill(ctr)
258                         } else {
259                                 disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority))
260                         }
261                 }
262         }
263         disp.logger.Printf("container %s is done", ctr.UUID)
264
265         // Try "bkill" every few seconds until the LSF job disappears
266         // from the queue.
267         ticker := time.NewTicker(5 * time.Second)
268         defer ticker.Stop()
269         for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
270                 err := disp.lsfcli.Bkill(jobid)
271                 if err != nil {
272                         disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
273                 }
274                 <-ticker.C
275         }
276 }
277
278 func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
279         // Start with an empty slice here to ensure append() doesn't
280         // modify crunchRunCommand's underlying array
281         var crArgs []string
282         crArgs = append(crArgs, crunchRunCommand...)
283         crArgs = append(crArgs, container.UUID)
284         crScript := execScript(crArgs)
285
286         bsubArgs, err := disp.bsubArgs(container)
287         if err != nil {
288                 return err
289         }
290         return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
291 }
292
293 func (disp *dispatcher) bkill(ctr arvados.Container) {
294         if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
295                 disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
296         } else if err := disp.lsfcli.Bkill(jobid); err != nil {
297                 disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
298         }
299 }
300
301 func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
302         args := []string{"bsub"}
303         args = append(args, disp.Cluster.Containers.LSF.BsubArgumentsList...)
304         args = append(args, "-J", container.UUID)
305         args = append(args, disp.bsubConstraintArgs(container)...)
306         if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
307                 args = append([]string{"sudo", "-E", "-u", u}, args...)
308         }
309         return args, nil
310 }
311
312 func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string {
313         // TODO: propagate container.SchedulingParameters.Partitions
314         tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
315         vcpus := container.RuntimeConstraints.VCPUs
316         mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
317                 container.RuntimeConstraints.KeepCacheRAM+
318                 int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
319         return []string{
320                 "-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus),
321         }
322 }
323
324 // Check the next bjobs report, and invoke TrackContainer for all the
325 // containers in the report. This gives us a chance to cancel existing
326 // Arvados LSF jobs (started by a previous dispatch process) that
327 // never released their LSF job allocations even though their
328 // container states are Cancelled or Complete. See
329 // https://dev.arvados.org/issues/10979
330 func (disp *dispatcher) checkLsfQueueForOrphans() {
331         containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
332         for _, uuid := range disp.lsfqueue.All() {
333                 if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
334                         continue
335                 }
336                 err := disp.arvDispatcher.TrackContainer(uuid)
337                 if err != nil {
338                         disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
339                 }
340         }
341 }
342
343 func execScript(args []string) []byte {
344         s := "#!/bin/sh\nexec"
345         for _, w := range args {
346                 s += ` '`
347                 s += strings.Replace(w, `'`, `'\''`, -1)
348                 s += `'`
349         }
350         return []byte(s + "\n")
351 }