From e2d69a0bcd4ed67b488d2a5079a1d2fbeddf97b7 Mon Sep 17 00:00:00 2001 From: jhagas Date: Fri, 19 Jul 2024 11:08:29 +0700 Subject: [PATCH] Multiple table support --- .../realtime-postgresChanges.ino | 24 +++++-- src/ESP-Supabase-Realtime.h | 21 ++++-- src/Realtime.cpp | 66 ++++++++++--------- 3 files changed, 68 insertions(+), 43 deletions(-) diff --git a/examples/realtime-postgresChanges/realtime-postgresChanges.ino b/examples/realtime-postgresChanges/realtime-postgresChanges.ino index 5f58737..5adaa9f 100644 --- a/examples/realtime-postgresChanges/realtime-postgresChanges.ino +++ b/examples/realtime-postgresChanges/realtime-postgresChanges.ino @@ -10,14 +10,20 @@ SupabaseRealtime realtime; -void DevicesTableHandler(String result) +void HandleChanges(String result) { JsonDocument doc; deserializeJson(doc, result); // Example of what you can do with the result - String state = doc["state"]; - Serial.println(state); + String tableName = doc["table"]; + String event = doc["type"]; + String changes = doc["record"]; + + Serial.print(tableName); + Serial.print(" : "); + Serial.println(event); + Serial.println(changes); } void setup() @@ -32,16 +38,20 @@ void setup() } Serial.println("\nConnected to WiFi"); - realtime.begin("https://project.supabase.co", "apikey"); + realtime.begin("https://project.supabase.co", "apikey", HandleChanges); realtime.login_email("email", "password"); // Only if you activate RLS in your Supabase Postgres Table // Parameter 1 : Table name // Parameter 2 : Event type ("*" | "INSERT" | "UPDATE" | "DELETE") - // Parameter 3 : Filter + // Parameter 3 : Your Supabase Table Postgres Schema + // Parameter 4 : Filter // Please read : https://supabase.com/docs/guides/realtime/postgres-changes?queryGroups=language&language=js#available-filters // empty string if you don't want to filter the result - // Parameter 4 : Callback function, how to handle the result (message) - realtime.listen("table", "*", "", DevicesTableHandler); + realtime.addChangesListener("table1", "INSERT", "public", "id=eq.0"); + // You can add multiple table listeners + realtime.addChangesListener("table2", "*", "public", ""); + + realtime.listen(); } void loop() diff --git a/src/ESP-Supabase-Realtime.h b/src/ESP-Supabase-Realtime.h index b7366a8..37f43e7 100644 --- a/src/ESP-Supabase-Realtime.h +++ b/src/ESP-Supabase-Realtime.h @@ -26,26 +26,35 @@ class SupabaseRealtime String password; String data; String loginMethod; - String USER_TOKEN = ""; bool useAuth; int _login_process(); unsigned int authTimeout = 0; unsigned long loginTime; + String configAUTH; - const char *config = "{\"event\":\"phx_join\",\"topic\":\"realtime:ESP\",\"payload\":{\"config\":{\"postgres_changes\":[{\"event\":\"abcdfeghijklmbopq\",\"schema\":\"public\",\"table\":\"abcdfeghijklmbopq\"}]}},\"ref\":\"sentRef\"}"; + // Initial config + const char *config = "{\"event\":\"phx_join\",\"topic\":\"realtime:ESP\",\"payload\":{\"config\":{\"postgres_changes\":[]}},\"ref\":\"sentRef\"}"; + JsonDocument postgresChanges; + JsonDocument jsonRealtimeConfig; + String configJSON; + + // Heartbeat unsigned int last_ms = millis(); const char *jsonRealtimeHeartbeat = R"({"event": "heartbeat","topic": "phoenix","payload": {},"ref": "sentRef"})"; const char *tokenConfig = R"({"topic": "realtime:ESP","event": "access_token","payload": { "access_token": "" },"ref": "sendRef"})"; - void processMessage(uint8_t *payload, void (*func)(String)); - void webSocketEvent(WStype_t type, uint8_t *payload, size_t length, String configJSON, void (*func)(String)); + void processMessage(uint8_t *payload); + void webSocketEvent(WStype_t type, uint8_t *payload, size_t length); + + std::function handler; public: SupabaseRealtime() {} - void begin(String hostname, String key); - void listen(String table, String event, String filter, void (*func)(String)); + void begin(String hostname, String key, void (*func)(String)); + void addChangesListener(String table, String event, String schema, String filter); + void listen(); void loop(); int login_email(String email_a, String password_a); int login_phone(String phone_a, String password_a); diff --git a/src/Realtime.cpp b/src/Realtime.cpp index 03524ce..18e4b30 100644 --- a/src/Realtime.cpp +++ b/src/Realtime.cpp @@ -39,9 +39,14 @@ int SupabaseRealtime::_login_process() deserializeJson(doc, data); if (doc.containsKey("access_token") && !doc["access_token"].isNull() && doc["access_token"].is() && !doc["access_token"].as().isEmpty()) { - USER_TOKEN = doc["access_token"].as(); + String USER_TOKEN = doc["access_token"].as(); authTimeout = doc["expires_in"].as() * 1000; Serial.println("Login Success"); + + JsonDocument authConfig; + deserializeJson(authConfig, tokenConfig); + authConfig["payload"]["access_token"] = USER_TOKEN; + serializeJson(authConfig, configAUTH); } else { @@ -68,20 +73,26 @@ int SupabaseRealtime::_login_process() return httpCode; } -void SupabaseRealtime::listen(String table, String event, String filter, void (*func)(String)) +void SupabaseRealtime::addChangesListener(String table, String event, String schema, String filter) { - String configJSON; - JsonDocument jsonRealtimeConfig; - deserializeJson(jsonRealtimeConfig, config); - - jsonRealtimeConfig["payload"]["config"]["postgres_changes"][0]["table"] = table; - jsonRealtimeConfig["payload"]["config"]["postgres_changes"][0]["event"] = event; + JsonDocument tableObj; + + tableObj["event"] = event; + tableObj["schema"] = schema; + tableObj["table"] = table; if (filter != "") { - jsonRealtimeConfig["payload"]["config"]["postgres_changes"][0]["filter"] = filter; + tableObj["filter"] = filter; } + postgresChanges.add(tableObj); +} + +void SupabaseRealtime::listen() +{ + deserializeJson(jsonRealtimeConfig, config); + jsonRealtimeConfig["payload"]["config"]["postgres_changes"] = postgresChanges; serializeJson(jsonRealtimeConfig, configJSON); String slug = "/realtime/v1/websocket?apikey=" + String(key) + "&vsn=1.0.0"; @@ -97,29 +108,23 @@ void SupabaseRealtime::listen(String table, String event, String filter, void (* slug.c_str()); // event handler - webSocket.onEvent(std::bind(&SupabaseRealtime::webSocketEvent, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, configJSON, func)); + webSocket.onEvent(std::bind(&SupabaseRealtime::webSocketEvent, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } -void SupabaseRealtime::processMessage(uint8_t *payload, void (*func)(String)) +void SupabaseRealtime::processMessage(uint8_t *payload) { JsonDocument result; deserializeJson(result, payload); String table = getEventTable(result); if (table != "null") { - String data = result["payload"]["data"]["record"]; - func(data); + String data = result["payload"]["data"]; + handler(data); }; } -void SupabaseRealtime::webSocketEvent(WStype_t type, uint8_t *payload, size_t length, String configJSON, void (*func)(String)) +void SupabaseRealtime::webSocketEvent(WStype_t type, uint8_t *payload, size_t length) { - String configAUTH; - JsonDocument authConfig; - deserializeJson(authConfig, tokenConfig); - authConfig["payload"]["access_token"] = USER_TOKEN; - serializeJson(authConfig, configAUTH); - switch (type) { case WStype_DISCONNECTED: @@ -131,7 +136,7 @@ void SupabaseRealtime::webSocketEvent(WStype_t type, uint8_t *payload, size_t le webSocket.sendTXT(configAUTH); break; case WStype_TEXT: - processMessage(payload, func); + processMessage(payload); break; case WStype_BIN: Serial.printf("[WSc] get binary length: %u\n", length); @@ -139,6 +144,13 @@ void SupabaseRealtime::webSocketEvent(WStype_t type, uint8_t *payload, size_t le case WStype_ERROR: Serial.printf("[WSc] Error: %s\n", payload); break; + case WStype_PING: + case WStype_PONG: + case WStype_FRAGMENT_TEXT_START: + case WStype_FRAGMENT_BIN_START: + case WStype_FRAGMENT: + case WStype_FRAGMENT_FIN: + break; } } @@ -158,22 +170,16 @@ void SupabaseRealtime::loop() { last_ms = millis(); webSocket.sendTXT(jsonRealtimeHeartbeat); - - String configJSON; - JsonDocument authConfig; - deserializeJson(authConfig, tokenConfig); - - authConfig["payload"]["access_token"] = USER_TOKEN; - serializeJson(authConfig, configJSON); - webSocket.sendTXT(configJSON); + webSocket.sendTXT(configAUTH); } } -void SupabaseRealtime::begin(String hostname, String key) +void SupabaseRealtime::begin(String hostname, String key, void (*func)(String)) { hostname.replace("https://", ""); this->hostname = hostname; this->key = key; + this->handler = func; } int SupabaseRealtime::login_email(String email_a, String password_a)