Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AgentIdentification protobuf and callback #58

Merged
merged 3 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,68 @@ func TestSetAgentAttributes(t *testing.T) {
assert.NoError(t, err)
}

func TestAgentIdentification(t *testing.T) {
// Start a server.
srv := internal.StartMockServer(t)
newInstanceUid := ulid.MustNew(ulid.Timestamp(time.Now()), ulid.Monotonic(rand.New(rand.NewSource(0)), 0))
var rcvAgentInstanceUid atomic.Value
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
rcvAgentInstanceUid.Store(msg.InstanceUid)
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
AgentIdentification: &protobufs.AgentIdentification{
NewInstanceUid: newInstanceUid.String(),
},
}
}

// Start a client.
settings := StartSettings{}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
settings.AgentDescription = &protobufs.AgentDescription{}
settings.Callbacks = types.CallbacksStruct{
OnAgentIdentificationFunc: func(
ctx context.Context,
agentId *protobufs.AgentIdentification,
) error {
return nil
},
}
client := prepareClient(settings)

oldInstanceUid := settings.InstanceUid
assert.NoError(t, client.Start(settings))

// First, server gets the original instanceId
eventually(t, func() bool {
instanceUid, ok := rcvAgentInstanceUid.Load().(string)
if !ok {
return false
}
return instanceUid == oldInstanceUid
})

// Send a dummy message
client.SetAgentDescription(&protobufs.AgentDescription{})

// When it was sent, the new instance uid should have been used, which should
// have been observed by the server
eventually(t, func() bool {
instanceUid, ok := rcvAgentInstanceUid.Load().(string)
if !ok {
return false
}
return instanceUid == newInstanceUid.String()
})

// Shutdown the server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
}

func TestConnectionSettings(t *testing.T) {
hash := []byte{1, 2, 3}
opampSettings := &protobufs.ConnectionSettings{DestinationEndpoint: "http://opamp.com"}
Expand Down
23 changes: 23 additions & 0 deletions client/internal/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (r *Receiver) processReceivedMessage(ctx context.Context, msg *protobufs.Se

r.rcvConnectionSettings(ctx, msg.ConnectionSettings)
r.rcvAddonsAvailable(msg.AddonsAvailable)
r.rcvAgentIdentification(ctx, msg.AgentIdentification)

if reportStatus {
r.sender.ScheduleSend()
Expand Down Expand Up @@ -175,6 +176,28 @@ func (r *Receiver) rcvAddonsAvailable(addons *protobufs.AddonsAvailable) {
// TODO: implement this.
}

func (r *Receiver) rcvAgentIdentification(ctx context.Context, agentId *protobufs.AgentIdentification) {
if agentId == nil {
return
}

if agentId.NewInstanceUid == "" {
r.logger.Debugf("Empty instance uid is not allowed")
return
}

err := r.callbacks.OnAgentIdentification(ctx, agentId)
pmm-sumo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
r.logger.Errorf("Error while updating agent identification: %v", err)
return
}

err = r.sender.SetInstanceUid(agentId.NewInstanceUid)
if err != nil {
r.logger.Errorf("Error while setting instance uid: %v, err")
}
}

func (r *Receiver) rcvCommand(command *protobufs.ServerToAgentCommand) {
if command != nil {
r.callbacks.OnCommand(command)
Expand Down
18 changes: 15 additions & 3 deletions client/internal/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package internal

import (
"context"
"fmt"
"sync"
"sync/atomic"

"github.com/gorilla/websocket"
"google.golang.org/protobuf/proto"
Expand All @@ -13,7 +15,7 @@ import (

// Sender implements the client's sending portion of OpAMP protocol.
type Sender struct {
instanceUid string
instanceUid atomic.Value
conn *websocket.Conn

logger types.Logger
Expand Down Expand Up @@ -43,7 +45,7 @@ func NewSender(logger types.Logger) *Sender {
// earlier. To stop the Sender cancel the ctx.
func (s *Sender) Start(ctx context.Context, instanceUid string, conn *websocket.Conn) error {
s.conn = conn
s.instanceUid = instanceUid
s.instanceUid.Store(instanceUid)
err := s.sendNextMessage()

// Run the sender in the background.
Expand All @@ -53,6 +55,16 @@ func (s *Sender) Start(ctx context.Context, instanceUid string, conn *websocket.
return err
}

// SetInstanceUid sets a new instanceUid without closing and reopening the connection. It will be used
// when next message is being sent.
func (s *Sender) SetInstanceUid(instanceUid string) error {
if instanceUid == "" {
return fmt.Errorf("cannot set instance uid to empty value")
}
s.instanceUid.Store(instanceUid)
return nil
}

// WaitToStop blocks until the sender is stopped. To stop the sender cancel the context
// that was passed to Start().
func (s *Sender) WaitToStop() {
Expand Down Expand Up @@ -125,7 +137,7 @@ func (s *Sender) sendNextMessage() error {
if msgToSend != nil && !proto.Equal(msgToSend, &protobufs.AgentToServer{}) {
// There is a pending message and the message has some fields populated.
// Set the InstanceUid field and send it.
msgToSend.InstanceUid = s.instanceUid
msgToSend.InstanceUid = s.instanceUid.Load().(string)
return s.sendMessage(msgToSend)
}
return nil
Expand Down
16 changes: 16 additions & 0 deletions client/types/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ type Callbacks interface {
// syncer can be used to initiate syncing the package from the server.
OnAgentPackageAvailable(addons *protobufs.AgentPackageAvailable, syncer AgentPackageSyncer) error

// OnAgentIdentification is called when the server requests changing identification of the agent.
// Agent should be updated with new id and use it for all further communication.
// If Agent does not support the identification override from the server, the function should return an error.
OnAgentIdentification(ctx context.Context, agentId *protobufs.AgentIdentification) error

// OnCommand is called when the server requests that the connected agent perform a command.
OnCommand(command *protobufs.ServerToAgentCommand) error

Expand Down Expand Up @@ -169,6 +174,7 @@ type CallbacksStruct struct {

OnAddonsAvailableFunc func(ctx context.Context, addons *protobufs.AddonsAvailable, syncer AddonSyncer) error
OnAgentPackageAvailableFunc func(addons *protobufs.AgentPackageAvailable, syncer AgentPackageSyncer) error
OnAgentIdentificationFunc func(ctx context.Context, agentId *protobufs.AgentIdentification) error

OnCommandFunc func(command *protobufs.ServerToAgentCommand) error
}
Expand Down Expand Up @@ -263,3 +269,13 @@ func (c CallbacksStruct) OnCommand(command *protobufs.ServerToAgentCommand) erro
}
return nil
}

func (c CallbacksStruct) OnAgentIdentification(
ctx context.Context,
agentId *protobufs.AgentIdentification,
) error {
if c.OnAgentIdentificationFunc != nil {
return c.OnAgentIdentificationFunc(ctx, agentId)
}
return nil
}
27 changes: 26 additions & 1 deletion internal/examples/agent/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/oklog/ulid/v2"
"github.com/open-telemetry/opamp-go/client/types"

"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
)

Expand Down Expand Up @@ -99,6 +99,7 @@ func (agent *Agent) start() error {
},
OnRemoteConfigFunc: agent.onRemoteConfig,
OnOwnTelemetryConnectionSettingsFunc: agent.onOwnTelemetryConnectionSettings,
OnAgentIdentificationFunc: agent.onAgentIdentificationFunc,
},
LastRemoteConfigHash: agent.remoteConfigHash,
LastEffectiveConfig: agent.composeEffectiveConfig(),
Expand Down Expand Up @@ -160,6 +161,18 @@ func (agent *Agent) createAgentIdentity() {
}
}

func (agent *Agent) updateAgentIdentity(instanceId ulid.ULID) {
agent.logger.Debugf("Agent identify is being changed from id=%v to id=%v",
agent.instanceId.String(),
instanceId.String())
agent.instanceId = instanceId

if agent.metricReporter != nil {
// TODO: reinit or update meter (possibly using a single function to update all own connection settings
// or with having a common resource factory or so)
}
}

func (agent *Agent) loadLocalConfig() {
var k = koanf.New(".")
k.Load(rawbytes.Provider([]byte(localConfig)), yaml.Parser())
Expand Down Expand Up @@ -210,6 +223,18 @@ func (agent *Agent) onOwnTelemetryConnectionSettings(
return nil
}

func (agent *Agent) onAgentIdentificationFunc(
_ context.Context,
agentId *protobufs.AgentIdentification,
) error {
newInstanceId, err := ulid.Parse(agentId.NewInstanceUid)
if err != nil {
return err
}
agent.updateAgentIdentity(newInstanceId)
return nil
}

func (agent *Agent) initMeter(settings *protobufs.ConnectionSettings) {
reporter, err := NewMetricReporter(agent.logger, settings, agent.agentType, agent.agentVersion, agent.instanceId)
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions internal/proto/opamp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ message AgentToServer {
// AgentDisconnect MUST be set in the last AgentToServer message sent from the
// agent to the server.
AgentDisconnect agent_disconnect = 5;

enum AgentToServerFlags {
FlagsUnspecified = 0;
// AgentToServerFlags is a bit mask. Values below define individual bits.

// The agent requests server go generate a new instance_uid, which will
// be sent back in ServerToAgent message
RequestInstanceUid = 0x00000001;
}
// Bit flags as defined by AgentToServerFlags bit masks.
AgentToServerFlags flags = 6;
}

// AgentDisconnect is the last message sent from the agent to the server. The server
Expand Down Expand Up @@ -112,6 +123,10 @@ message ServerToAgent {
// UnspecifiedServerCapability value.
ServerCapabilities capabilities = 8;

// Properties related to identification of the agent, which can be overridden
// by the server if needed.
AgentIdentification agent_identification = 9;

// Allows the Server to instruct the agent to perform a command, e.g. RESTART. This field should not be specified
// with fields other than instance_uid and capabilities. If specified, other fields will be ignored and the command
// will be performed.
Expand Down Expand Up @@ -681,6 +696,14 @@ message AgentInstallStatus {
string error_message = 4;
}

// Properties related to identification of the agent, which can be overridden
// by the server if needed
message AgentIdentification {
// When new_instance_uid is set, Agent MUST update instance_uid
// to the value provided and use it for all further communication.
string new_instance_uid = 1;
}

/////////////////////////////////////////////////////////////////////////////////////
// Config messages
/////////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading