package kapacitor

import (
	"fmt"

	"github.com/influxdata/kapacitor/edge"
	"github.com/influxdata/kapacitor/keyvalue"
	"github.com/influxdata/kapacitor/models"
	"github.com/influxdata/kapacitor/pipeline"
)

// ChangeDetectNode is the runtime counterpart of pipeline.ChangeDetectNode.
// It forwards a point only when the value of the configured field differs
// from the value carried by the last point it forwarded.
type ChangeDetectNode struct {
	node
	// d is the pipeline definition for this node; d.Field names the field
	// whose value is watched for changes.
	d *pipeline.ChangeDetectNode
}

// newChangeDetectNode builds the runtime ChangeDetectNode for the pipeline
// definition n, bound to the executing task et and reporting through d.
// The returned error is always nil.
func newChangeDetectNode(et *ExecutingTask, n *pipeline.ChangeDetectNode, d NodeDiagnostic) (*ChangeDetectNode, error) {
	cdn := new(ChangeDetectNode)
	cdn.d = n
	cdn.node = node{Node: n, et: et, diag: d}
	// Install runChangeDetect as the run function of the embedded node.
	cdn.node.runF = cdn.runChangeDetect
	return cdn, nil
}

// runChangeDetect is the node's run function. It consumes the first input
// edge with a grouped consumer that uses n as its receiver, publishes the
// consumer's cardinality variable in the node's stat map, and returns
// whatever error consuming the edge produces. The byte slice argument is
// ignored.
func (n *ChangeDetectNode) runChangeDetect([]byte) error {
	grouped := edge.NewGroupedConsumer(n.ins[0], n)
	n.statMap.Set(statCardinalityGauge, grouped.CardinalityVar())
	return grouped.Consume()
}

// NewGroup returns the receiver for a newly seen group. A fresh
// changeDetectGroup is created on every call, wrapped with the node's timer,
// and connected to the node's output edges. The group and first arguments
// are not used. The returned error is always nil.
func (n *ChangeDetectNode) NewGroup(group edge.GroupInfo, first edge.PointMeta) (edge.Receiver, error) {
	timed := edge.NewTimedForwardReceiver(n.timer, n.newGroup())
	recv := edge.NewReceiverFromForwardReceiverWithStats(n.outs, timed)
	return recv, nil
}

// newGroup allocates a changeDetectGroup that refers back to n and has no
// previous point recorded yet.
func (n *ChangeDetectNode) newGroup() *changeDetectGroup {
	grp := new(changeDetectGroup)
	grp.n = n
	return grp
}

// changeDetectGroup holds the change-detection state for a single group.
type changeDetectGroup struct {
	// n is the owning node; it supplies the field name and diagnostics.
	n *ChangeDetectNode
	// previous is the last point that was emitted, or nil if none has been
	// emitted yet (it is also reset to nil at the start of every batch).
	previous edge.FieldsTagsTimeGetter
}

// BeginBatch starts a new batch. The remembered previous point is cleared,
// so change detection starts over within each batch. If the incoming message
// carries a positive size hint, a shallow copy with the hint set to zero is
// forwarded instead of the original; otherwise the message is forwarded
// unchanged.
// NOTE(review): the hint is presumably zeroed because points may be dropped,
// making the upstream hint inaccurate for downstream nodes — confirm.
func (g *changeDetectGroup) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
	g.previous = nil
	if begin.SizeHint() <= 0 {
		return begin, nil
	}
	// Copy before modifying so the caller's message is left untouched.
	resized := begin.ShallowCopy()
	resized.SetSizeHint(0)
	return resized, nil
}

// BatchPoint forwards bp when doChangeDetect reports a change for it.
// Otherwise it returns a nil message so the point is dropped.
func (g *changeDetectGroup) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
	if !g.doChangeDetect(bp) {
		return nil, nil
	}
	return bp, nil
}

// EndBatch forwards the end-of-batch message unchanged.
func (g *changeDetectGroup) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
	return end, nil
}

// Point forwards p when doChangeDetect reports a change for it.
// Otherwise it returns a nil message so the point is dropped.
func (g *changeDetectGroup) Point(p edge.PointMessage) (edge.Message, error) {
	if !g.doChangeDetect(p) {
		return nil, nil
	}
	return p, nil
}

// doChangeDetect reports whether p should be emitted, judged against the
// last emitted point held in g.previous. When it returns true, p becomes the
// new g.previous; when it returns false, g.previous is left as it was, so
// later points keep being compared against the last emitted one.
func (g *changeDetectGroup) doChangeDetect(p edge.FieldsTagsTimeGetter) bool {
	// last stays nil until a point has been emitted.
	var last models.Fields
	if g.previous != nil {
		last = g.previous.Fields()
	}
	if changed := g.n.changeDetect(last, p.Fields()); changed {
		g.previous = p
		return true
	}
	return false
}

// Barrier forwards the barrier message unchanged.
func (g *changeDetectGroup) Barrier(b edge.BarrierMessage) (edge.Message, error) {
	return b, nil
}
// DeleteGroup forwards the delete-group message unchanged.
func (g *changeDetectGroup) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
	return d, nil
}
// Done does nothing.
func (g *changeDetectGroup) Done() {}

// changeDetect reports whether the watched field (n.d.Field) has a different
// value in curr than in prev.
//
// If curr does not contain the field, an error is reported through the
// node's diagnostics and false is returned. A nil prev is allowed: the
// lookup then yields nil, so any non-nil current value counts as a change.
func (n *ChangeDetectNode) changeDetect(prev, curr models.Fields) bool {
	field := n.d.Field
	current, found := curr[field]
	if found {
		return prev[field] != current
	}
	n.diag.Error("Invalid field in change detect",
		fmt.Errorf("expected field %s not found", field),
		keyvalue.KV("field", field))
	return false
}