// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package ws

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net/url"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/arvadostest"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"golang.org/x/net/websocket"
	check "gopkg.in/check.v1"
)

func init() {
	if os.Getenv("ARVADOS_DEBUG") != "" {
		ctxlog.SetLevel("debug")
	}
}

var _ = check.Suite(&v0Suite{})

type v0Suite struct {
	serviceSuite serviceSuite
	token        string
	toDelete     []string
	wg           sync.WaitGroup
	ignoreLogID  int64
}

func (s *v0Suite) SetUpTest(c *check.C) {
	s.serviceSuite.SetUpTest(c)
	s.serviceSuite.start(c)

	s.token = arvadostest.ActiveToken
	s.ignoreLogID = s.lastLogID(c)
}

func (s *v0Suite) TearDownTest(c *check.C) {
	s.wg.Wait()
	s.serviceSuite.TearDownTest(c)
}

func (s *v0Suite) TearDownSuite(c *check.C) {
	s.deleteTestObjects(c)
}

func (s *v0Suite) deleteTestObjects(c *check.C) {
	ac := arvados.NewClientFromEnv()
	ac.AuthToken = arvadostest.AdminToken
	for _, path := range s.toDelete {
		err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
		c.Check(err, check.IsNil)
	}
	s.toDelete = nil
}

func (s *v0Suite) TestFilters(c *check.C) {
	conn, r, w, err := s.testClient()
	c.Assert(err, check.IsNil)
	defer conn.Close()

	cmd := func(method, eventType string, status int) {
		c.Check(w.Encode(map[string]interface{}{
			"method":  method,
			"filters": [][]interface{}{{"event_type", "in", []string{eventType}}},
		}), check.IsNil)
		s.expectStatus(c, r, status)
	}
	cmd("subscribe", "update", 200)
	cmd("subscribe", "update", 200)
	cmd("subscribe", "create", 200)
	cmd("subscribe", "update", 200)
	cmd("unsubscribe", "blip", 400)
	cmd("unsubscribe", "create", 200)
	cmd("unsubscribe", "update", 200)

	go s.emitEvents(c, nil, nil)
	lg := s.expectLog(c, r)
	c.Check(lg.EventType, check.Equals, "update")

	cmd("unsubscribe", "update", 200)
	cmd("unsubscribe", "update", 200)
	cmd("unsubscribe", "update", 400)
}

func (s *v0Suite) TestLastLogID(c *check.C) {
	lastID := s.lastLogID(c)

	checkLogs := func(r *json.Decoder, uuid string) {
		for _, etype := range []string{"create", "blip", "update"} {
			lg := s.expectLog(c, r)
			for lg.ObjectUUID != uuid {
				lg = s.expectLog(c, r)
			}
			c.Check(lg.EventType, check.Equals, etype)
		}
	}

	// Connecting connEarly (before sending the early events) lets
	// us confirm all of the "early" events have already passed
	// through the server.
	connEarly, rEarly, wEarly, err := s.testClient()
	c.Assert(err, check.IsNil)
	defer connEarly.Close()
	c.Check(wEarly.Encode(map[string]interface{}{
		"method": "subscribe",
	}), check.IsNil)
	s.expectStatus(c, rEarly, 200)

	// Send the early events.
	uuidChan := make(chan string, 1)
	s.emitEvents(c, uuidChan, nil)
	uuidEarly := <-uuidChan

	// Wait for the early events to pass through.
	checkLogs(rEarly, uuidEarly)

	// Connect the client that wants to get old events via
	// last_log_id.
	conn, r, w, err := s.testClient()
	c.Assert(err, check.IsNil)
	defer conn.Close()

	c.Check(w.Encode(map[string]interface{}{
		"method":      "subscribe",
		"last_log_id": lastID,
	}), check.IsNil)
	s.expectStatus(c, r, 200)

	checkLogs(r, uuidEarly)
	s.emitEvents(c, uuidChan, nil)
	checkLogs(r, <-uuidChan)
}

func (s *v0Suite) TestPermission(c *check.C) {
	conn, r, w, err := s.testClient()
	c.Assert(err, check.IsNil)
	defer conn.Close()

	c.Check(w.Encode(map[string]interface{}{
		"method": "subscribe",
	}), check.IsNil)
	s.expectStatus(c, r, 200)

	uuidChan := make(chan string, 2)
	go func() {
		s.token = arvadostest.AdminToken
		s.emitEvents(c, uuidChan, nil)
		s.token = arvadostest.ActiveToken
		s.emitEvents(c, uuidChan, nil)
	}()

	wrongUUID := <-uuidChan
	rightUUID := <-uuidChan
	lg := s.expectLog(c, r)
	for lg.ObjectUUID != rightUUID {
		c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
		lg = s.expectLog(c, r)
	}
}

// Two users create private objects; admin deletes both objects; each
// user receives a "delete" event for their own object (not for the
// other user's object).
func (s *v0Suite) TestEventTypeDelete(c *check.C) {
	clients := []struct {
		token string
		uuid  string
		conn  *websocket.Conn
		r     *json.Decoder
		w     *json.Encoder
	}{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}}
	for i := range clients {
		uuidChan := make(chan string, 1)
		s.token = clients[i].token
		s.emitEvents(c, uuidChan, nil)
		clients[i].uuid = <-uuidChan

		var err error
		clients[i].conn, clients[i].r, clients[i].w, err = s.testClient()
		c.Assert(err, check.IsNil)

		c.Check(clients[i].w.Encode(map[string]interface{}{
			"method": "subscribe",
		}), check.IsNil)
		s.expectStatus(c, clients[i].r, 200)
	}

	s.ignoreLogID = s.lastLogID(c)
	s.deleteTestObjects(c)

	for _, client := range clients {
		lg := s.expectLog(c, client.r)
		c.Check(lg.ObjectUUID, check.Equals, client.uuid)
		c.Check(lg.EventType, check.Equals, "delete")
	}
}

// Trashing/deleting a collection produces an "update" event with
// properties["new_attributes"]["is_trashed"] == true.
func (s *v0Suite) TestTrashedCollection(c *check.C) {
	ac := arvados.NewClientFromEnv()
	ac.AuthToken = s.token

	var coll arvados.Collection
	err := ac.RequestAndDecode(&coll, "POST", "arvados/v1/collections", s.jsonBody("collection", `{"manifest_text":""}`), map[string]interface{}{"ensure_unique_name": true})
	c.Assert(err, check.IsNil)
	s.ignoreLogID = s.lastLogID(c)

	conn, r, w, err := s.testClient()
	c.Assert(err, check.IsNil)
	defer conn.Close()

	c.Check(w.Encode(map[string]interface{}{
		"method": "subscribe",
	}), check.IsNil)
	s.expectStatus(c, r, 200)

	err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
	c.Assert(err, check.IsNil)

	lg := s.expectLog(c, r)
	c.Check(lg.ObjectUUID, check.Equals, coll.UUID)
	c.Check(lg.EventType, check.Equals, "update")
	c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false)
	c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true)
}

func (s *v0Suite) TestSendBadJSON(c *check.C) {
	conn, r, w, err := s.testClient()
	c.Assert(err, check.IsNil)
	defer conn.Close()

	c.Check(w.Encode(map[string]interface{}{
		"method": "subscribe",
	}), check.IsNil)
	s.expectStatus(c, r, 200)

	_, err = fmt.Fprint(conn, "^]beep\n")
	c.Check(err, check.IsNil)
	s.expectStatus(c, r, 400)

	c.Check(w.Encode(map[string]interface{}{
		"method": "subscribe",
	}), check.IsNil)
	s.expectStatus(c, r, 200)
}

func (s *v0Suite) TestSubscribe(c *check.C) {
	conn, r, w, err := s.testClient()
	c.Assert(err, check.IsNil)
	defer conn.Close()

	s.emitEvents(c, nil, nil)

	err = w.Encode(map[string]interface{}{"21": 12})
	c.Check(err, check.IsNil)
	s.expectStatus(c, r, 400)

	err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
	c.Check(err, check.IsNil)
	s.expectStatus(c, r, 200)

	uuidChan := make(chan string, 1)
	go s.emitEvents(c, uuidChan, nil)
	uuid := <-uuidChan

	for _, etype := range []string{"create", "blip", "update"} {
		lg := s.expectLog(c, r)
		for lg.ObjectUUID != uuid {
			lg = s.expectLog(c, r)
		}
		c.Check(lg.EventType, check.Equals, etype)
	}
}

func (s *v0Suite) TestManyEventsAndSubscribers(c *check.C) {
	// Frequent slow listener pings create the conditions for a
	// deadlock issue with the lib/pq example listener usage.
	//
	// Specifically: a lib/pq/example/listen-style event loop can
	// deadlock if enough (~32) server notifications arrive after
	// the event loop decides to call Ping (e.g., while
	// listener.Ping() is waiting for a response from the server,
	// or in the time.Sleep() invoked by testSlowPing).
	//
	// (*ListenerConn)listenerConnLoop() doesn't see the server's
	// ping response until it finishes sending a previous
	// notification through its internal queue to
	// (*Listener)listenerConnLoop(), which is blocked on sending
	// to our Notify channel, which is blocked on waiting for the
	// Ping response.
	defer func(d time.Duration) {
		listenerPingInterval = d
		testSlowPing = false
	}(listenerPingInterval)
	listenerPingInterval = time.Second / 2
	testSlowPing = true
	// Restart the test server in order to get one that uses our
	// test globals.
	s.TearDownTest(c)
	s.SetUpTest(c)

	done := make(chan struct{})
	defer close(done)
	go s.emitEvents(c, nil, done)

	// We will expect to receive at least one event during each
	// one-second interval while the test is running.
	t0 := time.Now()
	seconds := 10
	receivedPerSecond := make([]int64, seconds)

	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(seconds)*time.Second))
	defer cancel()
	for clientID := 0; clientID < 100; clientID++ {
		clientID := clientID
		go func() {
			for ctx.Err() == nil {
				conn, r, w, err := s.testClient()
				if ctx.Err() != nil {
					return
				}
				c.Assert(err, check.IsNil)
				defer conn.Close()
				err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
				if ctx.Err() != nil {
					return
				}
				c.Check(err, check.IsNil)
				s.expectStatus(c, r, 200)
				for {
					if clientID%10 == 0 {
						// slow client
						time.Sleep(time.Second / 20)
					} else if rand.Float64() < 0.01 {
						// disconnect+reconnect
						break
					}
					var lg arvados.Log
					err := r.Decode(&lg)
					if ctx.Err() != nil {
						return
					}
					if errors.Is(err, io.EOF) {
						break
					}
					c.Check(err, check.IsNil)
					if i := int(time.Since(t0) / time.Second); i < seconds {
						atomic.AddInt64(&receivedPerSecond[i], 1)
					}
				}
				conn.Close()
			}
		}()
	}
	<-ctx.Done()
	c.Log("done")
	for i, n := range receivedPerSecond {
		c.Logf("t<%d n=%d", i+1, n)
		c.Check(int64(n), check.Not(check.Equals), int64(0))
	}
}

// Generate some events by creating and updating a workflow object,
// and creating a custom log entry (event_type="blip") about the newly
// created workflow.
//
// If uuidChan is not nil, send the new workflow UUID to uuidChan as
// soon as it's known.
//
// If done is not nil, keep generating events until done receives or
// closes.
func (s *v0Suite) emitEvents(c *check.C, uuidChan chan<- string, done <-chan struct{}) {
	s.wg.Add(1)
	defer s.wg.Done()

	ac := arvados.NewClientFromEnv()
	ac.AuthToken = s.token
	wf := &arvados.Workflow{
		Name: "ws_test",
	}
	err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", `{"name":"ws_test"}`), map[string]interface{}{"ensure_unique_name": true})
	c.Assert(err, check.IsNil)
	s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID)
	if uuidChan != nil {
		uuidChan <- wf.UUID
	}
	for i := 0; ; i++ {
		lg := &arvados.Log{}
		err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
			"object_uuid": wf.UUID,
			"event_type":  "blip",
			"properties": map[string]interface{}{
				"beep": "boop",
			},
		}), nil)
		s.toDelete = append(s.toDelete, "arvados/v1/logs/"+lg.UUID)
		if done != nil {
			select {
			case <-done:
			default:
				if i%50 == 0 {
					time.Sleep(100 * time.Millisecond)
				}
				continue
			}
		}
		break
	}
	if err != nil {
		panic(err)
	}
	err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", `{"name":"ws_test"}`), nil)
	if err != nil {
		panic(err)
	}
}

func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
	val, ok := ob.(string)
	if !ok {
		j, err := json.Marshal(ob)
		if err != nil {
			panic(err)
		}
		val = string(j)
	}
	v := url.Values{}
	v[rscName] = []string{val}
	return bytes.NewBufferString(v.Encode())
}

func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
	msg := map[string]interface{}{}
	c.Check(r.Decode(&msg), check.IsNil)
	c.Check(int(msg["status"].(float64)), check.Equals, status)
}

func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
	lg := &arvados.Log{}
	ok := make(chan struct{})
	go func() {
		defer close(ok)
		for lg.ID <= s.ignoreLogID {
			c.Assert(r.Decode(lg), check.IsNil)
		}
	}()
	select {
	case <-time.After(10 * time.Second):
		c.Error("timed out")
		c.FailNow()
		return lg
	case <-ok:
		return lg
	}
}

func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder, error) {
	srv := s.serviceSuite.srv
	conn, err := websocket.Dial(strings.Replace(srv.URL, "http", "ws", 1)+"/websocket?api_token="+s.token, "", srv.URL)
	if err != nil {
		return nil, nil, nil, err
	}
	w := json.NewEncoder(conn)
	r := json.NewDecoder(conn)
	return conn, r, w, nil
}

func (s *v0Suite) lastLogID(c *check.C) int64 {
	var lastID int64
	c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
	return lastID
}