Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement package syncing #76

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 253 additions & 1 deletion client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -391,6 +392,7 @@ func TestIncludesDetailsOnReconnect(t *testing.T) {
eventually(t, func() bool { return atomic.LoadInt64(&receivedDetails) == 1 })

// close the Agent connection. expect it to reconnect and send details again.
require.NotNil(t, client.conn)
err := client.conn.Close()
assert.NoError(t, err)

Expand Down Expand Up @@ -438,7 +440,7 @@ func TestSetEffectiveConfig(t *testing.T) {
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))
require.NoError(t, client.Start(context.Background(), settings))

// Verify config is delivered.
eventually(
Expand Down Expand Up @@ -966,3 +968,253 @@ func TestRemoteConfigUpdate(t *testing.T) {
})
}
}

type packageTestCase struct {
name string
errorOnCallback bool
available *protobufs.PackagesAvailable
expectedStatus *protobufs.PackageStatuses
expectedFileContent map[string][]byte
}

const packageUpdateErrorMsg = "cannot update packages"

func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
testClients(t, func(t *testing.T, client OpAMPClient) {

// Start a Server.
srv := internal.StartMockServer(t)
srv.EnableExpectMode()

localPackageState := internal.NewInMemPackagesStore()

var syncerDoneCh <-chan struct{}

// Prepare a callback that returns either success or failure.
onPackagesAvailable := func(ctx context.Context, packages *protobufs.PackagesAvailable, syncer types.PackagesSyncer) error {
if testCase.errorOnCallback {
pmm-sumo marked this conversation as resolved.
Show resolved Hide resolved
return errors.New(packageUpdateErrorMsg)
} else {
syncerDoneCh = syncer.Done()
err := syncer.Sync(ctx)
require.NoError(t, err)
return nil
}
}

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.CallbacksStruct{
OnPackagesAvailableFunc: onPackagesAvailable,
},
PackagesStateProvider: localPackageState,
}
prepareClient(t, &settings, client)

// Client --->
assert.NoError(t, client.Start(context.Background(), settings))

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// Send the packages to the Agent.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
PackagesAvailable: testCase.available,
}
})

// The Agent will try to install the packages and will send the status
// report about it back to the Server.

var lastStatusHash []byte

// ---> Server
// Wait for the expected package statuses to be received.
srv.EventuallyExpect("full PackageStatuses",
func(msg *protobufs.AgentToServer) (*protobufs.ServerToAgent, bool) {
expectedStatusReceived := false

status := msg.PackageStatuses
require.NotNil(t, status)
assert.EqualValues(t, testCase.expectedStatus.ServerProvidedAllPackagesHash, status.ServerProvidedAllPackagesHash)
lastStatusHash = status.Hash

// Verify individual package statuses.
for name, pkgExpected := range testCase.expectedStatus.Packages {
pkgStatus := status.Packages[name]
if pkgStatus == nil {
// Package status not yet included in the report.
continue
}
switch pkgStatus.Status {
case protobufs.PackageStatus_InstallFailed:
assert.Contains(t, pkgStatus.ErrorMessage, pkgExpected.ErrorMessage)

case protobufs.PackageStatus_Installed:
assert.EqualValues(t, pkgExpected.AgentHasHash, pkgStatus.AgentHasHash)
assert.EqualValues(t, pkgExpected.AgentHasVersion, pkgStatus.AgentHasVersion)
assert.Empty(t, pkgStatus.ErrorMessage)
default:
assert.Empty(t, pkgStatus.ErrorMessage)
}
assert.EqualValues(t, pkgExpected.ServerOfferedHash, pkgStatus.ServerOfferedHash)
assert.EqualValues(t, pkgExpected.ServerOfferedVersion, pkgStatus.ServerOfferedVersion)

if pkgStatus.Status == pkgExpected.Status {
expectedStatusReceived = true
assert.Len(t, status.Packages, len(testCase.available.Packages))
}
}
assert.NotNil(t, status.Hash)

return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}, expectedStatusReceived
})

if syncerDoneCh != nil {
// Wait until all syncing is done.
<-syncerDoneCh

for pkgName, receivedContent := range localPackageState.GetContent() {
expectedContent := testCase.expectedFileContent[pkgName]
assert.EqualValues(t, expectedContent, receivedContent)
}
}

// Client --->
// Trigger another status report by setting AgentDescription.
_ = client.SetAgentDescription(client.AgentDescription())

// ---> Server
srv.EventuallyExpect("compressed PackageStatuses",
func(msg *protobufs.AgentToServer) (*protobufs.ServerToAgent, bool) {
// Ensure that compressed status is received.
status := msg.PackageStatuses
require.NotNil(t, status)
compressedReceived := status.ServerProvidedAllPackagesHash == nil
if compressedReceived {
assert.Nil(t, status.ServerProvidedAllPackagesHash)
assert.Nil(t, status.Packages)
}
assert.NotNil(t, status.Hash)
assert.Equal(t, lastStatusHash, status.Hash)

response := &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}

if compressedReceived {
// Ask for full report again.
response.Flags = protobufs.ServerToAgent_ReportPackageStatuses
} else {
// Keep triggering status report by setting AgentDescription
// until the compressed PackageStatuses arrives.
_ = client.SetAgentDescription(client.AgentDescription())
}

return response, compressedReceived
})

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}

// Downloadable package file constants.
const packageFileURL = "/validfile.pkg"

var packageFileContent = []byte("Package File Content")

func createDownloadSrv(t *testing.T) *httptest.Server {
m := http.NewServeMux()
m.HandleFunc(packageFileURL,
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, err := w.Write(packageFileContent)
assert.NoError(t, err)
},
)

srv := httptest.NewServer(m)

u, err := url.Parse(srv.URL)
if err != nil {
t.Fatal(err)
}
endpoint := u.Host
testhelpers.WaitForEndpoint(endpoint)

return srv
}

func createPackageTestCase(name string, downloadSrv *httptest.Server) packageTestCase {
return packageTestCase{
name: name,
errorOnCallback: false,
available: &protobufs.PackagesAvailable{
Packages: map[string]*protobufs.PackageAvailable{
"package1": {
Type: protobufs.PackageAvailable_TopLevelPackage,
Version: "1.0.0",
File: &protobufs.DownloadableFile{
DownloadUrl: downloadSrv.URL + packageFileURL,
ContentHash: []byte{4, 5},
},
Hash: []byte{1, 2, 3},
},
},
AllPackagesHash: []byte{1, 2, 3, 4, 5},
},

expectedStatus: &protobufs.PackageStatuses{
Packages: map[string]*protobufs.PackageStatus{
"package1": {
Name: "package1",
AgentHasVersion: "1.0.0",
AgentHasHash: []byte{1, 2, 3},
ServerOfferedVersion: "1.0.0",
ServerOfferedHash: []byte{1, 2, 3},
Status: protobufs.PackageStatus_Installed,
ErrorMessage: "",
},
},
ServerProvidedAllPackagesHash: []byte{1, 2, 3, 4, 5},
},

expectedFileContent: map[string][]byte{
"package1": packageFileContent,
},
}
}

func TestUpdatePackages(t *testing.T) {

downloadSrv := createDownloadSrv(t)
defer downloadSrv.Close()

// A success case.
var tests []packageTestCase
tests = append(tests, createPackageTestCase("success", downloadSrv))

// A case when downloading the file fails because the URL is incorrect.
notFound := createPackageTestCase("downloadable file not found", downloadSrv)
notFound.available.Packages["package1"].File.DownloadUrl = downloadSrv.URL + "/notfound"
notFound.expectedStatus.Packages["package1"].Status = protobufs.PackageStatus_InstallFailed
notFound.expectedStatus.Packages["package1"].ErrorMessage = "cannot download"
tests = append(tests, notFound)

// A case when OnPackagesAvailable callback returns an error.
errorOnCallback := createPackageTestCase("error on callback", downloadSrv)
errorOnCallback.expectedStatus.Packages["package1"].Status = protobufs.PackageStatus_InstallFailed
errorOnCallback.expectedStatus.Packages["package1"].ErrorMessage = packageUpdateErrorMsg
errorOnCallback.errorOnCallback = true
tests = append(tests, errorOnCallback)

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
verifyUpdatePackages(t, test)
})
}
}
8 changes: 7 additions & 1 deletion client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,11 @@ func (c *httpClient) runUntilStopped(ctx context.Context) {
// Start the HTTP sender. This will make request/responses with retries for
// failures and will wait with configured polling interval if there is nothing
// to send.
c.sender.Run(ctx, c.opAMPServerURL, c.common.Callbacks, &c.common.ClientSyncedState)
c.sender.Run(
ctx,
c.opAMPServerURL,
c.common.Callbacks,
&c.common.ClientSyncedState,
c.common.PackagesStateProvider,
)
}
50 changes: 38 additions & 12 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
"google.golang.org/protobuf/proto"
)

var (
ErrAgentDescriptionMissing = errors.New("AgentDescription is nil")
ErrAgentDescriptionNoAttributes = errors.New("AgentDescription has no attributes defined")
errRemoteConfigStatusMissing = errors.New("RemoteConfigStatus is not set")
errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
)

// ClientCommon contains the OpAMP logic that is common between WebSocket and
Expand All @@ -28,6 +29,9 @@ type ClientCommon struct {
// Client state storage. This is needed if the Server asks to report the state.
ClientSyncedState ClientSyncedState

// PackagesStateProvider provides access to the local state of packages.
PackagesStateProvider types.PackagesStateProvider

// The transport-specific sender.
sender Sender

Expand Down Expand Up @@ -58,6 +62,7 @@ func (c *ClientCommon) PrepareStart(_ context.Context, settings types.StartSetti
return ErrAgentDescriptionMissing
}

// Prepare remote config status.
if settings.RemoteConfigStatus == nil {
// RemoteConfigStatus is not provided. Start with empty.
settings.RemoteConfigStatus = &protobufs.RemoteConfigStatus{
Expand All @@ -69,6 +74,27 @@ func (c *ClientCommon) PrepareStart(_ context.Context, settings types.StartSetti
return err
}

// Prepare package statuses.
c.PackagesStateProvider = settings.PackagesStateProvider
var packageStatuses *protobufs.PackageStatuses
if settings.PackagesStateProvider != nil {
// Set package status from the value previously saved in the PackagesStateProvider.
var err error
packageStatuses, err = settings.PackagesStateProvider.LastReportedStatuses()
if err != nil {
return err
}
}

if packageStatuses == nil {
// PackageStatuses is not provided. Start with empty.
packageStatuses = &protobufs.PackageStatuses{}
}
if err := c.ClientSyncedState.SetPackageStatuses(packageStatuses); err != nil {
return err
}

// Prepare callbacks.
c.Callbacks = settings.Callbacks
if c.Callbacks == nil {
// Make sure it is always safe to call Callbacks.
Expand Down Expand Up @@ -153,24 +179,24 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
msg.StatusReport = &protobufs.StatusReport{}
}
msg.StatusReport.AgentDescription = c.ClientSyncedState.AgentDescription()

msg.StatusReport.EffectiveConfig = cfg

msg.StatusReport.RemoteConfigStatus = c.ClientSyncedState.RemoteConfigStatus()
msg.PackageStatuses = c.ClientSyncedState.PackageStatuses()

if msg.PackageStatuses == nil {
msg.PackageStatuses = &protobufs.PackageStatuses{}
if c.PackagesStateProvider != nil {
// We have a state provider, so package related capabilities can work.
msg.StatusReport.Capabilities |= protobufs.AgentCapabilities_AcceptsPackages
msg.StatusReport.Capabilities |= protobufs.AgentCapabilities_ReportsPackageStatuses
}

// TODO: set PackageStatuses.ServerProvidedAllPackagesHash field and
// handle the Hashes for PackageStatuses properly.
},
)
return nil
}

// AgentDescription returns the current state of the AgentDescription.
func (c *ClientCommon) AgentDescription() *protobufs.AgentDescription {
return c.ClientSyncedState.AgentDescription()
// Return a cloned copy to allow caller to do whatever they want with the result.
return proto.Clone(c.ClientSyncedState.AgentDescription()).(*protobufs.AgentDescription)
}

// SetAgentDescription sends a status update to the Server with the new AgentDescription
Expand All @@ -182,7 +208,7 @@ func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) er
return err
}
c.sender.NextMessage().UpdateStatus(func(statusReport *protobufs.StatusReport) {
statusReport.AgentDescription = descr
statusReport.AgentDescription = c.ClientSyncedState.AgentDescription()
})
c.sender.ScheduleSend()
return nil
Expand Down
Loading