1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.arvados.org/arvados.git/lib/cmd"
20 "git.arvados.org/arvados.git/lib/dispatchcloud"
21 "git.arvados.org/arvados.git/lib/service"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
24 "git.arvados.org/arvados.git/sdk/go/auth"
25 "git.arvados.org/arvados.git/sdk/go/ctxlog"
26 "git.arvados.org/arvados.git/sdk/go/dispatch"
27 "git.arvados.org/arvados.git/sdk/go/health"
28 "github.com/julienschmidt/httprouter"
29 "github.com/prometheus/client_golang/prometheus"
30 "github.com/prometheus/client_golang/prometheus/promhttp"
31 "github.com/sirupsen/logrus"
// DispatchCommand is the cmd.Handler for the LSF dispatcher service
// (arvados.ServiceNameDispatchLSF). newHandler is passed to
// service.Command as the constructor for the service.Handler.
34 var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
// newHandler builds the service.Handler for the LSF dispatcher from ctx,
// the cluster config, token and the Prometheus registry reg.
//
// If an arvados.Client cannot be created from the cluster config, it
// returns a service.ErrorHandler carrying that error instead.
// NOTE(review): the rest of this function is not visible in this listing;
// presumably it returns a *dispatcher populated from these arguments —
// confirm.
36 func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
37 ac, err := arvados.NewClientFromConfig(cluster)
// NOTE(review): %s flattens err into text; %w would keep it inspectable
// with errors.Is/errors.As if a caller ever needs that.
39 return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
// dispatcher submits Arvados containers to LSF (via bsub), monitors them
// through the LSF queue, and serves the management HTTP API (metrics and
// health checks).
//
// NOTE(review): fields referenced by the methods (AuthToken, lsfcli,
// lsfqueue, stop, stopped, initOnce) are not visible in this listing.
52 type dispatcher struct {
// Cluster is the cluster configuration; init and runContainer read the
// Containers, API and ManagementToken settings from it.
53 Cluster *arvados.Cluster
// Context is the parent context: init derives the logger from it,
// arvDispatcher.Run runs under it, and runContainer derives
// per-container contexts from it.
54 Context context.Context
// ArvClient is the Arvados API client. init sets its AuthToken and wraps
// it with arvadosclient; submit passes it to lsfcli.Bsub.
55 ArvClient *arvados.Client
// Registry is the Prometheus registry served at /metrics (only when a
// ManagementToken is configured).
57 Registry *prometheus.Registry
// logger is set by init from Context.
59 logger logrus.FieldLogger
// arvDispatcher is created by init; it calls runContainer for the
// containers it dispatches.
62 arvDispatcher *dispatch.Dispatcher
// httpHandler is the management API handler built by init; ServeHTTP
// delegates to it.
63 httpHandler http.Handler
70 // Start starts the dispatcher. Start can be called multiple times
71 // with no ill effect.
//
// Only the first call does any work (it is guarded by initOnce). That
// call checks the LSF queue for orphaned jobs, then runs arvDispatcher
// with disp.Context and logs the error Run returns.
// NOTE(review): some lines of the once-guarded function are not visible
// here; presumably the error is logged only when non-nil — confirm.
72 func (disp *dispatcher) Start() {
73 disp.initOnce.Do(func() {
76 disp.checkLsfQueueForOrphans()
77 err := disp.arvDispatcher.Run(disp.Context)
79 disp.logger.Error(err)
86 // ServeHTTP implements service.Handler.
//
// Requests are delegated to disp.httpHandler, which init builds: it
// either rejects every request with 403 (no ManagementToken configured)
// or serves /metrics, /metrics.json and /_health/:check behind the
// management token.
87 func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
89 disp.httpHandler.ServeHTTP(w, r)
92 // CheckHealth implements service.Handler.
//
// It is also registered as the "ping" health check in init. It returns
// an error "stopped" under a condition on lines not visible here
// (presumably once the dispatcher has stopped — confirm).
93 func (disp *dispatcher) CheckHealth() error {
97 return errors.New("stopped")
103 // Done implements service.Handler.
//
// NOTE(review): the body is not visible here; presumably it returns the
// disp.stopped channel that init creates — confirm.
104 func (disp *dispatcher) Done() <-chan struct{} {
108 // Close stops dispatching containers and releases resources. Used by tests.
//
// It signals shutdown by sending on disp.stop, which init creates with a
// buffer of one.
// NOTE(review): the enclosing select is not fully visible; presumably a
// default case makes repeated calls non-blocking — confirm.
109 func (disp *dispatcher) Close() {
112 case disp.stop <- struct{}{}:
// init sets up everything the dispatcher needs before it runs:
//   - the logger, shared with lsfcli;
//   - the LSF queue poller;
//   - the Arvados client credentials;
//   - the stop/stopped channels;
//   - the dispatch.Dispatcher that calls runContainer;
//   - the management HTTP handler.
118 func (disp *dispatcher) init() {
119 disp.logger = ctxlog.FromContext(disp.Context)
120 disp.lsfcli.logger = disp.logger
121 disp.lsfqueue = lsfqueue{
// The LSF queue is polled at the same interval as the Arvados queue
// (CloudVMs.PollInterval).
123 period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
124 lsfcli: &disp.lsfcli,
126 disp.ArvClient.AuthToken = disp.AuthToken
// stop has room for one pending signal; stopped is unbuffered.
127 disp.stop = make(chan struct{}, 1)
128 disp.stopped = make(chan struct{})
130 arv, err := arvadosclient.New(disp.ArvClient)
// NOTE(review): logrus Fatalf logs and then exits the whole process, so
// a client setup failure here is not recoverable by the caller.
132 disp.logger.Fatalf("Error making Arvados client: %v", err)
135 arv.ApiToken = disp.AuthToken
136 disp.arvDispatcher = &dispatch.Dispatcher{
139 BatchSize: disp.Cluster.API.MaxItemsPerResponse,
140 RunContainer: disp.runContainer,
141 PollPeriod: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
142 MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
// Without a ManagementToken, the management API refuses every request.
145 if disp.Cluster.ManagementToken == "" {
146 disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
147 http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
// Otherwise, serve metrics and health checks, all behind the token.
150 mux := httprouter.New()
151 metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
152 ErrorLog: disp.logger,
154 mux.Handler("GET", "/metrics", metricsH)
155 mux.Handler("GET", "/metrics.json", metricsH)
156 mux.Handler("GET", "/_health/:check", &health.Handler{
157 Token: disp.Cluster.ManagementToken,
159 Routes: health.Routes{"ping": disp.CheckHealth},
161 disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
// runContainer is the dispatch.Dispatcher RunContainer callback (see init).
//
// If ctr is Locked and not yet in the LSF queue, it submits a crunch-run
// job to LSF. If submission fails, it:
//   - records the reason in an Arvados "logs" entry;
//   - cancels the container when its constraints cannot be satisfied;
//   - unlocks the container.
//
// It then monitors the container until the status channel is closed or
// the LSF job disappears. During monitoring it logs state changes,
// kills the LSF job when priority drops below 1, and otherwise forwards
// priority changes to LSF. At the end it runs bkill every 5 seconds
// until the job leaves the LSF queue.
165 func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
// NOTE(review): cancel is presumably deferred or called on lines not
// visible here; ctx also bounds the queue-watcher goroutine below.
166 ctx, cancel := context.WithCancel(disp.Context)
169 if ctr.State != dispatch.Locked {
170 // already started by prior invocation
171 } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
172 disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
// Command line: CrunchRunCommand --runtime-engine=<engine>
// <CrunchRunArgumentsList...>; submit appends the container UUID.
173 cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
174 cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
175 cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
176 if err := disp.submit(ctr, cmd); err != nil {
178 switch err := err.(type) {
// No instance type can satisfy the constraints: retrying is pointless,
// so the container is cancelled. The log lists the available types.
179 case dispatchcloud.ConstraintsNotSatisfiableError:
180 var logBuf bytes.Buffer
181 fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
182 if len(err.AvailableTypes) == 0 {
183 fmt.Fprint(&logBuf, "No instance types are configured.\n")
185 fmt.Fprint(&logBuf, "Available instance types:\n")
186 for _, t := range err.AvailableTypes {
188 "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
189 t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
193 text = logBuf.String()
194 disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
196 text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
198 disp.logger.Print(text)
// Make the failure visible to users by attaching a "dispatch" log
// record to the container.
200 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
201 "object_uuid": ctr.UUID,
202 "event_type": "dispatch",
203 "properties": map[string]string{"text": text}}}
// NOTE(review): the error returned by Create is discarded; consider at
// least logging it.
204 disp.arvDispatcher.Arv.Create("logs", lr, nil)
206 disp.arvDispatcher.Unlock(ctr.UUID)
211 disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
212 defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
214 // If the container disappears from the lsf queue, there is
215 // no point in waiting for further dispatch updates: just
216 // clean up and return.
217 go func(uuid string) {
218 for ctx.Err() == nil {
219 if _, ok := disp.lsfqueue.JobID(uuid); !ok {
220 disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
227 for done := false; !done; {
230 // Disappeared from lsf queue
// Reload the container record and release anything this dispatcher
// still holds: cancel if it was Running, unlock if it was still Locked.
231 if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
232 disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
235 case dispatch.Running:
236 disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
237 case dispatch.Locked:
238 disp.arvDispatcher.Unlock(ctr.UUID)
241 case updated, ok := <-status:
243 // status channel is closed, which is
244 // how arvDispatcher tells us to stop
245 // touching the container record, kill
246 // off any remaining LSF processes,
251 if updated.State != ctr.State {
252 disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
// Priority below 1 means the container is no longer wanted: kill its
// LSF job. Otherwise keep LSF's job priority in sync.
255 if ctr.Priority < 1 {
256 disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
259 disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority))
263 disp.logger.Printf("container %s is done", ctr.UUID)
265 // Try "bkill" every few seconds until the LSF job disappears
267 ticker := time.NewTicker(5 * time.Second)
// NOTE(review): jobid is looked up only once; the post statement
// re-checks presence but not the id. If the queue ever reports a
// different job id for this UUID, bkill keeps targeting the old one.
// Calling disp.bkill(ctr) on each tick would re-resolve it.
269 for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
270 err := disp.lsfcli.Bkill(jobid)
272 disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
// submit queues container on LSF.
//
// It appends the container UUID to crunchRunCommand and wraps the
// resulting argv in a /bin/sh script (execScript). It computes the bsub
// arguments (bsubArgs) and hands both to lsfcli.Bsub together with
// disp.ArvClient. It returns the error from computing the bsub
// arguments (on lines not visible here) or from Bsub.
278 func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
279 // Start with an empty slice here to ensure append() doesn't
280 // modify crunchRunCommand's underlying array
282 crArgs = append(crArgs, crunchRunCommand...)
283 crArgs = append(crArgs, container.UUID)
284 crScript := execScript(crArgs)
286 bsubArgs, err := disp.bsubArgs(container)
290 return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
// bkill asks LSF to kill ctr's job, if the job is in the LSF queue.
//
// If the job is absent, it only logs at debug level. If bkill fails, the
// failure is logged as a warning and not returned.
293 func (disp *dispatcher) bkill(ctr arvados.Container) {
294 if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
295 disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
296 } else if err := disp.lsfcli.Bkill(jobid); err != nil {
297 disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
// bsubArgs returns the full bsub command line for container, built from:
//   - "bsub";
//   - the configured LSF.BsubArgumentsList;
//   - "-J <container UUID>", which names the job after the container;
//   - the resource constraints from bsubConstraintArgs.
//
// When LSF.BsubSudoUser is set, the command is prefixed with
// "sudo -E -u <user>" so bsub runs as that user with the caller's
// environment.
301 func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
302 args := []string{"bsub"}
303 args = append(args, disp.Cluster.Containers.LSF.BsubArgumentsList...)
304 args = append(args, "-J", container.UUID)
305 args = append(args, disp.bsubConstraintArgs(container)...)
306 if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
307 args = append([]string{"sudo", "-E", "-u", u}, args...)
// bsubConstraintArgs returns the bsub "-R" resource-requirement arguments
// for container:
//   - rusage mem: RAM + KeepCacheRAM + ReserveExtraRAM;
//   - rusage tmp: the estimated scratch space;
//   - affinity core(...): the container's VCPUs.
// Memory and tmp are converted from bytes to units of 1048576 (2^20)
// bytes, rounded up, and written with an "MB" suffix.
312 func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string {
313 // TODO: propagate container.SchedulingParameters.Partitions
314 tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
315 vcpus := container.RuntimeConstraints.VCPUs
316 mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
317 container.RuntimeConstraints.KeepCacheRAM+
318 int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
320 "-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus),
324 // checkLsfQueueForOrphans checks the next bjobs report and invokes
325 // TrackContainer for every entry that looks like a container UUID
// belonging to this cluster (disp.Cluster.ClusterID prefix). Other
// entries are skipped. This gives us a chance to cancel existing
326 // Arvados LSF jobs (started by a previous dispatch process) that
327 // never released their LSF job allocations even though their
328 // container states are Cancelled or Complete. See
329 // https://dev.arvados.org/issues/10979
//
// TrackContainer failures are logged as warnings and do not stop the
// scan.
// NOTE(review): the entries of lsfqueue.All() are presumably LSF job
// names, which bsubArgs sets to the container UUID via -J — confirm.
330 func (disp *dispatcher) checkLsfQueueForOrphans() {
// NOTE(review): the pattern is compiled on every call. This is harmless
// at startup-only frequency, but it could be hoisted to a package-level
// var.
331 containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
332 for _, uuid := range disp.lsfqueue.All() {
333 if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
336 err := disp.arvDispatcher.TrackContainer(uuid)
338 disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
// execScript returns a /bin/sh script that execs args as a single command
// line. The script starts with "#!/bin/sh\nexec" and ends with a newline.
//
// Each embedded single quote in an argument is rewritten as '\'' (close
// quote, escaped quote, reopen quote), the POSIX sh way to put a ' inside
// a single-quoted word.
// NOTE(review): the lines that add the space and surrounding single
// quotes around each argument are not visible in this listing — confirm.
// Repeated string += is quadratic in total length; a strings.Builder
// would avoid that, though argv here is small.
343 func execScript(args []string) []byte {
344 s := "#!/bin/sh\nexec"
345 for _, w := range args {
347 s += strings.Replace(w, `'`, `'\''`, -1)
350 return []byte(s + "\n")