You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Description:
I was doing experiments with the distributed deployment of the stream processor by enabling both the Periodic and Incremental File-based checkpoints. When I was doing the experiments I saw that the checkpoint size keep on increasing with time. ( I used the checkpoint interval as 1 min and no of revisions as 3 )
I suspect that the old checkpoints don't get deleted
Affected Product Version:
Distributed Deployment of the wso2sp-4.3.0
Additional Information:
I used the following SIddhi Application
@name('query 1') @dist(parallel='9', execGroup='group3')
partition with (groupID of inputEmailsStream)
begin
from inputEmailsStream
select iij_timestamp, groupID, fromAddress, toAddresses, ccAddresses, bccAddresses, subject, body, regexstr, convert(time:timestampInMilliseconds(),'long') as injected_iijtimestamp, str:length(body) as body_size
insert into #inputEmailsStream1;
from #filteredEmailStream1
select iij_timestamp, groupID, fromAddress, emailProcessorBenchmark:filter(toAddresses) as toAdds, emailProcessorBenchmark:filter(ccAddresses) as ccAdds, emailProcessorBenchmark:filter(bccAddresses) as bccAdds, subject, body, injected_iijtimestamp, body_size insert into #filteredEmailStream2;
from #filteredEmailStream2 select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, emailProcessorBenchmark:modify(body) as bodyObfuscated1, injected_iijtimestamp, body_size insert into #modifiedEmailStream1;
from #modifiedEmailStream1 select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, str:replaceAll(bodyObfuscated1, 'Kenneth Lay', 'Person1') as bodyObfuscated2, injected_iijtimestamp, body_size insert into #modifiedEmailStream2;
from #modifiedEmailStream2 select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, str:replaceAll(bodyObfuscated2, 'Jeffrey Skilling', 'Person2') as bodyObfuscated3, injected_iijtimestamp, body_size insert into #modifiedEmailStream3;
from #modifiedEmailStream3 select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, str:replaceAll(bodyObfuscated3, 'Andrew Fastow', 'Person3') as bodyObfuscated4, injected_iijtimestamp, body_size insert into #modifiedEmailStream4;
from #modifiedEmailStream4 select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, emailProcessorBenchmark:mostFrequentWord(bodyObfuscated4, subject) as updatedSubject, bodyObfuscated4 as bodyObfuscated, injected_iijtimestamp, body_size insert into #modifiedEmailStream5;
Description:
I was doing experiments with the distributed deployment of the stream processor by enabling both the Periodic and Incremental File-based checkpoints. When I was doing the experiments I saw that the checkpoint size keep on increasing with time. ( I used the checkpoint interval as 1 min and no of revisions as 3 )
I suspect that the old checkpoints don't get deleted
Affected Product Version:
Distributed Deployment of the wso2sp-4.3.0
Additional Information:
I used the following SIddhi Application
@app:name("EmailProcessor-checkpointing")
@app:description("Email Processor benchmark for WSO2 Stream Processor 4.x.x")
@source(type = 'kafka', partition.no.list='0', threading.option='single.thread', group.id="group", bootstrap.servers='localhost:9092', topic.list = 'test15',
@Map(type = 'json'))
define stream inputEmailsStream (iij_timestamp long, groupID int, fromAddress string, toAddresses string, ccAddresses string, bccAddresses string, subject string, body string, regexstr string);
@export('outputEmailStream:1.0.0')
define stream outputEmailStream (iij_timestamp long, fromAddress string, toAdds string, ccAdds string, bccAdds string, updatedSubject string, bodyObfuscated string);
@sink(type='log')
define stream filter1(subject string);
@sink(type='log')
define stream AlertStream(iij_timestamp long, fromAddress string);
@name('query 1')
@dist(parallel='9', execGroup='group3')
partition with (groupID of inputEmailsStream)
begin
from inputEmailsStream
select iij_timestamp, groupID, fromAddress, toAddresses, ccAddresses, bccAddresses, subject, body, regexstr, convert(time:timestampInMilliseconds(),'long') as injected_iijtimestamp, str:length(body) as body_size
insert into #inputEmailsStream1;
from #inputEmailsStream1 [ regex:find(regexstr, fromAddress) == true ] select iij_timestamp, groupID, fromAddress, toAddresses, ccAddresses, bccAddresses, subject, body, injected_iijtimestamp, body_size insert into #filteredEmailStream1;
from #filteredEmailStream1
select iij_timestamp, groupID, fromAddress, emailProcessorBenchmark:filter(toAddresses) as toAdds, emailProcessorBenchmark:filter(ccAddresses) as ccAdds, emailProcessorBenchmark:filter(bccAddresses) as bccAdds, subject, body, injected_iijtimestamp, body_size insert into #filteredEmailStream2;
from #filteredEmailStream2 select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, emailProcessorBenchmark:modify(body) as bodyObfuscated1, injected_iijtimestamp, body_size insert into #modifiedEmailStream1;
from #modifiedEmailStream1 select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, str:replaceAll(bodyObfuscated1, 'Kenneth Lay', 'Person1') as bodyObfuscated2, injected_iijtimestamp, body_size insert into #modifiedEmailStream2;
from #modifiedEmailStream2 select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, str:replaceAll(bodyObfuscated2, 'Jeffrey Skilling', 'Person2') as bodyObfuscated3, injected_iijtimestamp, body_size insert into #modifiedEmailStream3;
from #modifiedEmailStream3 select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, str:replaceAll(bodyObfuscated3, 'Andrew Fastow', 'Person3') as bodyObfuscated4, injected_iijtimestamp, body_size insert into #modifiedEmailStream4;
from #modifiedEmailStream4 select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, emailProcessorBenchmark:mostFrequentWord(bodyObfuscated4, subject) as updatedSubject, bodyObfuscated4 as bodyObfuscated, injected_iijtimestamp, body_size insert into #modifiedEmailStream5;
from #modifiedEmailStream5#throughput:throughput(injected_iijtimestamp,"throughput", 3 , 9 , "filter", 5)
select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, updatedSubject, bodyObfuscated, injected_iijtimestamp, body_size
insert into #modifiedEmailStream;
-- metrics
from #modifiedEmailStream select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, updatedSubject, bodyObfuscated, emailProcessorBenchmark:metrics(bodyObfuscated) as metrics, body_size insert into #metricsEmailStream;
from #metricsEmailStream#window.length(1000) select iij_timestamp, fromAddress insert into AlertStream;
end;
The text was updated successfully, but these errors were encountered: