Skip to content

Commit

Permalink
Gremlin Reaction hotfix (#149)
Browse files Browse the repository at this point in the history
* updated gremlin reaction

* added a new test case for gremlin

* updated runner

* nit

* updated test config

* PR comment fix

* updated the logic for retrieving the parameters

* nit
  • Loading branch information
ruokun-niu authored Feb 19, 2025
1 parent 0c166c4 commit db01976
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
kind: Reaction
apiVersion: v1
name: e2e-gremlin-reaction-duplicate
spec:
kind: Gremlin
queries:
query3:
properties:
addedResultCommand: g.addV('Item').property('ItemName', @Name).property('Name',@Name)
gremlinHost: gremlin-server.default.svc.cluster.local
gremlinPort: 8182
18 changes: 18 additions & 0 deletions e2e-tests/03-gremlin-reaction-scenario/gremlin-reaction.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,23 @@ test('Test Gremlin Reaction - deletedResultCommand', async () => {
}, 140000);


test('Test Gremlin Reaction - addedResultCommand with duplicate parameters', async () => {
const gremlinReaction = yaml.loadAll(fs.readFileSync(__dirname + '/gremlin-reaction-duplicate.yaml', 'utf8'));
await deployResources(gremlinReaction);

await postgresClient.query(`INSERT INTO "Item" ("ItemId", "Name", "Category") VALUES (5, 'Drasi', '3')`);
await waitForCondition(async () => {
const result = await gremlinClient.V().has('ItemName', 'Drasi').hasNext();
return result;
}, 1000,30000)
.then(() => {
expect(true).toBeTruthy();
})
.catch(() => {
expect(false).toBeTruthy();
});
}, 140000);

afterAll(async () => {
await postgresClient.end();
postgresPortForward.stop();
Expand All @@ -118,6 +135,7 @@ afterAll(async () => {
});



function waitForCondition(checkFn, interval = 1000, timeout = 30000) {
return new Promise((resolve, reject) => {
let elapsedTime = 0;
Expand Down
16 changes: 16 additions & 0 deletions e2e-tests/03-gremlin-reaction-scenario/resources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,19 @@ spec:
i.ItemId AS Id,
i.Name as Name,
i.Category as Category
---
apiVersion: v1
kind: ContinuousQuery
name: query4
spec:
mode: query
sources:
subscriptions:
- id: test-source-3
query: >
MATCH
(i:Item {Category: '5'})
RETURN
i.ItemId AS Id,
i.Name as Name,
i.Category as Category
2 changes: 1 addition & 1 deletion e2e-tests/fixtures/signalr-fixture.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class SignalrFixture {

async start() {
await deployResources([this.reactionManifest]);
await new Promise((r) => setTimeout(r, 5000));
await new Promise((r) => setTimeout(r, 10000));
this.localPort = await this.portForward.start();
this.signalr = new signalR.HubConnectionBuilder()
.withUrl(`http://127.0.0.1:${this.localPort}/hub`)
Expand Down
4 changes: 2 additions & 2 deletions reactions/gremlin/gremlin-reaction/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@

#See https://aka.ms/containerfastmode to understand how Visual Studio uses this Dockerfile to build your images for faster debugging.

FROM --platform=$BUILDPLATFORM mcr.microsoft.com/dotnet/sdk:8.0-cbl-mariner2.0 AS build
FROM --platform=$TARGETPLATFORM mcr.microsoft.com/dotnet/sdk:8.0-cbl-mariner2.0 AS build
ARG BUILD_CONFIGURATION=Release
WORKDIR /src
COPY . .
RUN dotnet restore
RUN dotnet build -c $BUILD_CONFIGURATION -o /app/build

FROM --platform=$BUILDPLATFORM mcr.microsoft.com/dotnet/aspnet:8.0-cbl-mariner2.0 AS final
FROM --platform=$TARGETPLATFORM mcr.microsoft.com/dotnet/aspnet:8.0-cbl-mariner2.0 AS final
WORKDIR /app
COPY --from=build /app/build .
ENTRYPOINT ["/app/gremlin-reaction"]
88 changes: 60 additions & 28 deletions reactions/gremlin/gremlin-reaction/Services/GremlinService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public GremlinService(IConfiguration configuration, ILogger<GremlinService> logg
}



_addedResultCommand = configuration["addedResultCommand"];
_updatedResultCommand = configuration["updatedResultCommand"];
_deletedResultCommand = configuration["deletedResultCommand"];
Expand All @@ -107,7 +106,12 @@ public GremlinService(IConfiguration configuration, ILogger<GremlinService> logg
var param = match.Value.Substring(1);

_logger.LogInformation($"Extracted AddedResultCommand Param: {param}");
_addedResultCommandParamList.Add(param);
if (!_addedResultCommandParamList.Contains(param))
{
_addedResultCommandParamList.Add(param);
}
// Prepare the parameterized query by removing the @ sign
_addedResultCommand = _addedResultCommand.Replace($"@{param}", param);
}
}

Expand All @@ -119,7 +123,12 @@ public GremlinService(IConfiguration configuration, ILogger<GremlinService> logg
var param = match.Value.Substring(1);

_logger.LogInformation($"Extracted UpdatedResultCommand Param: {param}");
_updatedResultCommandParamList.Add(param);
if (!_updatedResultCommandParamList.Contains(param))
{
_updatedResultCommandParamList.Add(param);
}
// Prepare the parameterized query by removing the @ sign
_updatedResultCommand = _updatedResultCommand.Replace($"@{param}", param);
}
}

Expand All @@ -131,29 +140,45 @@ public GremlinService(IConfiguration configuration, ILogger<GremlinService> logg
var param = match.Value.Substring(1);

_logger.LogInformation($"Extracted DeletedResultCommand Param: {param}");
_deletedResultCommandParamList.Add(param);
if (!_deletedResultCommandParamList.Contains(param))
{
_deletedResultCommandParamList.Add(param);
}
// Prepare the parameterized query by removing the @ sign
_deletedResultCommand = _deletedResultCommand.Replace($"@{param}", param);
}
}
}

public void ProcessAddedQueryResults(Dictionary<string, object> res)
{
string newCmd = _addedResultCommand;
if (_addedResultCommand == null)
{
_logger.LogInformation("No Added Result Command Specified");
return;
}

// Dictionary to hold the parameters for the command
Dictionary<string, object> addedResultCommandParams = new Dictionary<string, object>();


foreach (string param in _addedResultCommandParamList)
{
// Prepare the parameterized query by removing the @ sign
newCmd = newCmd.Replace($"@{param}", param);
// Retrieve the value from the query result
var queryResultValue = ExtractQueryResultValue(param, res);
// Add the parameter to the dictionary
addedResultCommandParams.Add(param, queryResultValue);
}
_logger.LogInformation($"Issuing added result command: {newCmd}");

var resultSet = SubmitRequest(_gremlinClient, newCmd, addedResultCommandParams).Result;
if (_addedResultCommandParamList.Count != addedResultCommandParams.Count)
{
_logger.LogInformation($"Parameter count mismatch: Expected {_addedResultCommandParamList.Count}, got {addedResultCommandParams.Count}");
_logger.LogInformation($"Skipping command execution for {_addedResultCommand}");

}
_logger.LogInformation($"Issuing added result command: {_addedResultCommand}");

var resultSet = SubmitRequest(_gremlinClient, _addedResultCommand, addedResultCommandParams).Result;
if (resultSet.Count > 0)
{
_logger.LogInformation("Added Command Result:");
Expand All @@ -168,36 +193,38 @@ public void ProcessAddedQueryResults(Dictionary<string, object> res)

public void ProcessUpdatedQueryResults(UpdatedResultElement updatedResult)
{
_logger.LogInformation($"Updated Result {updatedResult}");

string newCmd = _updatedResultCommand;
if (_updatedResultCommand == null)
{
_logger.LogInformation("No Updated Result Command Specified");
return;
}

Dictionary<string, object> updatedResultCommandParams = new Dictionary<string, object>();

foreach (string param in _updatedResultCommandParamList)
{
if (param.StartsWith("before."))
{
// Prepare the parameterized query by removing the @ sign
newCmd = newCmd.Replace($"@{param}", param);
updatedResultCommandParams.Add(param, ExtractQueryResultValue(param.Substring(7), updatedResult.Before));
}
else if (param.StartsWith("after."))
{
// Prepare the parameterized query by removing the @ sign
newCmd = newCmd.Replace($"@{param}", param);
updatedResultCommandParams.Add(param, ExtractQueryResultValue(param.Substring(6), updatedResult.After));
}
else
{
// Prepare the parameterized query by removing the @ sign
newCmd = newCmd.Replace($"@{param}", param);
updatedResultCommandParams.Add(param, ExtractQueryResultValue(param, updatedResult.After));
}
}
_logger.LogInformation($"Issuing updated result command: {newCmd}");
if (_updatedResultCommandParamList.Count != updatedResultCommandParams.Count)
{
_logger.LogInformation($"Parameter count mismatch: Expected {_updatedResultCommandParamList.Count}, got {updatedResultCommandParams.Count}");
_logger.LogInformation($"Skipping command execution for {_updatedResultCommand}");

var resultSet = SubmitRequest(_gremlinClient, newCmd, updatedResultCommandParams).Result;
}
_logger.LogInformation($"Issuing updated result command: {_updatedResultCommand}");

var resultSet = SubmitRequest(_gremlinClient, _updatedResultCommand, updatedResultCommandParams).Result;
if (resultSet.Count > 0)
{
_logger.LogInformation("Updated Command Result:");
Expand All @@ -212,21 +239,26 @@ public void ProcessUpdatedQueryResults(UpdatedResultElement updatedResult)

public void ProcessDeletedQueryResults(Dictionary<string, object> deletedResults)
{
_logger.LogInformation($"Deleted Result {deletedResults}");

string newCmd = _deletedResultCommand;
if (_deletedResultCommand == null)
{
_logger.LogInformation("No Deleted Result Command Specified");
return;
}
Dictionary<string, object> deletedResultCommandParams = new Dictionary<string, object>();


foreach (string param in _deletedResultCommandParamList)
{
// Prepare the parameterized query by removing the @ sign
newCmd = newCmd.Replace($"@{param}", param);
deletedResultCommandParams.Add(param, ExtractQueryResultValue(param, deletedResults));
}
_logger.LogInformation($"Issuing deleted result command: {newCmd}");
if (_deletedResultCommandParamList.Count != deletedResultCommandParams.Count)
{
_logger.LogInformation($"Parameter count mismatch: Expected {_deletedResultCommandParamList.Count}, got {deletedResultCommandParams.Count}");
_logger.LogInformation($"Skipping command execution for {_deletedResultCommand}");

}
_logger.LogInformation($"Issuing deleted result command: {_deletedResultCommand}");

var resultSet = SubmitRequest(_gremlinClient, newCmd, deletedResultCommandParams).Result;
var resultSet = SubmitRequest(_gremlinClient, _deletedResultCommand, deletedResultCommandParams).Result;
if (resultSet.Count > 0)
{
_logger.LogInformation("Deleted Command Result:");
Expand Down

0 comments on commit db01976

Please sign in to comment.