// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package scheduler

import (
	"time"

	"git.arvados.org/arvados.git/lib/dispatchcloud/worker"
	"git.arvados.org/arvados.git/sdk/go/arvados"
)

// fixStaleLocks waits for any already-locked containers (i.e., locked
// by a prior dispatcher process) to appear on workers as the worker
// pool recovers its state. It unlocks any that still remain when all
// workers are recovered or shutdown, or its timer
// (sch.staleLockTimeout) expires.
func (sch *Scheduler) fixStaleLocks() {
	wp := sch.pool.Subscribe()
	defer sch.pool.Unsubscribe(wp)

	var stale []string
	timeout := time.NewTimer(sch.staleLockTimeout)
waiting:
	for sch.pool.CountWorkers()[worker.StateUnknown] > 0 {
		running := sch.pool.Running()
		qEntries, _ := sch.queue.Entries()

		stale = nil
		for uuid, ent := range qEntries {
			if ent.Container.State != arvados.ContainerStateLocked {
				continue
			}
			if _, running := running[uuid]; running {
				continue
			}
			stale = append(stale, uuid)
		}
		if len(stale) == 0 {
			return
		}

		select {
		case <-wp:
		case <-timeout.C:
			// Give up.
			break waiting
		}
	}

	for _, uuid := range stale {
		err := sch.queue.Unlock(uuid)
		if err != nil {
			sch.logger.Warnf("Unlock %s: %s", uuid, err)
		}
	}
}