Skip to content

Commit

Permalink
Multiple table support
Browse files Browse the repository at this point in the history
  • Loading branch information
jhagas committed Jul 19, 2024
1 parent 1d5a099 commit e2d69a0
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 43 deletions.
24 changes: 17 additions & 7 deletions examples/realtime-postgresChanges/realtime-postgresChanges.ino
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@

SupabaseRealtime realtime;

void DevicesTableHandler(String result)
void HandleChanges(String result)
{
JsonDocument doc;
deserializeJson(doc, result);

// Example of what you can do with the result
String state = doc["state"];
Serial.println(state);
String tableName = doc["table"];
String event = doc["type"];
String changes = doc["record"];

Serial.print(tableName);
Serial.print(" : ");
Serial.println(event);
Serial.println(changes);
}

void setup()
Expand All @@ -32,16 +38,20 @@ void setup()
}
Serial.println("\nConnected to WiFi");

realtime.begin("https://project.supabase.co", "apikey");
realtime.begin("https://project.supabase.co", "apikey", HandleChanges);
realtime.login_email("email", "password"); // Only if you activate RLS in your Supabase Postgres Table

// Parameter 1 : Table name
// Parameter 2 : Event type ("*" | "INSERT" | "UPDATE" | "DELETE")
// Parameter 3 : Filter
// Parameter 3 : Your Supabase Table Postgres Schema
// Parameter 4 : Filter
// Please read : https://supabase.com/docs/guides/realtime/postgres-changes?queryGroups=language&language=js#available-filters
// empty string if you don't want to filter the result
// Parameter 4 : Callback function, how to handle the result (message)
realtime.listen("table", "*", "", DevicesTableHandler);
realtime.addChangesListener("table1", "INSERT", "public", "id=eq.0");
// You can add multiple table listeners
realtime.addChangesListener("table2", "*", "public", "");

realtime.listen();
}

void loop()
Expand Down
21 changes: 15 additions & 6 deletions src/ESP-Supabase-Realtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,35 @@ class SupabaseRealtime
String password;
String data;
String loginMethod;
String USER_TOKEN = "";
bool useAuth;
int _login_process();
unsigned int authTimeout = 0;
unsigned long loginTime;
String configAUTH;

const char *config = "{\"event\":\"phx_join\",\"topic\":\"realtime:ESP\",\"payload\":{\"config\":{\"postgres_changes\":[{\"event\":\"abcdfeghijklmbopq\",\"schema\":\"public\",\"table\":\"abcdfeghijklmbopq\"}]}},\"ref\":\"sentRef\"}";
// Initial config
const char *config = "{\"event\":\"phx_join\",\"topic\":\"realtime:ESP\",\"payload\":{\"config\":{\"postgres_changes\":[]}},\"ref\":\"sentRef\"}";
JsonDocument postgresChanges;
JsonDocument jsonRealtimeConfig;
String configJSON;

// Heartbeat
unsigned int last_ms = millis();
const char *jsonRealtimeHeartbeat = R"({"event": "heartbeat","topic": "phoenix","payload": {},"ref": "sentRef"})";
const char *tokenConfig = R"({"topic": "realtime:ESP","event": "access_token","payload": {
"access_token": ""
},"ref": "sendRef"})";

void processMessage(uint8_t *payload, void (*func)(String));
void webSocketEvent(WStype_t type, uint8_t *payload, size_t length, String configJSON, void (*func)(String));
void processMessage(uint8_t *payload);
void webSocketEvent(WStype_t type, uint8_t *payload, size_t length);

std::function<void(String)> handler;

public:
SupabaseRealtime() {}
void begin(String hostname, String key);
void listen(String table, String event, String filter, void (*func)(String));
void begin(String hostname, String key, void (*func)(String));
void addChangesListener(String table, String event, String schema, String filter);
void listen();
void loop();
int login_email(String email_a, String password_a);
int login_phone(String phone_a, String password_a);
Expand Down
66 changes: 36 additions & 30 deletions src/Realtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ int SupabaseRealtime::_login_process()
deserializeJson(doc, data);
if (doc.containsKey("access_token") && !doc["access_token"].isNull() && doc["access_token"].is<String>() && !doc["access_token"].as<String>().isEmpty())
{
USER_TOKEN = doc["access_token"].as<String>();
String USER_TOKEN = doc["access_token"].as<String>();
authTimeout = doc["expires_in"].as<int>() * 1000;
Serial.println("Login Success");

JsonDocument authConfig;
deserializeJson(authConfig, tokenConfig);
authConfig["payload"]["access_token"] = USER_TOKEN;
serializeJson(authConfig, configAUTH);
}
else
{
Expand All @@ -68,20 +73,26 @@ int SupabaseRealtime::_login_process()
return httpCode;
}

void SupabaseRealtime::listen(String table, String event, String filter, void (*func)(String))
void SupabaseRealtime::addChangesListener(String table, String event, String schema, String filter)
{
String configJSON;
JsonDocument jsonRealtimeConfig;
deserializeJson(jsonRealtimeConfig, config);

jsonRealtimeConfig["payload"]["config"]["postgres_changes"][0]["table"] = table;
jsonRealtimeConfig["payload"]["config"]["postgres_changes"][0]["event"] = event;
JsonDocument tableObj;

tableObj["event"] = event;
tableObj["schema"] = schema;
tableObj["table"] = table;

if (filter != "")
{
jsonRealtimeConfig["payload"]["config"]["postgres_changes"][0]["filter"] = filter;
tableObj["filter"] = filter;
}

postgresChanges.add(tableObj);
}

void SupabaseRealtime::listen()
{
deserializeJson(jsonRealtimeConfig, config);
jsonRealtimeConfig["payload"]["config"]["postgres_changes"] = postgresChanges;
serializeJson(jsonRealtimeConfig, configJSON);

String slug = "/realtime/v1/websocket?apikey=" + String(key) + "&vsn=1.0.0";
Expand All @@ -97,29 +108,23 @@ void SupabaseRealtime::listen(String table, String event, String filter, void (*
slug.c_str());

// event handler
webSocket.onEvent(std::bind(&SupabaseRealtime::webSocketEvent, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, configJSON, func));
webSocket.onEvent(std::bind(&SupabaseRealtime::webSocketEvent, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
}

void SupabaseRealtime::processMessage(uint8_t *payload, void (*func)(String))
void SupabaseRealtime::processMessage(uint8_t *payload)
{
JsonDocument result;
deserializeJson(result, payload);
String table = getEventTable(result);
if (table != "null")
{
String data = result["payload"]["data"]["record"];
func(data);
String data = result["payload"]["data"];
handler(data);
};
}

void SupabaseRealtime::webSocketEvent(WStype_t type, uint8_t *payload, size_t length, String configJSON, void (*func)(String))
void SupabaseRealtime::webSocketEvent(WStype_t type, uint8_t *payload, size_t length)
{
String configAUTH;
JsonDocument authConfig;
deserializeJson(authConfig, tokenConfig);
authConfig["payload"]["access_token"] = USER_TOKEN;
serializeJson(authConfig, configAUTH);

switch (type)
{
case WStype_DISCONNECTED:
Expand All @@ -131,14 +136,21 @@ void SupabaseRealtime::webSocketEvent(WStype_t type, uint8_t *payload, size_t le
webSocket.sendTXT(configAUTH);
break;
case WStype_TEXT:
processMessage(payload, func);
processMessage(payload);
break;
case WStype_BIN:
Serial.printf("[WSc] get binary length: %u\n", length);
break;
case WStype_ERROR:
Serial.printf("[WSc] Error: %s\n", payload);
break;
case WStype_PING:
case WStype_PONG:
case WStype_FRAGMENT_TEXT_START:
case WStype_FRAGMENT_BIN_START:
case WStype_FRAGMENT:
case WStype_FRAGMENT_FIN:
break;
}
}

Expand All @@ -158,22 +170,16 @@ void SupabaseRealtime::loop()
{
last_ms = millis();
webSocket.sendTXT(jsonRealtimeHeartbeat);

String configJSON;
JsonDocument authConfig;
deserializeJson(authConfig, tokenConfig);

authConfig["payload"]["access_token"] = USER_TOKEN;
serializeJson(authConfig, configJSON);
webSocket.sendTXT(configJSON);
webSocket.sendTXT(configAUTH);
}
}

void SupabaseRealtime::begin(String hostname, String key)
void SupabaseRealtime::begin(String hostname, String key, void (*func)(String))
{
hostname.replace("https://", "");
this->hostname = hostname;
this->key = key;
this->handler = func;
}

int SupabaseRealtime::login_email(String email_a, String password_a)
Expand Down

0 comments on commit e2d69a0

Please sign in to comment.