Skip to content

Commit

Permalink
Unify addons and agent packages
Browse files Browse the repository at this point in the history
Applies spec change open-telemetry/opamp-spec#77
  • Loading branch information
tigrannajaryan committed May 3, 2022
1 parent b0398a1 commit 1882ca8
Show file tree
Hide file tree
Showing 10 changed files with 927 additions and 1,256 deletions.
6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ type StartSettings struct {

LastConnectionSettingsHash []byte

// The hash of the last locally-saved server-provided addons. If nil is passed
// it will force the server to send addons list back.
LastServerProvidedAllAddonsHash []byte
// The hash of the last locally-saved server-provided packages. If nil is passed
// it will force the server to send packages list back.
LastServerProvidedAllPackagesHash []byte
}

type OpAMPClient interface {
Expand Down
6 changes: 3 additions & 3 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ func (w *httpClient) Start(settings StartSettings) error {
msg.StatusReport = &protobufs.StatusReport{}
}
msg.StatusReport.AgentDescription = w.settings.AgentDescription
if msg.AddonStatuses == nil {
msg.AddonStatuses = &protobufs.AgentAddonStatuses{}
if msg.PackageStatuses == nil {
msg.PackageStatuses = &protobufs.PackageStatuses{}
}
msg.AddonStatuses.ServerProvidedAllAddonsHash = w.settings.LastServerProvidedAllAddonsHash
msg.PackageStatuses.ServerProvidedAllPackagesHash = w.settings.LastServerProvidedAllPackagesHash
},
)
w.looper.ScheduleSend()
Expand Down
4 changes: 2 additions & 2 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
reportStatus := r.rcvRemoteConfig(ctx, msg.RemoteConfig, msg.Flags)

r.rcvConnectionSettings(ctx, msg.ConnectionSettings)
r.rcvAddonsAvailable(msg.AddonsAvailable)
r.rcvPackagesAvailable(msg.PackagesAvailable)
r.rcvAgentIdentification(ctx, msg.AgentIdentification)

if reportStatus {
Expand Down Expand Up @@ -154,7 +154,7 @@ func (r *receivedProcessor) processErrorResponse(body *protobufs.ServerErrorResp
r.logger.Errorf("received an error from server: %s", body.ErrorMessage)
}

func (r *receivedProcessor) rcvAddonsAvailable(addons *protobufs.AddonsAvailable) {
func (r *receivedProcessor) rcvPackagesAvailable(packages *protobufs.PackagesAvailable) {
// TODO: implement this.
}

Expand Down
56 changes: 0 additions & 56 deletions client/types/addonssyncer.go

This file was deleted.

37 changes: 0 additions & 37 deletions client/types/agentpackagesyncer.go

This file was deleted.

37 changes: 11 additions & 26 deletions client/types/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,11 @@ type Callbacks interface {
certificate *protobufs.ConnectionSettings,
) error

// OnAddonsAvailable is called when the server has addons available which are
// OnPackagesAvailable is called when the server has packages available which are
// different from what the agent indicated it has via
// LastServerProvidedAllAddonsHash.
// syncer can be used to initiate syncing the addons from the server.
OnAddonsAvailable(ctx context.Context, addons *protobufs.AddonsAvailable, syncer AddonSyncer) error

// OnAgentPackageAvailable is called when the server has an agent package available
// for the agent.
// syncer can be used to initiate syncing the package from the server.
OnAgentPackageAvailable(addons *protobufs.AgentPackageAvailable, syncer AgentPackageSyncer) error
// LastServerProvidedAllPackagesHash.
// syncer can be used to initiate syncing the packages from the server.
OnPackagesAvailable(ctx context.Context, packages *protobufs.PackagesAvailable, syncer PackagesSyncer) error

// OnAgentIdentification is called when the server requests changing identification of the agent.
// Agent should be updated with new id and use it for all further communication.
Expand Down Expand Up @@ -172,9 +167,8 @@ type CallbacksStruct struct {
settings *protobufs.ConnectionSettings,
) error

OnAddonsAvailableFunc func(ctx context.Context, addons *protobufs.AddonsAvailable, syncer AddonSyncer) error
OnAgentPackageAvailableFunc func(addons *protobufs.AgentPackageAvailable, syncer AgentPackageSyncer) error
OnAgentIdentificationFunc func(ctx context.Context, agentId *protobufs.AgentIdentification) error
OnPackagesAvailableFunc func(ctx context.Context, packages *protobufs.PackagesAvailable, syncer PackagesSyncer) error
OnAgentIdentificationFunc func(ctx context.Context, agentId *protobufs.AgentIdentification) error

OnCommandFunc func(command *protobufs.ServerToAgentCommand) error
}
Expand Down Expand Up @@ -243,22 +237,13 @@ func (c CallbacksStruct) OnOtherConnectionSettings(
return nil
}

func (c CallbacksStruct) OnAddonsAvailable(
func (c CallbacksStruct) OnPackagesAvailable(
ctx context.Context,
addons *protobufs.AddonsAvailable,
syncer AddonSyncer,
) error {
if c.OnAddonsAvailableFunc != nil {
return c.OnAddonsAvailableFunc(ctx, addons, syncer)
}
return nil
}

func (c CallbacksStruct) OnAgentPackageAvailable(
addons *protobufs.AgentPackageAvailable, syncer AgentPackageSyncer,
packages *protobufs.PackagesAvailable,
syncer PackagesSyncer,
) error {
if c.OnAgentPackageAvailableFunc != nil {
return c.OnAgentPackageAvailableFunc(addons, syncer)
if c.OnPackagesAvailableFunc != nil {
return c.OnPackagesAvailableFunc(ctx, packages, syncer)
}
return nil
}
Expand Down
56 changes: 56 additions & 0 deletions client/types/packagessyncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package types

import (
"context"
"io"
)

// PackagesSyncer can be used by the agent to initiate syncing a package from the server.
// The PackagesSyncer instance knows the right context: the particular OpAMPClient and
// the particular PackageAvailable message the OnPackageAvailable callback was called for.
type PackagesSyncer interface {
// Sync the available package from the server to the agent.
// The agent must supply an PackagesStateProvider to let the Sync function
// know what is available locally, what data needs to be sync and how the
// data can be stored locally.
Sync(ctx context.Context, localState PackagesStateProvider) error
}

type PackagesStateProvider interface {
AllPackagesHash() ([]byte, error)

// Packages returns the names of all packages that exist in the agent's local storage.
Packages() ([]string, error)

// PackageHash returns the hash of a local package. packageName is one of the names
// that were returned by Packages().
PackageHash(packageName string) ([]byte, error)

// CreatePackage creates the package locally. If the package existed must return an error.
// If the package did not exist its hash should be set to nil.
CreatePackage(packageName string) error

// FileContentHash returns the content hash of the package file that exists locally.
FileContentHash(packageName string) ([]byte, error)

// UpdateContent must create or update the package content file. The entire content
// of the file must be replaced by the data. The data must be read until
// it returns an EOF. If reading from data fails UpdateContent must abort and return
// an error.
// Content hash must be updated if the data is updated without failure.
// The function must cancel and return an error if the context is cancelled.
UpdateContent(ctx context.Context, packageName string, data io.Reader, contentHash []byte) error

// SetPackageHash must remember the hash for the specified package. Must be returned
// later when PackageHash is called. SetPackageHash is called after all UpsertFile
// and DeleteFile calls complete successfully.
SetPackageHash(packageName string, hash []byte) error

// DeletePackage deletes the package from the agent's local storage.
DeletePackage(packageName string) error

// SetAllPackagesHash must remember the AllPackagesHash. Must be returned
// later when AllPackagesHash is called. SetAllPackagesHash is called after all
// package updates complete successfully.
SetAllPackagesHash(hash []byte) error
}
6 changes: 3 additions & 3 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,10 @@ func (w *wsClient) runOneCycle(ctx context.Context) {

w.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
if msg.AddonStatuses == nil {
msg.AddonStatuses = &protobufs.AgentAddonStatuses{}
if msg.PackageStatuses == nil {
msg.PackageStatuses = &protobufs.PackageStatuses{}
}
msg.AddonStatuses.ServerProvidedAllAddonsHash = w.settings.LastServerProvidedAllAddonsHash
msg.PackageStatuses.ServerProvidedAllPackagesHash = w.settings.LastServerProvidedAllPackagesHash
},
)

Expand Down
Loading

0 comments on commit 1882ca8

Please sign in to comment.