// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package recovercollection

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cmd"
	"git.arvados.org/arvados.git/lib/config"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"github.com/sirupsen/logrus"
)

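// Command implements the recover-collection subcommand.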
var Command command

type command struct{}

func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
	var err error
	logger := ctxlog.New(stderr, "text", "info")
	defer func() {
		if err != nil {
			logger.WithError(err).Error("fatal")
		}
		logger.Info("exiting")
	}()

	loader := config.NewLoader(stdin, logger)
	loader.SkipLegacy = true

	flags := flag.NewFlagSet(prog, flag.ContinueOnError)
	flags.Usage = func() {
		fmt.Fprintf(flags.Output(), `Usage:
	%s [options ...] { /path/to/manifest.txt | log-or-collection-uuid } [...]

	This program recovers deleted collections. Recovery is
	possible when the collection's manifest is still available and
	all of its data blocks are still available or recoverable
	(e.g., garbage collection is not enabled, the blocks are too
	new for garbage collection, the blocks are referenced by other
	collections, or the blocks have been trashed but not yet
	deleted).

	There are multiple ways to specify a collection to recover:

	* Path to a local file containing a manifest with the desired
	  data

	* UUID of an Arvados log entry, typically a "delete" or
	  "update" event, whose "old attributes" have a manifest with
	  the desired data

	* UUID of an Arvados collection whose most recent log entry,
	  typically a "delete" or "update" event, has the desired
	  data in its "old attributes"

	For each provided collection manifest, once all data blocks
	are recovered/protected from garbage collection, a new
	collection is saved and its UUID is printed on stdout.

	Restored collections will belong to the system (root) user.

	Exit status will be zero if recovery is successful, i.e., a
	collection is saved for each provided manifest.
Options:
`, prog)
		flags.PrintDefaults()
	}
	loader.SetupFlags(flags)
	loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
	if ok, code := cmd.ParseFlags(flags, prog, args, "source [...]", stderr); !ok {
		return code
	} else if flags.NArg() == 0 {
		fmt.Fprintf(stderr, "missing required arguments (try -help)\n")
		return 2
	}

	lvl, err := logrus.ParseLevel(*loglevel)
	if err != nil {
		return 2
	}
	logger.SetLevel(lvl)

	cfg, err := loader.Load()
	if err != nil {
		return 1
	}
	cluster, err := cfg.GetCluster("")
	if err != nil {
		return 1
	}
	client, err := arvados.NewClientFromConfig(cluster)
	if err != nil {
		return 1
	}
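	// Use the cluster's system root token for all API and keepstore
	// requests; recovered collections will therefore be owned by the
	// system (root) user.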
	client.AuthToken = cluster.SystemRootToken
	rcvr := recoverer{
		client:  client,
		cluster: cluster,
		logger:  logger,
	}

	exitcode := 0
	for _, src := range flags.Args() {
		logger := logger.WithField("src", src)
		var mtxt string
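		// Arguments that look like Arvados UUIDs (27 characters, no
		// "/", hyphens at offsets 5 and 11) are looked up via the API;
		// the middle segment identifies the object type ("57u5n" for
		// log entries, "4zz18" for collections). Anything else is
		// treated as the path to a local manifest file.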
		if !strings.Contains(src, "/") && len(src) == 27 && src[5] == '-' && src[11] == '-' {
			var filters []arvados.Filter
			if src[5:12] == "-57u5n-" {
				filters = []arvados.Filter{{"uuid", "=", src}}
			} else if src[5:12] == "-4zz18-" {
				filters = []arvados.Filter{{"object_uuid", "=", src}}
			} else {
				logger.Error("looks like a UUID but not a log or collection UUID (if it's really a file, prepend './')")
				exitcode = 1
				continue
			}
			var resp struct {
				Items []struct {
					UUID       string    `json:"uuid"`
					EventType  string    `json:"event_type"`
					EventAt    time.Time `json:"event_at"`
					ObjectUUID string    `json:"object_uuid"`
					Properties struct {
						OldAttributes struct {
							ManifestText string `json:"manifest_text"`
						} `json:"old_attributes"`
					} `json:"properties"`
				}
			}
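			// Fetch only the most recent matching log entry; its
			// "old attributes" should contain the manifest as it was
			// before the delete/update.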
			err = client.RequestAndDecode(&resp, "GET", "arvados/v1/logs", nil, arvados.ListOptions{
				Limit:   1,
				Order:   []string{"event_at desc"},
				Filters: filters,
			})
			if err != nil {
				logger.WithError(err).Error("error looking up log entry")
				exitcode = 1
				continue
			} else if len(resp.Items) == 0 {
				logger.Error("log entry not found")
				exitcode = 1
				continue
			}
			logent := resp.Items[0]
			logger.WithFields(logrus.Fields{
				"uuid":                logent.UUID,
				"old_collection_uuid": logent.ObjectUUID,
				"logged_event_type":   logent.EventType,
				"logged_event_time":   logent.EventAt,
				"logged_object_uuid":  logent.ObjectUUID,
			}).Info("loaded log entry")
			mtxt = logent.Properties.OldAttributes.ManifestText
			if mtxt == "" {
				logger.Error("log entry properties.old_attributes.manifest_text missing or empty")
				exitcode = 1
				continue
			}
		} else {
			buf, err := ioutil.ReadFile(src)
			if err != nil {
				logger.WithError(err).Error("failed to load manifest data from file")
				exitcode = 1
				continue
			}
			mtxt = string(buf)
		}
		uuid, err := rcvr.RecoverManifest(mtxt)
		if err != nil {
			logger.WithError(err).Error("recovery failed")
			exitcode = 1
			continue
		}
		logger.WithField("UUID", uuid).Info("recovery succeeded")
		fmt.Fprintln(stdout, uuid)
	}
	return exitcode
}

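// A recoverer untrashes and refreshes data blocks as needed, then
// saves recovered collections, using one cluster's client, config,
// and logger.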
type recoverer struct {
	client  *arvados.Client
	cluster *arvados.Cluster
	logger  logrus.FieldLogger
}

var errNotFound = errors.New("not found")

// newestMtime finds the timestamp of the newest copy of blk on svc.
// It returns errNotFound if blk is not on svc at all.
func (rcvr recoverer) newestMtime(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
	found, err := svc.Index(ctx, rcvr.client, blk)
	if err != nil {
		logger.WithError(err).Warn("error getting index")
		return time.Time{}, err
	} else if len(found) == 0 {
		return time.Time{}, errNotFound
	}
	var latest time.Time
	for _, ent := range found {
		t := time.Unix(0, ent.Mtime)
		if t.After(latest) {
			latest = t
		}
	}
	logger.WithField("latest", latest).Debug("found")
	return latest, nil
}

var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")

// ensureSafe ensures the given block exists on the given server and
// won't be eligible for trashing until after our chosen deadline
// (blobsigexp). It returns an error if the block doesn't exist on the
// given server, or has an old timestamp that can't be updated.
//
// After we decide a block is "safe" (whether or not we had to untrash
// it), keep-balance might notice that it's currently unreferenced and
// decide to trash it, all before our recovered collection gets saved.
// But as long as the block's timestamp is less than blobsigttl old,
// keepstore will refuse to trash it even if keep-balance asks it to.
func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
	if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err != nil {
		return err
	} else if latest.Add(blobsigttl).After(blobsigexp) {
		return nil
	}
	if err := svc.Touch(ctx, rcvr.client, blk); err != nil {
		return fmt.Errorf("error updating timestamp: %s", err)
	}
	logger.Debug("updated timestamp")
	if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err == errNotFound {
		return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
	} else if err != nil {
		return err
	} else if latest.Add(blobsigttl).After(blobsigexp) {
		return nil
	} else {
		return errTouchIneffective
	}
}

// RecoverManifest untrashes and updates GC timestamps (as needed) on
// the blocks referenced by the given manifest, saves a new collection,
// and returns the new collection's UUID.
func (rcvr recoverer) RecoverManifest(mtxt string) (string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	coll := arvados.Collection{ManifestText: mtxt}
	blks, err := coll.SizedDigests()
	if err != nil {
		return "", err
	}
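	// Fill a buffered channel with the indexes of all blocks; the
	// workers below treat it as a shared work queue.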
	todo := make(chan int, len(blks))
	for idx := range blks {
		todo <- idx
	}
	close(todo)

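	// Collect the non-proxy keep services; the recovery steps below
	// (index, untrash, touch) are performed against each keepstore
	// server directly.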
	var services []arvados.KeepService
	err = rcvr.client.EachKeepService(func(svc arvados.KeepService) error {
		if svc.ServiceType == "proxy" {
			rcvr.logger.WithField("service", svc).Debug("ignore proxy service")
		} else {
			services = append(services, svc)
		}
		return nil
	})
	if err != nil {
		return "", fmt.Errorf("error getting list of keep services: %s", err)
	}
	rcvr.logger.WithField("services", services).Debug("got list of services")

	// blobsigexp is our deadline for saving the rescued
	// collection. This must be less than BlobSigningTTL
	// (otherwise our rescued blocks could be garbage collected
	// again before we protect them by saving the collection) but
	// the exact value is somewhat arbitrary. If it's too soon, it
	// will arrive before we're ready to save, and save will
	// fail. If it's too late, we'll needlessly update timestamps
	// on some blocks that were recently written/touched (e.g., by
	// a previous attempt to rescue this same collection) and
	// would have lived long enough anyway if left alone.
	// BlobSigningTTL/2 (typically around 1 week) is much longer
	// than we need to recover even a very large collection.
	blobsigttl := rcvr.cluster.Collections.BlobSigningTTL.Duration()
	blobsigexp := time.Now().Add(blobsigttl / 2)
	rcvr.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")

	// We'll start a number of worker goroutines, each
	// checking/recovering one block at a time. The workers
	// themselves don't need much CPU/memory, but to avoid hitting
	// limits on keepstore connections, backend storage bandwidth,
	// etc., we limit concurrency to 2 per keepstore node.
	workerThreads := 2 * len(services)

	blkFound := make([]bool, len(blks))
	var wg sync.WaitGroup
	for i := 0; i < workerThreads; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
		nextblk:
			for idx := range todo {
				blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
				logger := rcvr.logger.WithField("block", blk)
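				// First pass (untrashing==false): look for a usable
				// copy that already exists. Second pass: ask each
				// service to untrash the block, then re-check it.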
				for _, untrashing := range []bool{false, true} {
					for _, svc := range services {
						logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
						if untrashing {
							if err := svc.Untrash(ctx, rcvr.client, blk); err != nil {
								logger.WithError(err).Debug("untrash failed")
								continue
							}
							logger.Info("untrashed")
						}
						err := rcvr.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
						if err == errNotFound {
							logger.Debug(err)
						} else if err != nil {
							logger.Error(err)
						} else {
							blkFound[idx] = true
							continue nextblk
						}
					}
				}
				logger.Debug("unrecoverable")
			}
		}()
	}
	wg.Wait()

	var have, havenot int
	for _, ok := range blkFound {
		if ok {
			have++
		} else {
			havenot++
		}
	}
	if havenot > 0 {
		if have > 0 {
			rcvr.logger.Warn("partial recovery is not implemented")
		}
		return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
	}

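	// If the cluster requires signed locators, sign the manifest
	// with the chosen expiry time so the API server will accept it.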
	if rcvr.cluster.Collections.BlobSigning {
		key := []byte(rcvr.cluster.Collections.BlobSigningKey)
		coll.ManifestText = arvados.SignManifest(coll.ManifestText, rcvr.client.AuthToken, blobsigexp, blobsigttl, key)
	}
	rcvr.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
	err = rcvr.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
		"collection": map[string]interface{}{
			"manifest_text": coll.ManifestText,
		},
	})
	if err != nil {
		return "", fmt.Errorf("error saving new collection: %s", err)
	}
	rcvr.logger.WithField("UUID", coll.UUID).Debug("created new collection")
	return coll.UUID, nil
}