package keepclient

import (
	"errors"
	"io"
	"os"

	"git.curoverse.com/arvados.git/sdk/go/manifest"
)

// ReadCloserWithLen extends io.ReadCloser with a Len() method that
// returns the total number of bytes available to read.
type ReadCloserWithLen interface {
	io.ReadCloser
	Len() uint64
}

const (
	// After reading a data block from Keep, cfReader slices it up
	// and sends the slices to a buffered channel to be consumed
	// by the caller via Read().
	//
	// dataSliceSize is the maximum size of the slices, and
	// therefore the maximum number of bytes that will be returned
	// by a single call to Read().
	dataSliceSize = 1 << 20
)

// ErrNoManifest indicates the given collection has no manifest
// information (e.g., manifest_text was excluded by a "select"
// parameter when retrieving the collection record).
var ErrNoManifest = errors.New("Collection has no manifest")

// CollectionFileReader returns a ReadCloserWithLen that reads file
// content from a collection. The filename must be given relative to
// the root of the collection, without a leading "./".
func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) {
	mText, ok := collection["manifest_text"].(string)
	if !ok {
		return nil, ErrNoManifest
	}
	m := manifest.Manifest{Text: mText}
	return kc.ManifestFileReader(m, filename)
}

func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) {
	rdrChan := make(chan *cfReader)
	go kc.queueSegmentsToGet(m, filename, rdrChan)
	r, ok := <-rdrChan
	if !ok {
		return nil, os.ErrNotExist
	}
	return r, nil
}

// Send segments for the specified file to r.toGet. Send a *cfReader
// to rdrChan if the specified file is found (even if it's empty).
// Then, close rdrChan.
func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) {
	defer close(rdrChan)

	// q is a queue of FileSegments that we have received but
	// haven't yet been able to send to toGet.
	var q []*manifest.FileSegment
	var r *cfReader
	for seg := range m.FileSegmentIterByName(filename) {
		if r == nil {
			// We've just discovered that the requested
			// filename does appear in the manifest, so we
			// can return a real reader (not nil) from
			// CollectionFileReader().
			r = newCFReader(kc)
			rdrChan <- r
		}
		q = append(q, seg)
		r.totalSize += uint64(seg.Len)
		// Send toGet as many segments as we can until it
		// blocks.
	Q:
		for len(q) > 0 {
			select {
			case r.toGet <- q[0]:
				q = q[1:]
			default:
				break Q
			}
		}
	}
	if r == nil {
		// File not found.
		return
	}
	close(r.countDone)
	for _, seg := range q {
		r.toGet <- seg
	}
	close(r.toGet)
}

type cfReader struct {
	keepClient *KeepClient

	// doGet() reads FileSegments from toGet, gets the data from
	// Keep, and sends byte slices to toRead to be consumed by
	// Read().
	toGet chan *manifest.FileSegment

	// toRead is a buffered channel, sized to fit one full Keep
	// block. This lets us verify checksums without having a
	// store-and-forward delay between blocks: by the time the
	// caller starts receiving data from block N, cfReader is
	// starting to fetch block N+1. A larger buffer would be
	// useful for a caller whose read speed varies a lot.
	toRead chan []byte

	// bytes ready to send next time someone calls Read()
	buf []byte

	// Total size of the file being read. Not safe to read this
	// until countDone is closed.
	totalSize uint64
	countDone chan struct{}

	// First error encountered.
	err error

	// errNotNil is closed IFF err contains a non-nil error.
	// Receiving from it will block until an error occurs.
	errNotNil chan struct{}

	// rdrClosed is closed IFF the reader's Close() method has
	// been called. Any goroutines associated with the reader will
	// stop and free up resources when they notice this channel is
	// closed.
	rdrClosed chan struct{}
}

func (r *cfReader) Read(outbuf []byte) (int, error) {
	if r.Error() != nil {
		// Short circuit: the caller might as well find out
		// now that we hit an error, even if there's buffered
		// data we could return.
		return 0, r.Error()
	}
	for len(r.buf) == 0 {
		// Private buffer was emptied out by the last Read()
		// (or this is the first Read() and r.buf is nil).
		// Read from r.toRead until we get a non-empty slice
		// or hit an error.
		var ok bool
		r.buf, ok = <-r.toRead
		if r.Error() != nil {
			// Error encountered while waiting for bytes
			return 0, r.Error()
		} else if !ok {
			// No more bytes to read, no error encountered
			return 0, io.EOF
		}
	}
	// Copy as much as possible from our private buffer to the
	// caller's buffer
	n := len(r.buf)
	if len(r.buf) > len(outbuf) {
		n = len(outbuf)
	}
	copy(outbuf[:n], r.buf[:n])

	// Next call to Read() will continue where we left off
	r.buf = r.buf[n:]

	return n, nil
}

// Close releases resources. It returns a non-nil error if an error
// was encountered by the reader.
func (r *cfReader) Close() error {
	close(r.rdrClosed)
	return r.Error()
}

// Error returns an error if one has been encountered, otherwise
// nil. It is safe to call from any goroutine.
func (r *cfReader) Error() error {
	select {
	case <-r.errNotNil:
		return r.err
	default:
		return nil
	}
}

// Len returns the total number of bytes in the file being read. If
// necessary, it waits for manifest parsing to finish.
func (r *cfReader) Len() uint64 {
	// Wait for all segments to be counted
	<-r.countDone
	return r.totalSize
}

func (r *cfReader) doGet() {
	defer close(r.toRead)
GET:
	for fs := range r.toGet {
		rdr, _, _, err := r.keepClient.Get(fs.Locator)
		if err != nil {
			r.err = err
			close(r.errNotNil)
			return
		}
		var buf = make([]byte, fs.Offset+fs.Len)
		_, err = io.ReadFull(rdr, buf)
		errClosing := rdr.Close()
		if err == nil {
			err = errClosing
		}
		if err != nil {
			r.err = err
			close(r.errNotNil)
			return
		}
		for bOff, bLen := fs.Offset, dataSliceSize; bOff < fs.Offset+fs.Len && bLen > 0; bOff += bLen {
			if bOff+bLen > fs.Offset+fs.Len {
				bLen = fs.Offset + fs.Len - bOff
			}
			select {
			case r.toRead <- buf[bOff : bOff+bLen]:
			case <-r.rdrClosed:
				// Reader is closed: no point sending
				// anything more to toRead.
				break GET
			}
		}
		// It is possible that r.rdrClosed is closed but we
		// never noticed because r.toRead was also ready in
		// every select{} above. Here we check before wasting
		// a keepclient.Get() call.
		select {
		case <-r.rdrClosed:
			break GET
		default:
		}
	}
	// In case we exited the above loop early: before returning,
	// drain the toGet channel so its sender doesn't sit around
	// blocking forever.
	for _ = range r.toGet {
	}
}

func newCFReader(kc *KeepClient) (r *cfReader) {
	r = new(cfReader)
	r.keepClient = kc
	r.rdrClosed = make(chan struct{})
	r.errNotNil = make(chan struct{})
	r.toGet = make(chan *manifest.FileSegment, 2)
	r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize)
	r.countDone = make(chan struct{})
	go r.doGet()
	return
}