// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package crunchrun

import (
	"bytes"
	"encoding/json"
	"io"
	"log"
	"time"
)

// rfc3339NanoFixed is a fixed-width version of time.RFC3339Nano.
const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

// prefixer wraps an io.Writer, inserting a string returned by
// prefixFunc at the beginning of each line.
type prefixer struct {
	writer     io.Writer
	prefixFunc func() string
	unfinished bool // true if the most recent write ended with a non-newline char
}

// newTimestamper wraps an io.Writer, inserting an RFC3339NanoFixed
// timestamp at the beginning of each line.
func newTimestamper(w io.Writer) *prefixer {
	return &prefixer{
		writer:     w,
		prefixFunc: func() string { return time.Now().UTC().Format(rfc3339NanoFixed + " ") },
	}
}

// newStringPrefixer wraps an io.Writer, inserting the given string at
// the beginning of each line. The given string should include a
// trailing space for readability.
func newStringPrefixer(w io.Writer, s string) *prefixer {
	return &prefixer{
		writer:     w,
		prefixFunc: func() string { return s },
	}
}

func (tp *prefixer) Write(p []byte) (n int, err error) {
	for len(p) > 0 && err == nil {
		if !tp.unfinished {
			_, err = io.WriteString(tp.writer, tp.prefixFunc())
			if err != nil {
				return
			}
		}
		newline := bytes.IndexRune(p, '\n')
		var nn int
		if newline < 0 {
			tp.unfinished = true
			nn, err = tp.writer.Write(p)
			p = nil
		} else {
			tp.unfinished = false
			nn, err = tp.writer.Write(p[:newline+1])
			p = p[nn:]
		}
		n += nn
	}
	return
}

// logWriter adds log.Logger methods to an io.Writer.
type logWriter struct {
	io.Writer
	*log.Logger
}

func newLogWriter(w io.Writer) *logWriter {
	return &logWriter{
		Writer: w,
		Logger: log.New(w, "", 0),
	}
}

var crunchLogUpdatePeriod = time.Hour / 2
var crunchLogUpdateSize = int64(1 << 25)

// load the rate limit discovery config parameters
func loadLogThrottleParams(clnt IArvadosClient) {
	loadDuration := func(dst *time.Duration, key string) {
		if param, err := clnt.Discovery(key); err != nil {
			return
		} else if d, ok := param.(float64); !ok {
			return
		} else {
			*dst = time.Duration(d) * time.Second
		}
	}
	loadInt64 := func(dst *int64, key string) {
		if param, err := clnt.Discovery(key); err != nil {
			return
		} else if val, ok := param.(float64); !ok {
			return
		} else {
			*dst = int64(val)
		}
	}

	loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
	loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")

}

type filterKeepstoreErrorsOnly struct {
	io.WriteCloser
	buf []byte
}

func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
	log.Printf("filterKeepstoreErrorsOnly: write %q", p)
	f.buf = append(f.buf, p...)
	start := 0
	for i := len(f.buf) - len(p); i < len(f.buf); i++ {
		if f.buf[i] == '\n' {
			if f.check(f.buf[start:i]) {
				_, err := f.WriteCloser.Write(f.buf[start : i+1])
				if err != nil {
					return 0, err
				}
			}
			start = i + 1
		}
	}
	if start > 0 {
		copy(f.buf, f.buf[start:])
		f.buf = f.buf[:len(f.buf)-start]
	}
	return len(p), nil
}

func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
	if len(line) == 0 {
		return false
	}
	if line[0] != '{' {
		return true
	}
	var m map[string]interface{}
	err := json.Unmarshal(line, &m)
	if err != nil {
		return true
	}
	if m["msg"] == "request" {
		return false
	}
	if m["msg"] == "response" {
		if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
			return false
		}
	}
	return true
}