/* Keep Datamanager. Responsible for checking on and reporting on Keep storage. */
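//
// Example invocation, assuming the compiled binary is named "datamanager"
// (both flags are defined in init below):
//
//   datamanager -dry-run -minutes-between-runs=60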

package main

import (
	"errors"
	"flag"
	"fmt"
	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/keepclient"
	"git.curoverse.com/arvados.git/sdk/go/logger"
	"git.curoverse.com/arvados.git/sdk/go/util"
	"git.curoverse.com/arvados.git/services/datamanager/collection"
	"git.curoverse.com/arvados.git/services/datamanager/keep"
	"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
	"git.curoverse.com/arvados.git/services/datamanager/summary"
	"log"
	"time"
)

var (
	logEventTypePrefix  string
	logFrequencySeconds int
	minutesBetweenRuns  int
	collectionBatchSize int
	dryRun              bool
)

func init() {
	flag.StringVar(&logEventTypePrefix,
		"log-event-type-prefix",
		"experimental-data-manager",
		"Prefix to use in the event_type of our arvados log entries. Set to the empty string to disable logging.")
	flag.IntVar(&logFrequencySeconds,
		"log-frequency-seconds",
		20,
		"How frequently, in seconds, to write log entries.")
	flag.IntVar(&minutesBetweenRuns,
		"minutes-between-runs",
		0,
		"How many minutes we wait between data manager runs. 0 means run once and exit.")
	flag.IntVar(&collectionBatchSize,
		"collection-batch-size",
		1000,
		"How many collections to request in each batch.")
	flag.BoolVar(&dryRun,
		"dry-run",
		false,
		"Perform a dry run. Log how many blocks would be deleted/moved, but do not issue any changes to keepstore.")
}

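// main runs the data manager once, or in a loop with a fixed sleep between
// passes, depending on the -minutes-between-runs flag.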
func main() {
	flag.Parse()

	if minutesBetweenRuns == 0 {
		arv, err := arvadosclient.MakeArvadosClient()
		if err != nil {
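			// arvLogger has not been set up yet; FatalWithMessage is
			// expected to handle a nil logger.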
			loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
		}
		err = singlerun(arv)
		if err != nil {
			loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err))
		}
	} else {
		waitTime := time.Minute * time.Duration(minutesBetweenRuns)
		for {
			log.Println("Beginning Run")
			arv, err := arvadosclient.MakeArvadosClient()
			if err != nil {
				loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
			}
			err = singlerun(arv)
			if err != nil {
				log.Printf("singlerun: %v", err)
			}
			log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
			time.Sleep(waitTime)
		}
	}
}

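// arvLogger is the Arvados event logger shared across a run. It stays nil
// when event logging is disabled via an empty -log-event-type-prefix.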
var arvLogger *logger.Logger

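// singlerun performs one complete data manager pass: verify that the token
// belongs to an admin, read collection and keep server data, summarize block
// replication, and send pull and trash lists to the keep servers.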
func singlerun(arv arvadosclient.ArvadosClient) error {
	var err error
	if isAdmin, err := util.UserIsAdmin(arv); err != nil {
		return errors.New("Error verifying admin token: " + err.Error())
	} else if !isAdmin {
		return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
	}

	if logEventTypePrefix != "" {
		arvLogger, err = logger.NewLogger(logger.LoggerParams{
			Client:          arv,
			EventTypePrefix: logEventTypePrefix,
			WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
	}

	loggerutil.LogRunInfo(arvLogger)
	if arvLogger != nil {
		arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
	}

	var (
		dataFetcher     summary.DataFetcher
		readCollections collection.ReadCollections
		keepServerInfo  keep.ReadServers
	)

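	// Fetch data either from a previously saved snapshot or live from the
	// API server and keep servers, depending on the summary package's flags.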
	if summary.ShouldReadData() {
		dataFetcher = summary.ReadData
	} else {
		dataFetcher = BuildDataFetcher(arv)
	}

	err = dataFetcher(arvLogger, &readCollections, &keepServerInfo)
	if err != nil {
		return err
	}

	err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
	if err != nil {
		return err
	}

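	// Bucket blocks by their (desired, actual) replication levels and
	// summarize which blocks are under- or over-replicated.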
	buckets := summary.BucketReplication(readCollections, keepServerInfo)
	bucketCounts := buckets.Counts()

	replicationSummary := buckets.SummarizeBuckets(readCollections)
	replicationCounts := replicationSummary.ComputeCounts()

	log.Printf("Blocks In Collections: %d, Blocks In Keep: %d.",
		len(readCollections.BlockToDesiredReplication),
		len(keepServerInfo.BlockToServers))
	log.Println(replicationCounts.PrettyPrint())

	log.Printf("Blocks Histogram:")
	for _, rlbss := range bucketCounts {
		log.Printf("%+v: %10d",
			rlbss.Levels,
			rlbss.Count)
	}

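	// A keep client is needed to send pull and trash lists to the keep servers.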
	kc, err := keepclient.MakeKeepClient(&arv)
	if err != nil {
		return fmt.Errorf("Error setting up keep client: %v", err)
	}

	// Log that we're finished. We force the recording, since Go will not
	// wait for the logger's write timer before exiting.
	if arvLogger != nil {
		defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
			summaryInfo := logger.GetOrCreateMap(p, "summary_info")
			summaryInfo["block_replication_counts"] = bucketCounts
			summaryInfo["replication_summary"] = replicationCounts
			p["summary_info"] = summaryInfo

			p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
		})
	}

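	// Compute pull lists for under-replicated blocks and trash lists for
	// blocks that no collection references.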
	pullServers := summary.ComputePullServers(kc,
		&keepServerInfo,
		readCollections.BlockToDesiredReplication,
		replicationSummary.UnderReplicatedBlocks)

	pullLists := summary.BuildPullLists(pullServers)

	trashLists, trashErr := summary.BuildTrashLists(kc,
		&keepServerInfo,
		replicationSummary.KeepBlocksNotInCollections)

	err = summary.WritePullLists(arvLogger, pullLists, dryRun)
	if err != nil {
		return err
	}

	if trashErr != nil {
		return trashErr
	}
	keep.SendTrashLists(arvLogger, kc, trashLists, dryRun)

	return nil
}

// BuildDataFetcher returns a summary.DataFetcher that fetches collection
// data and keep server contents from the remote servers in parallel.
func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
	return func(
		arvLogger *logger.Logger,
		readCollections *collection.ReadCollections,
		keepServerInfo *keep.ReadServers,
	) error {
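		// Fetch collection data in a goroutine so it runs concurrently
		// with the keep server fetch below.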
		collDone := make(chan struct{})
		var collErr error
		go func() {
			*readCollections, collErr = collection.GetCollectionsAndSummarize(
				collection.GetCollectionsParams{
					Client:    arv,
					Logger:    arvLogger,
					BatchSize: collectionBatchSize})
			collDone <- struct{}{}
		}()

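		// Meanwhile, fetch keep server contents on the current goroutine.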
		var keepErr error
		*keepServerInfo, keepErr = keep.GetKeepServersAndSummarize(
			keep.GetKeepServersParams{
				Client: arv,
				Logger: arvLogger,
				Limit:  1000})

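		// Wait for the collection fetch to finish.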
		<-collDone

		// Return a nil error only if both parts succeeded.
		if collErr != nil {
			return collErr
		}
		return keepErr
	}
}