From 6a26b80c9ea0ab332b83edef643851924e3b6df8 Mon Sep 17 00:00:00 2001 From: Case Wylie Date: Thu, 12 Dec 2024 12:22:28 -0500 Subject: [PATCH] chore: different periods between send and receive (#1563) ## Description We lowered the debounce period (to one second) in which our Store Subscribers **receive** updates from the Watch on the `PeprStore` resource, meaning they can react to changes faster. The same debounce period was responsible for **sending** JSON patches to the `PeprStore` resource. The cache that holds these updates is updated and the items that were sent are deleted after the JSON patch succeeds. This meant that if we are sending events every second, and the kube-apiserver does not process the patch in a second, then the same events will be sent over multiple intervals. It is unclear if this would make the data incorrect as it is idempotent but it certainly introduces pressure on the network which could create a fallout of other network created problems. This PR: - Creates two different debounce periods, one for send and one for receive - Sends a reason for the reject in `SetItemAndWait/RemoveItemAndWait` - Cleans up the timeouts in `SetItemAndWait/RemoveItemAndWait` - Adds an array item to the JSON patch to the Store to ensure a watch event occurs to resolve promises in Set/RemoveItemAndWait e2e: https://github.com/defenseunicorns/pepr-excellent-examples/pull/204 ## Related Issue Fixes #1561 Relates to # ## Type of change - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [x] Other (security config, docs update, etc) ## Checklist before merging - [x] Unit, [Journey](https://github.com/defenseunicorns/pepr/tree/main/journey), [E2E Tests](https://github.com/defenseunicorns/pepr-excellent-examples), [docs](https://github.com/defenseunicorns/pepr/tree/main/docs), [adr](https://github.com/defenseunicorns/pepr/tree/main/adr) added or updated as needed - [x] [Contributor Guide Steps](https://docs.pepr.dev/main/contribute/#submitting-a-pull-request) followed --------- Signed-off-by: Case Wylie --- src/lib/controller/store.ts | 19 +++++++-- src/lib/controller/storeCache.test.ts | 19 ++++++++- src/lib/controller/storeCache.ts | 11 +++++- src/lib/schedule.test.ts | 8 ++-- src/lib/storage.ts | 57 ++++++++++++++++----------- 5 files changed, 81 insertions(+), 33 deletions(-) diff --git a/src/lib/controller/store.ts b/src/lib/controller/store.ts index 159ff8a7e..2289da8ce 100644 --- a/src/lib/controller/store.ts +++ b/src/lib/controller/store.ts @@ -12,7 +12,8 @@ import { DataOp, DataSender, DataStore, Storage } from "../storage"; import { fillStoreCache, sendUpdatesAndFlushCache } from "./storeCache"; const namespace = "pepr-system"; -export const debounceBackoff = 1000; +const debounceBackoffReceive = 1000; +const debounceBackoffSend = 4000; export class StoreController { #name: string; @@ -68,6 +69,15 @@ export class StoreController { #migrateAndSetupWatch = async (store: Store): Promise => { Log.debug(redactedStore(store), "Pepr Store migration"); + // Add cacheID label to store + await K8s(Store, { namespace, name: this.#name }).Patch([ + { + op: "add", + path: "/metadata/labels/pepr.dev-cacheID", + value: `${Date.now()}`, + }, + ]); + const data: DataStore = store.data || {}; let storeCache: Record = {}; @@ -134,7 +144,7 @@ export class StoreController { // Debounce the update to 1 second to avoid multiple rapid calls clearTimeout(this.#sendDebounce); - this.#sendDebounce = setTimeout(debounced, this.#onReady ? 0 : debounceBackoff); + this.#sendDebounce = setTimeout(debounced, this.#onReady ? 0 : debounceBackoffReceive); }; #send = (capabilityName: string): DataSender => { @@ -151,7 +161,7 @@ export class StoreController { Log.debug(redactedPatch(storeCache), "Sending updates to Pepr store"); void sendUpdatesAndFlushCache(storeCache, namespace, this.#name); } - }, debounceBackoff); + }, debounceBackoffSend); return sender; }; @@ -165,6 +175,9 @@ export class StoreController { metadata: { name: this.#name, namespace, + labels: { + "pepr.dev-cacheID": `${Date.now()}`, + }, }, data: { // JSON Patch will die if the data is empty, so we need to add a placeholder diff --git a/src/lib/controller/storeCache.test.ts b/src/lib/controller/storeCache.test.ts index 5e4ddb96d..3594bea55 100644 --- a/src/lib/controller/storeCache.test.ts +++ b/src/lib/controller/storeCache.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, jest, afterEach } from "@jest/globals"; -import { fillStoreCache, sendUpdatesAndFlushCache } from "./storeCache"; +import { fillStoreCache, sendUpdatesAndFlushCache, updateCacheID } from "./storeCache"; import { Operation } from "fast-json-patch"; import { GenericClass, K8s, KubernetesObject } from "kubernetes-fluent-client"; import { K8sInit } from "kubernetes-fluent-client/dist/fluent/types"; @@ -100,3 +100,20 @@ describe("sendCache", () => { }); }); }); + +describe("updateCacheId", () => { + it("should update the metadata label of the cacheID in the payload array of patches", () => { + const patches: Operation[] = [ + { + op: "add", + path: "/data/hello-pepr-v2-a", + value: "a", + }, + ]; + + const updatedPatches = updateCacheID(patches); + expect(updatedPatches.length).toBe(2); + expect(updatedPatches[1].op).toBe("replace"); + expect(updatedPatches[1].path).toBe("/metadata/labels/pepr.dev-cacheID"); + }); +}); diff --git a/src/lib/controller/storeCache.ts b/src/lib/controller/storeCache.ts index 50340f107..fb27ea886 100644 --- a/src/lib/controller/storeCache.ts +++ b/src/lib/controller/storeCache.ts @@ -11,7 +11,7 @@ export const sendUpdatesAndFlushCache = async (cache: Record, try { if (payload.length > 0) { - await K8s(Store, { namespace, name }).Patch(payload); // Send patch to cluster + await K8s(Store, { namespace, name }).Patch(updateCacheID(payload)); // Send patch to cluster Object.keys(cache).forEach(key => delete cache[key]); } } catch (err) { @@ -61,3 +61,12 @@ export const fillStoreCache = ( } return cache; }; + +export function updateCacheID(payload: Operation[]): Operation[] { + payload.push({ + op: "replace", + path: "/metadata/labels/pepr.dev-cacheID", + value: `${Date.now()}`, + }); + return payload; +} diff --git a/src/lib/schedule.test.ts b/src/lib/schedule.test.ts index cafdb0179..ca84784a4 100644 --- a/src/lib/schedule.test.ts +++ b/src/lib/schedule.test.ts @@ -20,10 +20,10 @@ export class MockStorage { this.storage[key] = value; } - setItemAndWait(key: string, value: string): Promise { + setItemAndWait(key: string, value: string): Promise { return new Promise(resolve => { this.storage[key] = value; - resolve(); + resolve("ok"); }); } @@ -31,10 +31,10 @@ export class MockStorage { delete this.storage[key]; } - removeItemAndWait(key: string): Promise { + removeItemAndWait(key: string): Promise { return new Promise(resolve => { delete this.storage[key]; - resolve(); + resolve("ok"); }); } diff --git a/src/lib/storage.ts b/src/lib/storage.ts index 2f56b2f12..5e26f4a8b 100644 --- a/src/lib/storage.ts +++ b/src/lib/storage.ts @@ -12,6 +12,11 @@ export type Unsubscribe = () => void; const MAX_WAIT_TIME = 15000; const STORE_VERSION_PREFIX = "v2"; +interface WaitRecord { + timeout?: ReturnType; + unsubscribe?: () => void; +} + export function v2StoreKey(key: string): string { return `${STORE_VERSION_PREFIX}-${pointer.escape(key)}`; } @@ -58,13 +63,13 @@ export interface PeprStore { * Sets the value of the pair identified by key to value, creating a new key/value pair if none existed for key previously. * Resolves when the key/value show up in the store. */ - setItemAndWait(key: string, value: string): Promise; + setItemAndWait(key: string, value: string): Promise; /** * Remove the value of the key. * Resolves when the key does not show up in the store. */ - removeItemAndWait(key: string): Promise; + removeItemAndWait(key: string): Promise; } /** @@ -128,22 +133,24 @@ export class Storage implements PeprStore { * @param value - The value of the key * @returns */ - setItemAndWait = (key: string, value: string): Promise => { + setItemAndWait = (key: string, value: string): Promise => { this.#dispatchUpdate("add", [v2StoreKey(key)], value); + const record: WaitRecord = {}; - return new Promise((resolve, reject) => { - const unsubscribe = this.subscribe(data => { + return new Promise((resolve, reject) => { + // If promise has not resolved before MAX_WAIT_TIME reject + record.timeout = setTimeout(() => { + record.unsubscribe!(); + return reject(`MAX_WAIT_TIME elapsed: Key ${key} not seen in ${MAX_WAIT_TIME / 1000}s`); + }, MAX_WAIT_TIME); + + record.unsubscribe = this.subscribe(data => { if (data[`${v2UnescapedStoreKey(key)}`] === value) { - unsubscribe(); - resolve(); + record.unsubscribe!(); + clearTimeout(record.timeout); + resolve("ok"); } }); - - // If promise has not resolved before MAX_WAIT_TIME reject - setTimeout(() => { - unsubscribe(); - return reject(); - }, MAX_WAIT_TIME); }); }; @@ -154,21 +161,23 @@ export class Storage implements PeprStore { * @param key - The key to add into the store * @returns */ - removeItemAndWait = (key: string): Promise => { + removeItemAndWait = (key: string): Promise => { this.#dispatchUpdate("remove", [v2StoreKey(key)]); - return new Promise((resolve, reject) => { - const unsubscribe = this.subscribe(data => { + const record: WaitRecord = {}; + return new Promise((resolve, reject) => { + // If promise has not resolved before MAX_WAIT_TIME reject + record.timeout = setTimeout(() => { + record.unsubscribe!(); + return reject(`MAX_WAIT_TIME elapsed: Key ${key} still seen after ${MAX_WAIT_TIME / 1000}s`); + }, MAX_WAIT_TIME); + + record.unsubscribe = this.subscribe(data => { if (!Object.hasOwn(data, `${v2UnescapedStoreKey(key)}`)) { - unsubscribe(); - resolve(); + record.unsubscribe!(); + clearTimeout(record.timeout); + resolve("ok"); } }); - - // If promise has not resolved before MAX_WAIT_TIME reject - setTimeout(() => { - unsubscribe(); - return reject(); - }, MAX_WAIT_TIME); }); };