// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

// Package scheduler uses a resizable worker pool to execute
// containers in priority order.
//
// Scheduler functions must not be called concurrently using the same
// queue or pool.
package scheduler

import (
	"sort"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/Sirupsen/logrus"
)

// Map maps queued containers onto unallocated workers in priority
// order, creating new workers if needed. It locks containers that can
// be mapped onto existing/pending workers, and starts them if
// possible.
//
// Map unlocks any containers that are locked but can't be
// mapped. (For example, this happens when the cloud provider reaches
// quota/capacity and a previously mappable container's priority is
// surpassed by a newer container.)
//
// If it encounters errors while creating new workers, Map shuts down
// idle workers, in case they are consuming quota.
//
// Map should not be called without first calling FixStaleLocks.
//
//	FixStaleLocks()
//	for {
//		Map()
//		Sync()
//	}
func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
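	// Snapshot the queue (the second return value, not needed
	// here, is discarded) and sort the containers by descending
	// priority.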
	unsorted, _ := queue.Entries()
	sorted := make([]container.QueueEnt, 0, len(unsorted))
	for _, ent := range unsorted {
		sorted = append(sorted, ent)
	}
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].Container.Priority > sorted[j].Container.Priority
	})

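	// running is keyed by the UUIDs of containers already running
	// on workers; unalloc counts workers of each instance type
	// that currently have no container allocated.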
	running := pool.Running()
	unalloc := pool.Unallocated()

	logger.WithFields(logrus.Fields{
		"Containers": len(sorted),
		"Processes":  len(running),
	}).Debug("mapping")

	dontstart := map[arvados.InstanceType]bool{} // instance types where a higher-priority container failed to start this pass
	var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota

	for i, ctr := range sorted {
		ctr, it := ctr.Container, ctr.InstanceType
		logger := logger.WithFields(logrus.Fields{
			"ContainerUUID": ctr.UUID,
			"InstanceType":  it.Name,
		})
		if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
			continue
		}
		if ctr.State == arvados.ContainerStateQueued {
			logger.Debugf("locking")
			if unalloc[it] < 1 && pool.AtQuota() {
				overquota = sorted[i:]
				break
			}
			err := queue.Lock(ctr.UUID)
			if err != nil {
				logger.WithError(err).Warnf("lock error")
				unalloc[it]++
				continue
			}
			var ok bool
			ctr, ok = queue.Get(ctr.UUID)
			if !ok {
				logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
				continue
			}
			if ctr.State != arvados.ContainerStateLocked {
				logger.Debugf("(race?) container has state=%q after Lock succeeded", ctr.State)
			}
		}
		if ctr.State != arvados.ContainerStateLocked {
			continue
		}
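		// No idle/booting worker of the required type is
		// available, so ask the pool to create one.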
		if unalloc[it] < 1 {
			logger.Info("creating new instance")
			err := pool.Create(it)
			if err != nil {
				if _, ok := err.(cloud.QuotaError); !ok {
					logger.WithError(err).Warn("error creating worker")
				}
				queue.Unlock(ctr.UUID)
				// Don't let lower-priority containers
				// starve this one by keeping idle
				// workers alive on different instance
				// types. TODO: avoid getting starved
				// here if instances of a specific
				// type always fail.
				overquota = sorted[i:]
				break
			}
			unalloc[it]++
		}
		if dontstart[it] {
			// We already tried & failed to start a
			// higher-priority container on the same
			// instance type. Don't let this one sneak in
			// ahead of it.
		} else if pool.StartContainer(it, ctr) {
			unalloc[it]--
		} else {
			dontstart[it] = true
		}
	}

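	// If we hit quota (or a create error) above, overquota holds
	// the tail of the priority-sorted queue from that point on.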
	if len(overquota) > 0 {
		// Unlock any containers that are unmappable while
		// we're at quota.
		for _, ctr := range overquota {
			ctr := ctr.Container
			if ctr.State == arvados.ContainerStateLocked {
				logger := logger.WithField("ContainerUUID", ctr.UUID)
				logger.Debug("unlock because pool capacity is used by higher priority containers")
				err := queue.Unlock(ctr.UUID)
				if err != nil {
					logger.WithError(err).Warn("error unlocking")
				}
			}
		}
		// Shut down idle workers that didn't get any
		// containers mapped onto them before we hit quota.
		for it, n := range unalloc {
			if n < 1 {
				continue
			}
			pool.Shutdown(it)
		}
	}
}
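
// The following is an illustrative sketch, not part of the scheduler
// API: one way a dispatcher might drive Map in the loop shown in Map's
// doc comment. The stop channel, the polling interval, and the
// zero-argument forms of FixStaleLocks and Sync (written as in that
// doc comment) are hypothetical simplifications.
//
//	func runLoop(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, stop <-chan struct{}) {
//		FixStaleLocks() // hypothetical zero-arg form
//		tick := time.NewTicker(time.Second) // arbitrary polling interval
//		defer tick.Stop()
//		for {
//			Map(logger, queue, pool)
//			Sync() // hypothetical zero-arg form
//			select {
//			case <-stop:
//				return
//			case <-tick.C:
//			}
//		}
//	}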