1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/cmd"
21 "git.arvados.org/arvados.git/lib/dispatchcloud"
22 "git.arvados.org/arvados.git/lib/service"
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25 "git.arvados.org/arvados.git/sdk/go/auth"
26 "git.arvados.org/arvados.git/sdk/go/ctxlog"
27 "git.arvados.org/arvados.git/sdk/go/dispatch"
28 "git.arvados.org/arvados.git/sdk/go/health"
29 "github.com/julienschmidt/httprouter"
30 "github.com/prometheus/client_golang/prometheus"
31 "github.com/prometheus/client_golang/prometheus/promhttp"
32 "github.com/sirupsen/logrus"
// DispatchCommand is the cmd.Handler entry point for the LSF dispatcher
// service, built via service.Command with newHandler as the handler factory.
35 var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
// newHandler constructs the service.Handler for this dispatcher.
// NOTE(review): interior lines are missing from this view (embedded line
// numbers jump 38->40); only the client construction and its error path
// are visible. On a client-config error it returns an ErrorHandler that
// reports the initialization failure.
37 func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
38 ac, err := arvados.NewClientFromConfig(cluster)
// Error path: wrap the config error in a service.ErrorHandler.
40 return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
// dispatcher implements service.Handler (see ServeHTTP/CheckHealth/Done
// below) and drives container dispatch to LSF.
// NOTE(review): several fields are missing from this view (embedded line
// numbers 57, 59, 61-62, 65-70 are absent); usage elsewhere in this file
// implies additional fields such as AuthToken, lsfcli, lsfqueue, initOnce,
// stop, and stopped — confirm against the full source.
53 type dispatcher struct {
54 Cluster *arvados.Cluster
55 Context context.Context
56 ArvClient *arvados.Client
58 Registry *prometheus.Registry
// logger is taken from Context in init() via ctxlog.FromContext.
60 logger logrus.FieldLogger
// arvDispatcher polls Arvados and invokes runContainer (see init()).
63 arvDispatcher *dispatch.Dispatcher
// httpHandler serves management/metrics endpoints (see init()).
64 httpHandler http.Handler
71 // Start starts the dispatcher. Start can be called multiple times
72 // with no ill effect.
// NOTE(review): lines 75-76, 79, and 81-86 are missing from this view;
// the goroutine/cleanup structure around Run cannot be confirmed here.
73 func (disp *dispatcher) Start() {
// initOnce guarantees the startup sequence runs at most once.
74 disp.initOnce.Do(func() {
// Reconcile any LSF jobs left over from a previous dispatcher process
// before entering the main dispatch loop.
77 disp.checkLsfQueueForOrphans()
78 err := disp.arvDispatcher.Run(disp.Context)
80 disp.logger.Error(err)
87 // ServeHTTP implements service.Handler.
// Delegates to the handler chosen in init() (forbidden-stub or
// management mux). NOTE(review): line 89 is missing from this view.
88 func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
90 disp.httpHandler.ServeHTTP(w, r)
93 // CheckHealth implements service.Handler.
// NOTE(review): lines 95-97 are missing from this view; only the
// "stopped" error path is visible here. Presumably it returns nil while
// the dispatcher is running — confirm against the full source.
94 func (disp *dispatcher) CheckHealth() error {
98 return errors.New("stopped")
104 // Done implements service.Handler.
// NOTE(review): the function body (lines 106+) is missing from this
// view; presumably it returns the stopped channel — confirm against the
// full source.
105 func (disp *dispatcher) Done() <-chan struct{} {
109 // Stop dispatching containers and release resources. Used by tests.
// NOTE(review): lines 111-112 and 114+ are missing; only the
// non-blocking send arm of a select on disp.stop is visible (the
// one-element buffer on disp.stop, set in init(), makes a single send
// non-blocking).
110 func (disp *dispatcher) Close() {
113 case disp.stop <- struct{}{}:
// init wires up the dispatcher's collaborators: logger, LSF CLI/queue
// poller, Arvados clients, the dispatch.Dispatcher, and the management
// HTTP handler. NOTE(review): many interior lines (123, 126, 130, 132,
// 134-135, 138-139, 144-145, 149-150, 154, 159, 161, 163-165) are
// missing from this view.
119 func (disp *dispatcher) init() {
120 disp.logger = ctxlog.FromContext(disp.Context)
121 disp.lsfcli.logger = disp.logger
// Queue poller refresh period is tied to the cloud poll interval.
122 disp.lsfqueue = lsfqueue{
124 period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(),
125 lsfcli: &disp.lsfcli,
127 disp.ArvClient.AuthToken = disp.AuthToken
// stop has a one-element buffer so Close() never blocks.
128 disp.stop = make(chan struct{}, 1)
129 disp.stopped = make(chan struct{})
// Legacy arvadosclient is needed by the dispatch library.
131 arv, err := arvadosclient.New(disp.ArvClient)
133 disp.logger.Fatalf("Error making Arvados client: %v", err)
136 arv.ApiToken = disp.AuthToken
137 disp.arvDispatcher = &dispatch.Dispatcher{
140 BatchSize: disp.Cluster.API.MaxItemsPerResponse,
141 RunContainer: disp.runContainer,
142 PollPeriod: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
143 MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
// Without a management token, all management requests are refused.
146 if disp.Cluster.ManagementToken == "" {
147 disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
148 http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
// Otherwise expose metrics and health endpoints behind the token.
151 mux := httprouter.New()
152 metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
153 ErrorLog: disp.logger,
155 mux.Handler("GET", "/metrics", metricsH)
156 mux.Handler("GET", "/metrics.json", metricsH)
157 mux.Handler("GET", "/_health/:check", &health.Handler{
158 Token: disp.Cluster.ManagementToken,
160 Routes: health.Routes{"ping": disp.CheckHealth},
162 disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
// runContainer is the dispatch.Dispatcher callback: it submits a Locked
// container to LSF (unless already queued), then monitors both the LSF
// queue entry and Arvados status updates until the container is done,
// finally bkill-ing any leftover LSF job.
// NOTE(review): this view is heavily gappy (e.g. lines 168-169, 178-182,
// 185, 187, 190, 196-198, 205-208, 210-211, 213, 215-220, 222-223,
// 227-228, 233-234, 236, 241-244, 247-248, 251-252, 254-256, 258, 260,
// 262, 265, 267-272 are missing); control-flow details between the
// visible lines cannot be confirmed here.
166 func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
167 ctx, cancel := context.WithCancel(disp.Context)
// Submit only if still Locked and not already present in the LSF queue.
170 if ctr.State != dispatch.Locked {
171 // already started by prior invocation
172 } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
173 disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
174 cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
175 cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
176 cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
177 err := disp.submit(ctr, cmd)
183 disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
184 defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
// Watcher goroutine: reacts to the job vanishing from the LSF queue or
// pending with "no suitable hosts" (which is treated as a fatal
// scheduling error: runtime_status is set and the container cancelled).
186 go func(uuid string) {
188 for ctx.Err() == nil {
189 qent, ok := disp.lsfqueue.Lookup(uuid)
191 // If the container disappears from
192 // the lsf queue, there is no point in
193 // waiting for further dispatch
194 // updates: just clean up and return.
195 disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
199 if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
200 disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
// Surface the LSF pending reason to the user via runtime_status.
201 err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
202 "container": map[string]interface{}{
203 "runtime_status": map[string]string{
204 "error": qent.PendReason,
209 disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
212 err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
214 continue // retry (UpdateState() already logged the error)
// Main monitoring loop: ends when the job leaves the LSF queue or the
// status channel closes.
221 for done := false; !done; {
224 // Disappeared from lsf queue
// Fetch final state to decide cleanup: cancel if Running, unlock if
// still Locked.
225 if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
226 disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
229 case dispatch.Running:
230 disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
231 case dispatch.Locked:
232 disp.arvDispatcher.Unlock(ctr.UUID)
235 case updated, ok := <-status:
237 // status channel is closed, which is
238 // how arvDispatcher tells us to stop
239 // touching the container record, kill
240 // off any remaining LSF processes,
245 if updated.State != ctr.State {
246 disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
// Priority < 1 means the container was cancelled: kill the LSF job.
249 if ctr.Priority < 1 {
250 disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
253 disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority))
257 disp.logger.Printf("container %s is done", ctr.UUID)
259 // Try "bkill" every few seconds until the LSF job disappears
261 ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2)
263 for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
264 err := disp.lsfcli.Bkill(qent.ID)
266 disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
// submit builds the crunch-run invocation script for one container and
// hands it to bsub via lsfcli. NOTE(review): lines 276, 279, 283, 285,
// and 287-289 are missing from this view (including crArgs'
// declaration and bsubArgs' error check).
273 func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
274 // Start with an empty slice here to ensure append() doesn't
275 // modify crunchRunCommand's underlying array
277 crArgs = append(crArgs, crunchRunCommand...)
278 crArgs = append(crArgs, container.UUID)
// Derive the per-container gateway auth secret as
// HMAC-SHA256(SystemRootToken, container UUID), hex-encoded.
280 h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken))
281 fmt.Fprint(h, container.UUID)
282 authsecret := fmt.Sprintf("%x", h.Sum(nil))
// The secret is passed via the script's environment, not argv.
284 crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret})
286 bsubArgs, err := disp.bsubArgs(container)
290 return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
// bkill kills the LSF job for ctr, if one is still in the queue;
// a missing queue entry is treated as success (debug-logged only).
// NOTE(review): trailing lines 298-300 are missing from this view.
293 func (disp *dispatcher) bkill(ctr arvados.Container) {
294 if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
295 disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
296 } else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
// Bkill failure is logged as a warning, not propagated.
297 disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
// bsubArgs expands the configured BsubArgumentsList (plus
// BsubCUDAArguments when the container requests GPUs) into a concrete
// bsub command line, substituting %C/%M/%T/%U/%G placeholders with the
// container's resource figures. Unknown %-parameters are collected and
// returned as a single error. NOTE(review): lines 303, 309, 311,
// 317-318, 324, 327-328, 330-333, 336-337, and 340+ are missing from
// this view.
301 func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
302 args := []string{"bsub"}
// Scratch space and memory are converted from bytes to MiB, rounded up.
304 tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
305 vcpus := container.RuntimeConstraints.VCPUs
306 mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
307 container.RuntimeConstraints.KeepCacheRAM+
308 int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
// Placeholder substitution table for the argument templates.
310 repl := map[string]string{
312 "%C": fmt.Sprintf("%d", vcpus),
313 "%M": fmt.Sprintf("%d", mem),
314 "%T": fmt.Sprintf("%d", tmp),
315 "%U": container.UUID,
316 "%G": fmt.Sprintf("%d", container.RuntimeConstraints.CUDA.DeviceCount),
// Matches any two-character %x sequence in a template argument.
319 re := regexp.MustCompile(`%.`)
320 var substitutionErrors string
321 argumentTemplate := disp.Cluster.Containers.LSF.BsubArgumentsList
322 if container.RuntimeConstraints.CUDA.DeviceCount > 0 {
323 argumentTemplate = append(argumentTemplate, disp.Cluster.Containers.LSF.BsubCUDAArguments...)
325 for _, a := range argumentTemplate {
326 args = append(args, re.ReplaceAllStringFunc(a, func(s string) string {
329 substitutionErrors += fmt.Sprintf("Unknown substitution parameter %s in BsubArgumentsList, ", s)
// Strip the trailing ", " from the accumulated error string.
334 if len(substitutionErrors) != 0 {
335 return nil, fmt.Errorf("%s", substitutionErrors[:len(substitutionErrors)-2])
// Optionally run bsub as a different user via sudo.
338 if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
339 args = append([]string{"sudo", "-E", "-u", u}, args...)
344 // Check the next bjobs report, and invoke TrackContainer for all the
345 // containers in the report. This gives us a chance to cancel existing
346 // Arvados LSF jobs (started by a previous dispatch process) that
347 // never released their LSF job allocations even though their
348 // container states are Cancelled or Complete. See
349 // https://dev.arvados.org/issues/10979
// NOTE(review): lines 354-355, 357, and 359+ are missing from this
// view, so the loop's skip/continue structure cannot be confirmed here.
350 func (disp *dispatcher) checkLsfQueueForOrphans() {
// Matches Arvados container UUIDs (xxxxx-dz642-xxxxxxxxxxxxxxx).
351 containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
352 for _, uuid := range disp.lsfqueue.All() {
// Ignore queue entries that are not this cluster's container UUIDs.
353 if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
356 err := disp.arvDispatcher.TrackContainer(uuid)
358 disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
// execScript renders a shell script that exports env (with values
// single-quote-escaped via the '\'' idiom) and then execs args.
// NOTE(review): lines 364, 366, 368-370, 372, and 374-375 are missing
// from this view (including the shebang/export/exec boilerplate lines
// being built into s).
363 func execScript(args []string, env map[string]string) []byte {
365 for k, v := range env {
// Escape embedded single quotes so the value survives sh quoting.
367 s += strings.Replace(v, `'`, `'\''`, -1)
371 for _, w := range args {
373 s += strings.Replace(w, `'`, `'\''`, -1)
376 return []byte(s + "\n")