Skip to content

Commit

Permalink
chore: different periods between send and receive (#1563)
Browse files Browse the repository at this point in the history
## Description

We lowered the debounce period (to one second) in which our Store
Subscribers **receive** updates from the Watch on the `PeprStore`
resource, meaning they can react to changes faster.

The same debounce period was responsible for **sending** JSON patches to
the `PeprStore` resource. The cache that holds these updates is updated
and the items that were sent are deleted after the JSON patch succeeds.
This meant that if we are sending events every second, and the
kube-apiserver does not process the patch in a second, then the same
events will be sent over multiple intervals. It is unclear if this would
make the data incorrect as it is idempotent but it certainly introduces
pressure on the network which could create a fallout of other network
created problems.

This PR:
- Creates two different debounce periods, one for send and one for
receive
- Sends a reason for the reject in `SetItemAndWait/RemoveItemAndWait`
- Cleans up the timeouts in `SetItemAndWait/RemoveItemAndWait`
- Adds an array item to the JSON patch to the Store to ensure a watch
event occurs to resolve promises in Set/RemoveItemAndWait


e2e: defenseunicorns/pepr-excellent-examples#204
## Related Issue

Fixes #1561
<!-- or -->
Relates to #

## Type of change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Other (security config, docs update, etc)

## Checklist before merging
- [x] Unit,
[Journey](https://github.com/defenseunicorns/pepr/tree/main/journey),
[E2E Tests](https://github.com/defenseunicorns/pepr-excellent-examples),
[docs](https://github.com/defenseunicorns/pepr/tree/main/docs),
[adr](https://github.com/defenseunicorns/pepr/tree/main/adr) added or
updated as needed
- [x] [Contributor Guide
Steps](https://docs.pepr.dev/main/contribute/#submitting-a-pull-request)
followed

---------

Signed-off-by: Case Wylie <[email protected]>
  • Loading branch information
cmwylie19 authored Dec 12, 2024
1 parent 65ea7d1 commit 6a26b80
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 33 deletions.
19 changes: 16 additions & 3 deletions src/lib/controller/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import { DataOp, DataSender, DataStore, Storage } from "../storage";
import { fillStoreCache, sendUpdatesAndFlushCache } from "./storeCache";

const namespace = "pepr-system";
export const debounceBackoff = 1000;
const debounceBackoffReceive = 1000;
const debounceBackoffSend = 4000;

export class StoreController {
#name: string;
Expand Down Expand Up @@ -68,6 +69,15 @@ export class StoreController {

#migrateAndSetupWatch = async (store: Store): Promise<void> => {
Log.debug(redactedStore(store), "Pepr Store migration");
// Add cacheID label to store
await K8s(Store, { namespace, name: this.#name }).Patch([
{
op: "add",
path: "/metadata/labels/pepr.dev-cacheID",
value: `${Date.now()}`,
},
]);

const data: DataStore = store.data || {};
let storeCache: Record<string, Operation> = {};

Expand Down Expand Up @@ -134,7 +144,7 @@ export class StoreController {

// Debounce the update to 1 second to avoid multiple rapid calls
clearTimeout(this.#sendDebounce);
this.#sendDebounce = setTimeout(debounced, this.#onReady ? 0 : debounceBackoff);
this.#sendDebounce = setTimeout(debounced, this.#onReady ? 0 : debounceBackoffReceive);
};

#send = (capabilityName: string): DataSender => {
Expand All @@ -151,7 +161,7 @@ export class StoreController {
Log.debug(redactedPatch(storeCache), "Sending updates to Pepr store");
void sendUpdatesAndFlushCache(storeCache, namespace, this.#name);
}
}, debounceBackoff);
}, debounceBackoffSend);

return sender;
};
Expand All @@ -165,6 +175,9 @@ export class StoreController {
metadata: {
name: this.#name,
namespace,
labels: {
"pepr.dev-cacheID": `${Date.now()}`,
},
},
data: {
// JSON Patch will die if the data is empty, so we need to add a placeholder
Expand Down
19 changes: 18 additions & 1 deletion src/lib/controller/storeCache.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, expect, it, jest, afterEach } from "@jest/globals";
import { fillStoreCache, sendUpdatesAndFlushCache } from "./storeCache";
import { fillStoreCache, sendUpdatesAndFlushCache, updateCacheID } from "./storeCache";
import { Operation } from "fast-json-patch";
import { GenericClass, K8s, KubernetesObject } from "kubernetes-fluent-client";
import { K8sInit } from "kubernetes-fluent-client/dist/fluent/types";
Expand Down Expand Up @@ -100,3 +100,20 @@ describe("sendCache", () => {
});
});
});

describe("updateCacheId", () => {
it("should update the metadata label of the cacheID in the payload array of patches", () => {
const patches: Operation[] = [
{
op: "add",
path: "/data/hello-pepr-v2-a",
value: "a",
},
];

const updatedPatches = updateCacheID(patches);
expect(updatedPatches.length).toBe(2);
expect(updatedPatches[1].op).toBe("replace");
expect(updatedPatches[1].path).toBe("/metadata/labels/pepr.dev-cacheID");
});
});
11 changes: 10 additions & 1 deletion src/lib/controller/storeCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export const sendUpdatesAndFlushCache = async (cache: Record<string, Operation>,

try {
if (payload.length > 0) {
await K8s(Store, { namespace, name }).Patch(payload); // Send patch to cluster
await K8s(Store, { namespace, name }).Patch(updateCacheID(payload)); // Send patch to cluster
Object.keys(cache).forEach(key => delete cache[key]);
}
} catch (err) {
Expand Down Expand Up @@ -61,3 +61,12 @@ export const fillStoreCache = (
}
return cache;
};

export function updateCacheID(payload: Operation[]): Operation[] {
payload.push({
op: "replace",
path: "/metadata/labels/pepr.dev-cacheID",
value: `${Date.now()}`,
});
return payload;
}
8 changes: 4 additions & 4 deletions src/lib/schedule.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ export class MockStorage {
this.storage[key] = value;
}

setItemAndWait(key: string, value: string): Promise<void> {
setItemAndWait(key: string, value: string): Promise<string> {
return new Promise(resolve => {
this.storage[key] = value;
resolve();
resolve("ok");
});
}

removeItem(key: string): void {
delete this.storage[key];
}

removeItemAndWait(key: string): Promise<void> {
removeItemAndWait(key: string): Promise<string> {
return new Promise(resolve => {
delete this.storage[key];
resolve();
resolve("ok");
});
}

Expand Down
57 changes: 33 additions & 24 deletions src/lib/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ export type Unsubscribe = () => void;
const MAX_WAIT_TIME = 15000;
const STORE_VERSION_PREFIX = "v2";

interface WaitRecord {
timeout?: ReturnType<typeof setTimeout>;
unsubscribe?: () => void;
}

export function v2StoreKey(key: string): string {
return `${STORE_VERSION_PREFIX}-${pointer.escape(key)}`;
}
Expand Down Expand Up @@ -58,13 +63,13 @@ export interface PeprStore {
* Sets the value of the pair identified by key to value, creating a new key/value pair if none existed for key previously.
* Resolves when the key/value show up in the store.
*/
setItemAndWait(key: string, value: string): Promise<void>;
setItemAndWait(key: string, value: string): Promise<string>;

/**
* Remove the value of the key.
* Resolves when the key does not show up in the store.
*/
removeItemAndWait(key: string): Promise<void>;
removeItemAndWait(key: string): Promise<string>;
}

/**
Expand Down Expand Up @@ -128,22 +133,24 @@ export class Storage implements PeprStore {
* @param value - The value of the key
* @returns
*/
setItemAndWait = (key: string, value: string): Promise<void> => {
setItemAndWait = (key: string, value: string): Promise<string> => {
this.#dispatchUpdate("add", [v2StoreKey(key)], value);
const record: WaitRecord = {};

return new Promise<void>((resolve, reject) => {
const unsubscribe = this.subscribe(data => {
return new Promise<string>((resolve, reject) => {
// If promise has not resolved before MAX_WAIT_TIME reject
record.timeout = setTimeout(() => {
record.unsubscribe!();
return reject(`MAX_WAIT_TIME elapsed: Key ${key} not seen in ${MAX_WAIT_TIME / 1000}s`);
}, MAX_WAIT_TIME);

record.unsubscribe = this.subscribe(data => {
if (data[`${v2UnescapedStoreKey(key)}`] === value) {
unsubscribe();
resolve();
record.unsubscribe!();
clearTimeout(record.timeout);
resolve("ok");
}
});

// If promise has not resolved before MAX_WAIT_TIME reject
setTimeout(() => {
unsubscribe();
return reject();
}, MAX_WAIT_TIME);
});
};

Expand All @@ -154,21 +161,23 @@ export class Storage implements PeprStore {
* @param key - The key to add into the store
* @returns
*/
removeItemAndWait = (key: string): Promise<void> => {
removeItemAndWait = (key: string): Promise<string> => {
this.#dispatchUpdate("remove", [v2StoreKey(key)]);
return new Promise<void>((resolve, reject) => {
const unsubscribe = this.subscribe(data => {
const record: WaitRecord = {};
return new Promise<string>((resolve, reject) => {
// If promise has not resolved before MAX_WAIT_TIME reject
record.timeout = setTimeout(() => {
record.unsubscribe!();
return reject(`MAX_WAIT_TIME elapsed: Key ${key} still seen after ${MAX_WAIT_TIME / 1000}s`);
}, MAX_WAIT_TIME);

record.unsubscribe = this.subscribe(data => {
if (!Object.hasOwn(data, `${v2UnescapedStoreKey(key)}`)) {
unsubscribe();
resolve();
record.unsubscribe!();
clearTimeout(record.timeout);
resolve("ok");
}
});

// If promise has not resolved before MAX_WAIT_TIME reject
setTimeout(() => {
unsubscribe();
return reject();
}, MAX_WAIT_TIME);
});
};

Expand Down

0 comments on commit 6a26b80

Please sign in to comment.