Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix open data dk data target #251

Merged
merged 1 commit into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/modules/open-data-dk-sharing.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import { OpenDataDkSharingService } from "@services/data-management/open-data-dk
import { SharedModule } from "@modules/shared.module";
import { OrganizationModule } from "@modules/user-management/organization.module";
import { PayloadDecoderExecutorModuleModule } from "@modules/payload-decoder-executor-module.module";
import { ChirpstackAdministrationModule } from "@modules/device-integrations/chirpstack-administration.module";

@Module({
imports: [SharedModule, OrganizationModule, PayloadDecoderExecutorModuleModule],
imports: [SharedModule, OrganizationModule, PayloadDecoderExecutorModuleModule, ChirpstackAdministrationModule],
controllers: [OpenDataDkSharingController],
providers: [OpenDataDkSharingService],
})
Expand Down
26 changes: 18 additions & 8 deletions src/services/data-management/open-data-dk-sharing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import { IoTDevice } from "@entities/iot-device.entity";
import configuration from "@config/configuration";
import { OpenDataDkSharingController } from "@admin-controller/open-data-dk-sharing.controller";
import { ErrorCodes } from "@enum/error-codes.enum";
import { IoTDeviceType } from "@enum/device-type.enum";
import { ChirpstackDeviceService } from "@services/chirpstack/chirpstack-device.service";

@Injectable()
export class OpenDataDkSharingService {
constructor(
@InjectRepository(OpenDataDkDataset)
private repository: Repository<OpenDataDkDataset>,
private payloadDecoderExecutorService: PayloadDecoderExecutorService
private payloadDecoderExecutorService: PayloadDecoderExecutorService,
private chirpstackDeviceService: ChirpstackDeviceService
) {}

private readonly BACKEND_BASE_URL = configuration()["backend"]["baseurl"];
Expand All @@ -40,18 +43,17 @@ export class OpenDataDkSharingService {
return { error: ErrorCodes.NoData };
}

return this.decodeData(rawData);
return await this.decodeData(rawData);
}

private decodeData(rawData: OpenDataDkDataset) {
// TODO: Do this in parallel
private async decodeData(rawData: OpenDataDkDataset) {
const results: any[] = [];
rawData.dataTarget.connections.forEach(connection => {
for (const connection of rawData.dataTarget.connections) {
this.logger.debug(`Got connection(${connection.id})`);
connection.iotDevices.forEach(async device => {
for (const device of connection.iotDevices) {
await this.decodeDevice(device, connection, results);
});
});
}
}
return results;
}

Expand All @@ -67,6 +69,14 @@ export class OpenDataDkSharingService {
}
try {
if (connection.payloadDecoder != null) {
// Enrich lorawan devices with chirpstack data
if (
device.type === IoTDeviceType.LoRaWAN &&
connection.payloadDecoder.decodingFunction.includes("lorawanSettings")
) {
device = await this.chirpstackDeviceService.enrichLoRaWANDevice(device);
}

const decoded = await this.payloadDecoderExecutorService.callUntrustedCode(
connection.payloadDecoder.decodingFunction,
device,
Expand Down
25 changes: 8 additions & 17 deletions src/services/data-targets/data-target-kafka-listener.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,16 @@ export class DataTargetKafkaListenerService extends AbstractKafkaConsumer {
try {
iotDevice = await this.ioTDeviceService.findOne(dto.iotDeviceId);
} catch (err) {
this.logger.error(
`Error finding IoTDevice by id: ${dto.iotDeviceId}. Stopping.`
);
this.logger.error(`Error finding IoTDevice by id: ${dto.iotDeviceId}. Stopping.`);
return;
}

this.logger.debug(
`Sending payload from deviceId: ${iotDevice.id}; Name: '${iotDevice.name}'`
);
this.logger.debug(`Sending payload from deviceId: ${iotDevice.id}; Name: '${iotDevice.name}'`);

await this.findDataTargetsAndSend(iotDevice, dto);
}

private async findDataTargetsAndSend(
iotDevice: IoTDevice,
dto: TransformedPayloadDto
) {
private async findDataTargetsAndSend(iotDevice: IoTDevice, dto: TransformedPayloadDto) {
// Get connections in order to only send to the dataTargets which is identified by the pair of IoTDevice and PayloadDecoder
const dataTargets = await this.dataTargetService.findDataTargetsByConnectionPayloadDecoderAndIoTDevice(
iotDevice.id,
Expand All @@ -79,13 +72,9 @@ export class DataTargetKafkaListenerService extends AbstractKafkaConsumer {
if (target.type == DataTargetType.HttpPush) {
try {
const status = await this.httpPushDataTargetService.send(target, dto);
this.logger.debug(
`Sent to HttpPush target: ${JSON.stringify(status)}`
);
this.logger.debug(`Sent to HttpPush target: ${JSON.stringify(status)}`);
} catch (err) {
this.logger.error(
`Error while sending to Http Push DataTarget: ${err}`
);
this.logger.error(`Error while sending to Http Push DataTarget: ${err}`);
}
} else if (target.type == DataTargetType.Fiware) {
try {
Expand All @@ -100,8 +89,10 @@ export class DataTargetKafkaListenerService extends AbstractKafkaConsumer {
} catch (err) {
this.logger.error(`Error while sending to MQTT DataTarget: ${err}`);
}
} else if (target.type === DataTargetType.OpenDataDK) {
// OpenDataDk data targets are handled uniquely and ignored here.
} else {
throw new NotImplementedException(`Not implemented for: ${target.type}`);
this.logger.error(`Not implemented for: ${target.type}`);
}
});
}
Expand Down
Loading