Skip to content

Commit

Permalink
update return type of endpointsForContainer for optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Stumpf committed Oct 28, 2021
1 parent 346d3e6 commit 064ab9a
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 44 deletions.
58 changes: 28 additions & 30 deletions extension/observer/dockerobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,13 @@ func (d *dockerObserver) Start(ctx context.Context, host component.Host) error {

d.dClient, err = docker.NewDockerClient(dConfig, d.logger)
if err != nil {
d.logger.Error("Could not create docker client", zap.Error(err))
return err
return fmt.Errorf("could not create docker client: %w", err)
}

// Load initial set of containers
err = d.dClient.LoadContainerList(d.ctx)
if err != nil {
d.logger.Error("Could not load initial list of containers", zap.Error(err))
return err
return fmt.Errorf("could not load initial list of containers: %w", err)
}

d.existingEndpoints = make(map[string][]observer.Endpoint)
Expand Down Expand Up @@ -116,8 +114,8 @@ func (d *dockerObserver) ListAndWatch(listener observer.Notify) {
// creating endpoints for each.
func (d *dockerObserver) emitContainerEndpoints(listener observer.Notify) {
for _, c := range d.dClient.Containers() {
cEndpoints := d.endpointsForContainer(c.ContainerJSON)
d.updateEndpointsByContainerID(listener, c.ContainerJSON.ID, cEndpoints)
endpointsMap := d.endpointsForContainer(c.ContainerJSON)
d.updateEndpointsByContainerID(listener, c.ContainerJSON.ID, endpointsMap)
}
}

Expand All @@ -133,17 +131,12 @@ func (d *dockerObserver) syncContainerList(listener observer.Notify) error {
}

// updateEndpointsByID uses the listener to add / remove / update endpoints by container ID.
// latestEndpoints is the list of latest endpoints for the given container ID.
// latestEndpointsMap is a map of latest endpoints for the given container ID.
// If an endpoint is in the cache but NOT in latestEndpoints, the endpoint will be removed
func (d *dockerObserver) updateEndpointsByContainerID(listener observer.Notify, cid string, latestEndpoints []observer.Endpoint) {
func (d *dockerObserver) updateEndpointsByContainerID(listener observer.Notify, cid string, latestEndpointsMap map[observer.EndpointID]observer.Endpoint) {
var removedEndpoints, addedEndpoints, updatedEndpoints []observer.Endpoint

if len(latestEndpoints) != 0 {
// Create map from ID to endpoint for lookup.
latestEndpointsMap := make(map[observer.EndpointID]bool, len(latestEndpoints))
for _, e := range latestEndpoints {
latestEndpointsMap[e.ID] = true
}
if latestEndpointsMap != nil || len(latestEndpointsMap) != 0 {
// map of EndpointID to endpoint to help with lookups
existingEndpointsMap := make(map[observer.EndpointID]observer.Endpoint)
if endpoints, ok := d.existingEndpoints[cid]; ok {
Expand All @@ -153,17 +146,17 @@ func (d *dockerObserver) updateEndpointsByContainerID(listener observer.Notify,
}

// If the endpoint is present in existingEndpoints but is not
// present in latestEndpoints, then it needs to be removed.
// present in latestEndpointsMap, then it needs to be removed.
for id, e := range existingEndpointsMap {
if !latestEndpointsMap[id] {
if _, ok := latestEndpointsMap[id]; !ok {
removedEndpoints = append(removedEndpoints, e)
}
}

// if the endpoint s present in latestEndpoints, check if it exists
// if the endpoint is present in latestEndpointsMap, check if it exists
// already in existingEndpoints.
for _, e := range latestEndpoints {
// If it does not exist alreaedy, it is a new endpoint. Add it.
for _, e := range latestEndpointsMap {
// If it does not exist already, it is a new endpoint. Add it.
if existingEndpoint, ok := existingEndpointsMap[e.ID]; !ok {
addedEndpoints = append(addedEndpoints, e)
} else {
Expand All @@ -176,37 +169,38 @@ func (d *dockerObserver) updateEndpointsByContainerID(listener observer.Notify,
}
}

// reset endpoints for this container
d.existingEndpoints[cid] = nil
// set the current known endpoints to the latest endpoints
d.existingEndpoints[cid] = latestEndpoints
for _, e := range latestEndpointsMap {
d.existingEndpoints[cid] = append(d.existingEndpoints[cid], e)
}
} else {
// if latestEndpoints is nil, we are removing all endpoints for the container
// if latestEndpointsMap is nil, we are removing all endpoints for the container
removedEndpoints = append(removedEndpoints, d.existingEndpoints[cid]...)
delete(d.existingEndpoints, cid)
}

if len(removedEndpoints) > 0 {
d.logger.Info("removing endpoints")
listener.OnRemove(removedEndpoints)
}

if len(addedEndpoints) > 0 {
d.logger.Info("adding endpoints")
listener.OnAdd(addedEndpoints)
}

if len(updatedEndpoints) > 0 {
d.logger.Info("updating endpoints")
listener.OnChange(updatedEndpoints)
}
}

// endpointsForContainer generates a list of observer.Endpoint given a Docker ContainerJSON.
// This function will only generate endpoints if a container is in the Running state and not Paused.
func (d *dockerObserver) endpointsForContainer(c *dtypes.ContainerJSON) []observer.Endpoint {
cEndpoints := make([]observer.Endpoint, 0)
func (d *dockerObserver) endpointsForContainer(c *dtypes.ContainerJSON) map[observer.EndpointID]observer.Endpoint {
endpointsMap := make(map[observer.EndpointID]observer.Endpoint)

if !c.State.Running || c.State.Running && c.State.Paused {
return cEndpoints
return endpointsMap
}

knownPorts := map[nat.Port]bool{}
Expand All @@ -221,15 +215,19 @@ func (d *dockerObserver) endpointsForContainer(c *dtypes.ContainerJSON) []observ
if endpoint == nil {
continue
}
cEndpoints = append(cEndpoints, *endpoint)
endpointsMap[endpoint.ID] = *endpoint
}

if len(endpointsMap) == 0 {
return nil
}

for _, e := range cEndpoints {
for _, e := range endpointsMap {
s, _ := json.MarshalIndent(e, "", "\t")
d.logger.Debug("Discovered Docker container endpoint", zap.Any("endpoint", s))
}

return cEndpoints
return endpointsMap
}

// endpointForPort creates an observer.Endpoint for a given port that is exposed in a Docker container.
Expand Down
20 changes: 10 additions & 10 deletions extension/observer/dockerobserver/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func TestCollectEndpointsDefaultConfig(t *testing.T) {
c := containerJSON(t)
cEndpoints := obvs.endpointsForContainer(&c)

want := []observer.Endpoint{
{
want := map[observer.EndpointID]observer.Endpoint{
"babc5a6d7af2a48e7f52e1da26047024dcf98b737e754c9c3459bb84d1e4f80c:8080": {
ID: "babc5a6d7af2a48e7f52e1da26047024dcf98b737e754c9c3459bb84d1e4f80c:8080",
Target: "172.17.0.2:80",
Details: &observer.Container{
Expand Down Expand Up @@ -127,8 +127,8 @@ func TestCollectEndpointsAllConfigSettings(t *testing.T) {
c := containerJSON(t)
cEndpoints := obvs.endpointsForContainer(&c)

want := []observer.Endpoint{
{
want := map[observer.EndpointID]observer.Endpoint{
"babc5a6d7af2a48e7f52e1da26047024dcf98b737e754c9c3459bb84d1e4f80c:8080": {
ID: "babc5a6d7af2a48e7f52e1da26047024dcf98b737e754c9c3459bb84d1e4f80c:8080",
Target: "127.0.0.1:8080",
Details: &observer.Container{
Expand Down Expand Up @@ -174,8 +174,8 @@ func TestCollectEndpointsUseHostnameIfPresent(t *testing.T) {
c := containerJSON(t)
cEndpoints := obvs.endpointsForContainer(&c)

want := []observer.Endpoint{
{
want := map[observer.EndpointID]observer.Endpoint{
"babc5a6d7af2a48e7f52e1da26047024dcf98b737e754c9c3459bb84d1e4f80c:8080": {
ID: "babc5a6d7af2a48e7f52e1da26047024dcf98b737e754c9c3459bb84d1e4f80c:8080",
Target: "babc5a6d7af2:80",
Details: &observer.Container{
Expand Down Expand Up @@ -221,8 +221,8 @@ func TestCollectEndpointsUseHostBindings(t *testing.T) {
c := containerJSON(t)
cEndpoints := obvs.endpointsForContainer(&c)

want := []observer.Endpoint{
{
want := map[observer.EndpointID]observer.Endpoint{
"babc5a6d7af2a48e7f52e1da26047024dcf98b737e754c9c3459bb84d1e4f80c:8080": {
ID: "babc5a6d7af2a48e7f52e1da26047024dcf98b737e754c9c3459bb84d1e4f80c:8080",
Target: "127.0.0.1:8080",
Details: &observer.Container{
Expand Down Expand Up @@ -268,8 +268,8 @@ func TestCollectEndpointsIgnoreNonHostBindings(t *testing.T) {
c := containerJSON(t)
cEndpoints := obvs.endpointsForContainer(&c)

want := []observer.Endpoint{
{
want := map[observer.EndpointID]observer.Endpoint{
"babc5a6d7af2a48e7f52e1da26047024dcf98b737e754c9c3459bb84d1e4f80c:8080": {
ID: "babc5a6d7af2a48e7f52e1da26047024dcf98b737e754c9c3459bb84d1e4f80c:8080",
Target: "172.17.0.2:80",
Details: &observer.Container{
Expand Down
5 changes: 5 additions & 0 deletions extension/observer/dockerobserver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/knadh/koanf v1.3.0 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
Expand All @@ -44,7 +45,11 @@ require (
go.opentelemetry.io/otel/trace v1.0.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71 // indirect
google.golang.org/genproto v0.0.0-20210604141403-392c879c8b08 // indirect
google.golang.org/grpc v1.41.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
Expand Down
20 changes: 16 additions & 4 deletions extension/observer/dockerobserver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 064ab9a

Please sign in to comment.