From db019764d9b341f9a6a58cd9a0dcb996752e7884 Mon Sep 17 00:00:00 2001 From: "Ruokun (Tommy) Niu" Date: Wed, 19 Feb 2025 09:30:24 -0800 Subject: [PATCH] Gremlin Reaction hotfix (#149) * updated gremlin reaction * added a new test case for gremlin * updated runner * nit * updated test config * PR comment fix * updated the logic for retrieving the parameters * nit --- .../gremlin-reaction-duplicate.yaml | 11 +++ .../gremlin-reaction.test.js | 18 ++++ .../resources.yaml | 16 ++++ e2e-tests/fixtures/signalr-fixture.js | 2 +- reactions/gremlin/gremlin-reaction/Dockerfile | 4 +- .../Services/GremlinService.cs | 88 +++++++++++++------ 6 files changed, 108 insertions(+), 31 deletions(-) create mode 100644 e2e-tests/03-gremlin-reaction-scenario/gremlin-reaction-duplicate.yaml diff --git a/e2e-tests/03-gremlin-reaction-scenario/gremlin-reaction-duplicate.yaml b/e2e-tests/03-gremlin-reaction-scenario/gremlin-reaction-duplicate.yaml new file mode 100644 index 000000000..78a2ba998 --- /dev/null +++ b/e2e-tests/03-gremlin-reaction-scenario/gremlin-reaction-duplicate.yaml @@ -0,0 +1,11 @@ +kind: Reaction +apiVersion: v1 +name: e2e-gremlin-reaction-duplicate +spec: + kind: Gremlin + queries: + query3: + properties: + addedResultCommand: g.addV('Item').property('ItemName', @Name).property('Name',@Name) + gremlinHost: gremlin-server.default.svc.cluster.local + gremlinPort: 8182 \ No newline at end of file diff --git a/e2e-tests/03-gremlin-reaction-scenario/gremlin-reaction.test.js b/e2e-tests/03-gremlin-reaction-scenario/gremlin-reaction.test.js index afb50d82d..556faa4fa 100644 --- a/e2e-tests/03-gremlin-reaction-scenario/gremlin-reaction.test.js +++ b/e2e-tests/03-gremlin-reaction-scenario/gremlin-reaction.test.js @@ -96,6 +96,23 @@ test('Test Gremlin Reaction - deletedResultCommand', async () => { }, 140000); +test('Test Gremlin Reaction - addedResultCommand with duplicate parameters', async () => { + const gremlinReaction = yaml.loadAll(fs.readFileSync(__dirname + '/gremlin-reaction-duplicate.yaml', 'utf8')); + await deployResources(gremlinReaction); + + await postgresClient.query(`INSERT INTO "Item" ("ItemId", "Name", "Category") VALUES (5, 'Drasi', '3')`); + await waitForCondition(async () => { + const result = await gremlinClient.V().has('ItemName', 'Drasi').hasNext(); + return result; + }, 1000,30000) + .then(() => { + expect(true).toBeTruthy(); + }) + .catch(() => { + expect(false).toBeTruthy(); + }); +}, 140000); + afterAll(async () => { await postgresClient.end(); postgresPortForward.stop(); @@ -118,6 +135,7 @@ afterAll(async () => { }); + function waitForCondition(checkFn, interval = 1000, timeout = 30000) { return new Promise((resolve, reject) => { let elapsedTime = 0; diff --git a/e2e-tests/03-gremlin-reaction-scenario/resources.yaml b/e2e-tests/03-gremlin-reaction-scenario/resources.yaml index 38d857913..1a3625b60 100644 --- a/e2e-tests/03-gremlin-reaction-scenario/resources.yaml +++ b/e2e-tests/03-gremlin-reaction-scenario/resources.yaml @@ -120,3 +120,19 @@ spec: i.ItemId AS Id, i.Name as Name, i.Category as Category +--- +apiVersion: v1 +kind: ContinuousQuery +name: query4 +spec: + mode: query + sources: + subscriptions: + - id: test-source-3 + query: > + MATCH + (i:Item {Category: '5'}) + RETURN + i.ItemId AS Id, + i.Name as Name, + i.Category as Category diff --git a/e2e-tests/fixtures/signalr-fixture.js b/e2e-tests/fixtures/signalr-fixture.js index f93f120b0..b9c9b47ce 100644 --- a/e2e-tests/fixtures/signalr-fixture.js +++ b/e2e-tests/fixtures/signalr-fixture.js @@ -43,7 +43,7 @@ class SignalrFixture { async start() { await deployResources([this.reactionManifest]); - await new Promise((r) => setTimeout(r, 5000)); + await new Promise((r) => setTimeout(r, 10000)); this.localPort = await this.portForward.start(); this.signalr = new signalR.HubConnectionBuilder() .withUrl(`http://127.0.0.1:${this.localPort}/hub`) diff --git a/reactions/gremlin/gremlin-reaction/Dockerfile b/reactions/gremlin/gremlin-reaction/Dockerfile index 477898872..3eda0395b 100644 --- a/reactions/gremlin/gremlin-reaction/Dockerfile +++ b/reactions/gremlin/gremlin-reaction/Dockerfile @@ -15,14 +15,14 @@ #See https://aka.ms/containerfastmode to understand how Visual Studio uses this Dockerfile to build your images for faster debugging. -FROM --platform=$BUILDPLATFORM mcr.microsoft.com/dotnet/sdk:8.0-cbl-mariner2.0 AS build +FROM --platform=$TARGETPLATFORM mcr.microsoft.com/dotnet/sdk:8.0-cbl-mariner2.0 AS build ARG BUILD_CONFIGURATION=Release WORKDIR /src COPY . . RUN dotnet restore RUN dotnet build -c $BUILD_CONFIGURATION -o /app/build -FROM --platform=$BUILDPLATFORM mcr.microsoft.com/dotnet/aspnet:8.0-cbl-mariner2.0 AS final +FROM --platform=$TARGETPLATFORM mcr.microsoft.com/dotnet/aspnet:8.0-cbl-mariner2.0 AS final WORKDIR /app COPY --from=build /app/build . ENTRYPOINT ["/app/gremlin-reaction"] diff --git a/reactions/gremlin/gremlin-reaction/Services/GremlinService.cs b/reactions/gremlin/gremlin-reaction/Services/GremlinService.cs index a513fb7c2..b42c0f616 100644 --- a/reactions/gremlin/gremlin-reaction/Services/GremlinService.cs +++ b/reactions/gremlin/gremlin-reaction/Services/GremlinService.cs @@ -86,7 +86,6 @@ public GremlinService(IConfiguration configuration, ILogger logg } - _addedResultCommand = configuration["addedResultCommand"]; _updatedResultCommand = configuration["updatedResultCommand"]; _deletedResultCommand = configuration["deletedResultCommand"]; @@ -107,7 +106,12 @@ public GremlinService(IConfiguration configuration, ILogger logg var param = match.Value.Substring(1); _logger.LogInformation($"Extracted AddedResultCommand Param: {param}"); - _addedResultCommandParamList.Add(param); + if (!_addedResultCommandParamList.Contains(param)) + { + _addedResultCommandParamList.Add(param); + } + // Prepare the parameterized query by removing the @ sign + _addedResultCommand = _addedResultCommand.Replace($"@{param}", param); } } @@ -119,7 +123,12 @@ public GremlinService(IConfiguration configuration, ILogger logg var param = match.Value.Substring(1); _logger.LogInformation($"Extracted UpdatedResultCommand Param: {param}"); - _updatedResultCommandParamList.Add(param); + if (!_updatedResultCommandParamList.Contains(param)) + { + _updatedResultCommandParamList.Add(param); + } + // Prepare the parameterized query by removing the @ sign + _updatedResultCommand = _updatedResultCommand.Replace($"@{param}", param); } } @@ -131,14 +140,23 @@ public GremlinService(IConfiguration configuration, ILogger logg var param = match.Value.Substring(1); _logger.LogInformation($"Extracted DeletedResultCommand Param: {param}"); - _deletedResultCommandParamList.Add(param); + if (!_deletedResultCommandParamList.Contains(param)) + { + _deletedResultCommandParamList.Add(param); + } + // Prepare the parameterized query by removing the @ sign + _deletedResultCommand = _deletedResultCommand.Replace($"@{param}", param); } } } public void ProcessAddedQueryResults(Dictionary res) { - string newCmd = _addedResultCommand; + if (_addedResultCommand == null) + { + _logger.LogInformation("No Added Result Command Specified"); + return; + } // Dictionary to hold the parameters for the command Dictionary addedResultCommandParams = new Dictionary(); @@ -146,14 +164,21 @@ public void ProcessAddedQueryResults(Dictionary res) foreach (string param in _addedResultCommandParamList) { - // Prepare the parameterized query by removing the @ sign - newCmd = newCmd.Replace($"@{param}", param); + // Retrieve the value from the query result var queryResultValue = ExtractQueryResultValue(param, res); + // Add the parameter to the dictionary addedResultCommandParams.Add(param, queryResultValue); } - _logger.LogInformation($"Issuing added result command: {newCmd}"); - var resultSet = SubmitRequest(_gremlinClient, newCmd, addedResultCommandParams).Result; + if (_addedResultCommandParamList.Count != addedResultCommandParams.Count) + { + _logger.LogInformation($"Parameter count mismatch: Expected {_addedResultCommandParamList.Count}, got {addedResultCommandParams.Count}"); + _logger.LogInformation($"Skipping command execution for {_addedResultCommand}"); + + } + _logger.LogInformation($"Issuing added result command: {_addedResultCommand}"); + + var resultSet = SubmitRequest(_gremlinClient, _addedResultCommand, addedResultCommandParams).Result; if (resultSet.Count > 0) { _logger.LogInformation("Added Command Result:"); @@ -168,9 +193,11 @@ public void ProcessAddedQueryResults(Dictionary res) public void ProcessUpdatedQueryResults(UpdatedResultElement updatedResult) { - _logger.LogInformation($"Updated Result {updatedResult}"); - - string newCmd = _updatedResultCommand; + if (_updatedResultCommand == null) + { + _logger.LogInformation("No Updated Result Command Specified"); + return; + } Dictionary updatedResultCommandParams = new Dictionary(); @@ -178,26 +205,26 @@ public void ProcessUpdatedQueryResults(UpdatedResultElement updatedResult) { if (param.StartsWith("before.")) { - // Prepare the parameterized query by removing the @ sign - newCmd = newCmd.Replace($"@{param}", param); updatedResultCommandParams.Add(param, ExtractQueryResultValue(param.Substring(7), updatedResult.Before)); } else if (param.StartsWith("after.")) { - // Prepare the parameterized query by removing the @ sign - newCmd = newCmd.Replace($"@{param}", param); updatedResultCommandParams.Add(param, ExtractQueryResultValue(param.Substring(6), updatedResult.After)); } else { - // Prepare the parameterized query by removing the @ sign - newCmd = newCmd.Replace($"@{param}", param); updatedResultCommandParams.Add(param, ExtractQueryResultValue(param, updatedResult.After)); } } - _logger.LogInformation($"Issuing updated result command: {newCmd}"); + if (_updatedResultCommandParamList.Count != updatedResultCommandParams.Count) + { + _logger.LogInformation($"Parameter count mismatch: Expected {_updatedResultCommandParamList.Count}, got {updatedResultCommandParams.Count}"); + _logger.LogInformation($"Skipping command execution for {_updatedResultCommand}"); - var resultSet = SubmitRequest(_gremlinClient, newCmd, updatedResultCommandParams).Result; + } + _logger.LogInformation($"Issuing updated result command: {_updatedResultCommand}"); + + var resultSet = SubmitRequest(_gremlinClient, _updatedResultCommand, updatedResultCommandParams).Result; if (resultSet.Count > 0) { _logger.LogInformation("Updated Command Result:"); @@ -212,21 +239,26 @@ public void ProcessUpdatedQueryResults(UpdatedResultElement updatedResult) public void ProcessDeletedQueryResults(Dictionary deletedResults) { - _logger.LogInformation($"Deleted Result {deletedResults}"); - - string newCmd = _deletedResultCommand; + if (_deletedResultCommand == null) + { + _logger.LogInformation("No Deleted Result Command Specified"); + return; + } Dictionary deletedResultCommandParams = new Dictionary(); - foreach (string param in _deletedResultCommandParamList) { - // Prepare the parameterized query by removing the @ sign - newCmd = newCmd.Replace($"@{param}", param); deletedResultCommandParams.Add(param, ExtractQueryResultValue(param, deletedResults)); } - _logger.LogInformation($"Issuing deleted result command: {newCmd}"); + if (_deletedResultCommandParamList.Count != deletedResultCommandParams.Count) + { + _logger.LogInformation($"Parameter count mismatch: Expected {_deletedResultCommandParamList.Count}, got {deletedResultCommandParams.Count}"); + _logger.LogInformation($"Skipping command execution for {_deletedResultCommand}"); + + } + _logger.LogInformation($"Issuing deleted result command: {_deletedResultCommand}"); - var resultSet = SubmitRequest(_gremlinClient, newCmd, deletedResultCommandParams).Result; + var resultSet = SubmitRequest(_gremlinClient, _deletedResultCommand, deletedResultCommandParams).Result; if (resultSet.Count > 0) { _logger.LogInformation("Deleted Command Result:");