15370: Re-enable docker tests.
[arvados.git] / lib / lsf / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "context"
9         "errors"
10         "fmt"
11         "math"
12         "net/http"
13         "regexp"
14         "strings"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/cmd"
19         "git.arvados.org/arvados.git/lib/dispatchcloud"
20         "git.arvados.org/arvados.git/lib/service"
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
23         "git.arvados.org/arvados.git/sdk/go/auth"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "git.arvados.org/arvados.git/sdk/go/dispatch"
26         "git.arvados.org/arvados.git/sdk/go/health"
27         "github.com/julienschmidt/httprouter"
28         "github.com/prometheus/client_golang/prometheus"
29         "github.com/prometheus/client_golang/prometheus/promhttp"
30         "github.com/sirupsen/logrus"
31 )
32
33 var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
34
35 func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
36         ac, err := arvados.NewClientFromConfig(cluster)
37         if err != nil {
38                 return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
39         }
40         d := &dispatcher{
41                 Cluster:   cluster,
42                 Context:   ctx,
43                 ArvClient: ac,
44                 AuthToken: token,
45                 Registry:  reg,
46         }
47         go d.Start()
48         return d
49 }
50
51 type dispatcher struct {
52         Cluster   *arvados.Cluster
53         Context   context.Context
54         ArvClient *arvados.Client
55         AuthToken string
56         Registry  *prometheus.Registry
57
58         logger        logrus.FieldLogger
59         lsfcli        lsfcli
60         lsfqueue      lsfqueue
61         arvDispatcher *dispatch.Dispatcher
62         httpHandler   http.Handler
63
64         initOnce sync.Once
65         stop     chan struct{}
66         stopped  chan struct{}
67 }
68
69 // Start starts the dispatcher. Start can be called multiple times
70 // with no ill effect.
71 func (disp *dispatcher) Start() {
72         disp.initOnce.Do(func() {
73                 disp.init()
74                 go func() {
75                         disp.checkLsfQueueForOrphans()
76                         err := disp.arvDispatcher.Run(disp.Context)
77                         if err != nil {
78                                 disp.logger.Error(err)
79                                 disp.Close()
80                         }
81                 }()
82         })
83 }
84
85 // ServeHTTP implements service.Handler.
86 func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
87         disp.Start()
88         disp.httpHandler.ServeHTTP(w, r)
89 }
90
91 // CheckHealth implements service.Handler.
92 func (disp *dispatcher) CheckHealth() error {
93         disp.Start()
94         select {
95         case <-disp.stopped:
96                 return errors.New("stopped")
97         default:
98                 return nil
99         }
100 }
101
102 // Done implements service.Handler.
103 func (disp *dispatcher) Done() <-chan struct{} {
104         return disp.stopped
105 }
106
107 // Stop dispatching containers and release resources. Used by tests.
108 func (disp *dispatcher) Close() {
109         disp.Start()
110         select {
111         case disp.stop <- struct{}{}:
112         default:
113         }
114         <-disp.stopped
115 }
116
117 func (disp *dispatcher) init() {
118         disp.logger = ctxlog.FromContext(disp.Context)
119         disp.lsfcli.logger = disp.logger
120         disp.lsfqueue = lsfqueue{
121                 logger: disp.logger,
122                 period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(),
123                 lsfcli: &disp.lsfcli,
124         }
125         disp.ArvClient.AuthToken = disp.AuthToken
126         disp.stop = make(chan struct{}, 1)
127         disp.stopped = make(chan struct{})
128
129         arv, err := arvadosclient.New(disp.ArvClient)
130         if err != nil {
131                 disp.logger.Fatalf("Error making Arvados client: %v", err)
132         }
133         arv.Retries = 25
134         arv.ApiToken = disp.AuthToken
135         disp.arvDispatcher = &dispatch.Dispatcher{
136                 Arv:            arv,
137                 Logger:         disp.logger,
138                 BatchSize:      disp.Cluster.API.MaxItemsPerResponse,
139                 RunContainer:   disp.runContainer,
140                 PollPeriod:     time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
141                 MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
142         }
143
144         if disp.Cluster.ManagementToken == "" {
145                 disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
146                         http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
147                 })
148         } else {
149                 mux := httprouter.New()
150                 metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
151                         ErrorLog: disp.logger,
152                 })
153                 mux.Handler("GET", "/metrics", metricsH)
154                 mux.Handler("GET", "/metrics.json", metricsH)
155                 mux.Handler("GET", "/_health/:check", &health.Handler{
156                         Token:  disp.Cluster.ManagementToken,
157                         Prefix: "/_health/",
158                         Routes: health.Routes{"ping": disp.CheckHealth},
159                 })
160                 disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
161         }
162 }
163
164 func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
165         ctx, cancel := context.WithCancel(disp.Context)
166         defer cancel()
167
168         if ctr.State != dispatch.Locked {
169                 // already started by prior invocation
170         } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
171                 disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
172                 cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
173                 cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
174                 cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
175                 err := disp.submit(ctr, cmd)
176                 if err != nil {
177                         return err
178                 }
179         }
180
181         disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
182         defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
183
184         go func(uuid string) {
185                 cancelled := false
186                 for ctx.Err() == nil {
187                         qent, ok := disp.lsfqueue.Lookup(uuid)
188                         if !ok {
189                                 // If the container disappears from
190                                 // the lsf queue, there is no point in
191                                 // waiting for further dispatch
192                                 // updates: just clean up and return.
193                                 disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
194                                 cancel()
195                                 return
196                         }
197                         if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
198                                 disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
199                                 err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
200                                         "container": map[string]interface{}{
201                                                 "runtime_status": map[string]string{
202                                                         "error": qent.PendReason,
203                                                 },
204                                         },
205                                 }, nil)
206                                 if err != nil {
207                                         disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
208                                         continue // retry
209                                 }
210                                 err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
211                                 if err != nil {
212                                         continue // retry (UpdateState() already logged the error)
213                                 }
214                                 cancelled = true
215                         }
216                 }
217         }(ctr.UUID)
218
219         for done := false; !done; {
220                 select {
221                 case <-ctx.Done():
222                         // Disappeared from lsf queue
223                         if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
224                                 disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
225                         }
226                         switch ctr.State {
227                         case dispatch.Running:
228                                 disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
229                         case dispatch.Locked:
230                                 disp.arvDispatcher.Unlock(ctr.UUID)
231                         }
232                         return nil
233                 case updated, ok := <-status:
234                         if !ok {
235                                 // status channel is closed, which is
236                                 // how arvDispatcher tells us to stop
237                                 // touching the container record, kill
238                                 // off any remaining LSF processes,
239                                 // etc.
240                                 done = true
241                                 break
242                         }
243                         if updated.State != ctr.State {
244                                 disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
245                         }
246                         ctr = updated
247                         if ctr.Priority < 1 {
248                                 disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
249                                 disp.bkill(ctr)
250                         } else {
251                                 disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority))
252                         }
253                 }
254         }
255         disp.logger.Printf("container %s is done", ctr.UUID)
256
257         // Try "bkill" every few seconds until the LSF job disappears
258         // from the queue.
259         ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2)
260         defer ticker.Stop()
261         for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
262                 err := disp.lsfcli.Bkill(qent.ID)
263                 if err != nil {
264                         disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
265                 }
266                 <-ticker.C
267         }
268         return nil
269 }
270
271 func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
272         // Start with an empty slice here to ensure append() doesn't
273         // modify crunchRunCommand's underlying array
274         var crArgs []string
275         crArgs = append(crArgs, crunchRunCommand...)
276         crArgs = append(crArgs, container.UUID)
277         crScript := execScript(crArgs)
278
279         bsubArgs, err := disp.bsubArgs(container)
280         if err != nil {
281                 return err
282         }
283         return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
284 }
285
286 func (disp *dispatcher) bkill(ctr arvados.Container) {
287         if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
288                 disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
289         } else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
290                 disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
291         }
292 }
293
294 func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
295         args := []string{"bsub"}
296
297         tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
298         vcpus := container.RuntimeConstraints.VCPUs
299         mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
300                 container.RuntimeConstraints.KeepCacheRAM+
301                 int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
302
303         repl := map[string]string{
304                 "%%": "%",
305                 "%C": fmt.Sprintf("%d", vcpus),
306                 "%M": fmt.Sprintf("%d", mem),
307                 "%T": fmt.Sprintf("%d", tmp),
308                 "%U": container.UUID,
309                 "%G": fmt.Sprintf("%d", container.RuntimeConstraints.CUDA.DeviceCount),
310         }
311
312         re := regexp.MustCompile(`%.`)
313         var substitutionErrors string
314         argumentTemplate := disp.Cluster.Containers.LSF.BsubArgumentsList
315         if container.RuntimeConstraints.CUDA.DeviceCount > 0 {
316                 argumentTemplate = append(argumentTemplate, disp.Cluster.Containers.LSF.BsubCUDAArguments...)
317         }
318         for _, a := range argumentTemplate {
319                 args = append(args, re.ReplaceAllStringFunc(a, func(s string) string {
320                         subst := repl[s]
321                         if len(subst) == 0 {
322                                 substitutionErrors += fmt.Sprintf("Unknown substitution parameter %s in BsubArgumentsList, ", s)
323                         }
324                         return subst
325                 }))
326         }
327         if len(substitutionErrors) != 0 {
328                 return nil, fmt.Errorf("%s", substitutionErrors[:len(substitutionErrors)-2])
329         }
330
331         if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
332                 args = append([]string{"sudo", "-E", "-u", u}, args...)
333         }
334         return args, nil
335 }
336
337 // Check the next bjobs report, and invoke TrackContainer for all the
338 // containers in the report. This gives us a chance to cancel existing
339 // Arvados LSF jobs (started by a previous dispatch process) that
340 // never released their LSF job allocations even though their
341 // container states are Cancelled or Complete. See
342 // https://dev.arvados.org/issues/10979
343 func (disp *dispatcher) checkLsfQueueForOrphans() {
344         containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
345         for _, uuid := range disp.lsfqueue.All() {
346                 if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
347                         continue
348                 }
349                 err := disp.arvDispatcher.TrackContainer(uuid)
350                 if err != nil {
351                         disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
352                 }
353         }
354 }
355
356 func execScript(args []string) []byte {
357         s := "#!/bin/sh\nexec"
358         for _, w := range args {
359                 s += ` '`
360                 s += strings.Replace(w, `'`, `'\''`, -1)
361                 s += `'`
362         }
363         return []byte(s + "\n")
364 }