17756: Add checkLsfQueueForOrphans.
[arvados.git] / lib / lsf / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "fmt"
12         "math"
13         "net/http"
14         "regexp"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cmd"
20         "git.arvados.org/arvados.git/lib/dispatchcloud"
21         "git.arvados.org/arvados.git/lib/service"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
24         "git.arvados.org/arvados.git/sdk/go/auth"
25         "git.arvados.org/arvados.git/sdk/go/ctxlog"
26         "git.arvados.org/arvados.git/sdk/go/dispatch"
27         "git.arvados.org/arvados.git/sdk/go/health"
28         "github.com/julienschmidt/httprouter"
29         "github.com/prometheus/client_golang/prometheus"
30         "github.com/prometheus/client_golang/prometheus/promhttp"
31         "github.com/sirupsen/logrus"
32 )
33
34 var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
35
36 func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
37         ac, err := arvados.NewClientFromConfig(cluster)
38         if err != nil {
39                 return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
40         }
41         d := &dispatcher{
42                 Cluster:   cluster,
43                 Context:   ctx,
44                 ArvClient: ac,
45                 AuthToken: token,
46                 Registry:  reg,
47         }
48         go d.Start()
49         return d
50 }
51
52 type dispatcher struct {
53         Cluster   *arvados.Cluster
54         Context   context.Context
55         ArvClient *arvados.Client
56         AuthToken string
57         Registry  *prometheus.Registry
58
59         logger        logrus.FieldLogger
60         lsfcli        lsfcli
61         lsfqueue      lsfqueue
62         arvDispatcher *dispatch.Dispatcher
63         httpHandler   http.Handler
64
65         initOnce sync.Once
66         stop     chan struct{}
67         stopped  chan struct{}
68 }
69
70 // Start starts the dispatcher. Start can be called multiple times
71 // with no ill effect.
72 func (disp *dispatcher) Start() {
73         disp.initOnce.Do(func() {
74                 disp.init()
75                 go func() {
76                         disp.checkLsfQueueForOrphans()
77                         err := disp.arvDispatcher.Run(disp.Context)
78                         if err != nil {
79                                 disp.logger.Error(err)
80                                 disp.Close()
81                         }
82                 }()
83         })
84 }
85
86 // ServeHTTP implements service.Handler.
87 func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
88         disp.Start()
89         disp.httpHandler.ServeHTTP(w, r)
90 }
91
92 // CheckHealth implements service.Handler.
93 func (disp *dispatcher) CheckHealth() error {
94         disp.Start()
95         select {
96         case <-disp.stopped:
97                 return errors.New("stopped")
98         default:
99                 return nil
100         }
101 }
102
103 // Done implements service.Handler.
104 func (disp *dispatcher) Done() <-chan struct{} {
105         return disp.stopped
106 }
107
108 // Stop dispatching containers and release resources. Used by tests.
109 func (disp *dispatcher) Close() {
110         disp.Start()
111         select {
112         case disp.stop <- struct{}{}:
113         default:
114         }
115         <-disp.stopped
116 }
117
118 func (disp *dispatcher) init() {
119         disp.logger = ctxlog.FromContext(disp.Context)
120         disp.lsfcli.logger = disp.logger
121         disp.lsfqueue = lsfqueue{
122                 logger: disp.logger,
123                 period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
124                 lsfcli: &disp.lsfcli,
125         }
126         disp.ArvClient.AuthToken = disp.AuthToken
127         disp.stop = make(chan struct{}, 1)
128         disp.stopped = make(chan struct{})
129
130         arv, err := arvadosclient.New(disp.ArvClient)
131         if err != nil {
132                 disp.logger.Fatalf("Error making Arvados client: %v", err)
133         }
134         arv.Retries = 25
135         arv.ApiToken = disp.AuthToken
136         disp.arvDispatcher = &dispatch.Dispatcher{
137                 Arv:            arv,
138                 Logger:         disp.logger,
139                 BatchSize:      disp.Cluster.API.MaxItemsPerResponse,
140                 RunContainer:   disp.runContainer,
141                 PollPeriod:     time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
142                 MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
143         }
144
145         if disp.Cluster.ManagementToken == "" {
146                 disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
147                         http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
148                 })
149         } else {
150                 mux := httprouter.New()
151                 metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
152                         ErrorLog: disp.logger,
153                 })
154                 mux.Handler("GET", "/metrics", metricsH)
155                 mux.Handler("GET", "/metrics.json", metricsH)
156                 mux.Handler("GET", "/_health/:check", &health.Handler{
157                         Token:  disp.Cluster.ManagementToken,
158                         Prefix: "/_health/",
159                         Routes: health.Routes{"ping": disp.CheckHealth},
160                 })
161                 disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
162         }
163 }
164
165 func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
166         ctx, cancel := context.WithCancel(disp.Context)
167         defer cancel()
168
169         if ctr.State != dispatch.Locked {
170                 // already started by prior invocation
171         } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
172                 disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
173                 cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
174                 cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
175                 cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
176                 if err := disp.submit(ctr, cmd); err != nil {
177                         var text string
178                         switch err := err.(type) {
179                         case dispatchcloud.ConstraintsNotSatisfiableError:
180                                 var logBuf bytes.Buffer
181                                 fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
182                                 if len(err.AvailableTypes) == 0 {
183                                         fmt.Fprint(&logBuf, "No instance types are configured.\n")
184                                 } else {
185                                         fmt.Fprint(&logBuf, "Available instance types:\n")
186                                         for _, t := range err.AvailableTypes {
187                                                 fmt.Fprintf(&logBuf,
188                                                         "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
189                                                         t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
190                                                 )
191                                         }
192                                 }
193                                 text = logBuf.String()
194                                 disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
195                         default:
196                                 text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
197                         }
198                         disp.logger.Print(text)
199
200                         lr := arvadosclient.Dict{"log": arvadosclient.Dict{
201                                 "object_uuid": ctr.UUID,
202                                 "event_type":  "dispatch",
203                                 "properties":  map[string]string{"text": text}}}
204                         disp.arvDispatcher.Arv.Create("logs", lr, nil)
205
206                         disp.arvDispatcher.Unlock(ctr.UUID)
207                         return
208                 }
209         }
210
211         disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
212         defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
213
214         // If the container disappears from the lsf queue, there is
215         // no point in waiting for further dispatch updates: just
216         // clean up and return.
217         go func(uuid string) {
218                 for ctx.Err() == nil {
219                         if _, ok := disp.lsfqueue.JobID(uuid); !ok {
220                                 disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
221                                 cancel()
222                                 return
223                         }
224                 }
225         }(ctr.UUID)
226
227         for done := false; !done; {
228                 select {
229                 case <-ctx.Done():
230                         // Disappeared from lsf queue
231                         if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
232                                 disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
233                         }
234                         switch ctr.State {
235                         case dispatch.Running:
236                                 disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
237                         case dispatch.Locked:
238                                 disp.arvDispatcher.Unlock(ctr.UUID)
239                         }
240                         return
241                 case updated, ok := <-status:
242                         if !ok {
243                                 done = true
244                                 break
245                         }
246                         if updated.State != ctr.State {
247                                 disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
248                         }
249                         ctr = updated
250                         if ctr.Priority == 0 {
251                                 disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
252                                 disp.bkill(ctr)
253                         } else {
254                                 disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority))
255                         }
256                 }
257         }
258         disp.logger.Printf("container %s is done", ctr.UUID)
259
260         // Try "bkill" every few seconds until the LSF job disappears
261         // from the queue.
262         ticker := time.NewTicker(5 * time.Second)
263         defer ticker.Stop()
264         for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
265                 err := disp.lsfcli.Bkill(jobid)
266                 if err != nil {
267                         disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
268                 }
269                 <-ticker.C
270         }
271 }
272
273 func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
274         // Start with an empty slice here to ensure append() doesn't
275         // modify crunchRunCommand's underlying array
276         var crArgs []string
277         crArgs = append(crArgs, crunchRunCommand...)
278         crArgs = append(crArgs, container.UUID)
279         crScript := execScript(crArgs)
280
281         bsubArgs, err := disp.bsubArgs(container)
282         if err != nil {
283                 return err
284         }
285         return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
286 }
287
288 func (disp *dispatcher) bkill(ctr arvados.Container) {
289         if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
290                 disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
291         } else if err := disp.lsfcli.Bkill(jobid); err != nil {
292                 disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
293         }
294 }
295
296 func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
297         args := []string{"bsub"}
298         args = append(args, disp.Cluster.Containers.LSF.BsubArgumentsList...)
299         args = append(args, "-J", container.UUID)
300         args = append(args, disp.bsubConstraintArgs(container)...)
301         if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
302                 args = append([]string{"sudo", "-E", "-u", u}, args...)
303         }
304         return args, nil
305 }
306
307 func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string {
308         // TODO: propagate container.SchedulingParameters.Partitions
309         tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
310         vcpus := container.RuntimeConstraints.VCPUs
311         mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
312                 container.RuntimeConstraints.KeepCacheRAM+
313                 int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
314         return []string{
315                 "-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus),
316         }
317 }
318
319 // Check the next bjobs report, and invoke TrackContainer for all the
320 // containers in the report. This gives us a chance to cancel existing
321 // Arvados LSF jobs (started by a previous dispatch process) that
322 // never released their LSF job allocations even though their
323 // container states are Cancelled or Complete. See
324 // https://dev.arvados.org/issues/10979
325 func (disp *dispatcher) checkLsfQueueForOrphans() {
326         containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
327         for _, uuid := range disp.lsfqueue.All() {
328                 if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
329                         continue
330                 }
331                 err := disp.arvDispatcher.TrackContainer(uuid)
332                 if err != nil {
333                         disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
334                 }
335         }
336 }
337
338 func execScript(args []string) []byte {
339         s := "#!/bin/sh\nexec"
340         for _, w := range args {
341                 s += ` '`
342                 s += strings.Replace(w, `'`, `'\''`, -1)
343                 s += `'`
344         }
345         return []byte(s + "\n")
346 }