Checkpoint size keep on increasing with time #1048

Open
WinmaH opened this issue Oct 26, 2019 · 2 comments

@WinmaH

WinmaH commented Oct 26, 2019

Description:
I was running experiments on the distributed deployment of the Stream Processor with both periodic and incremental file-based checkpointing enabled. During these experiments, the checkpoint size kept increasing over time (checkpoint interval: 1 min; number of revisions to keep: 3).
I suspect that old checkpoints are not being deleted.
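One way to test the suspicion that old checkpoints are not deleted is to compare the number of stored revisions against the configured limit. The Python sketch below is a minimal, hypothetical diagnostic. It assumes each revision is stored as a separate file in a per-app directory, and that file names begin with a fixed-width timestamp so that sorting by name sorts by age. The directory layout, the file naming, and the helpers `list_revisions`, `excess_revisions`, and `total_size_bytes` are illustrative assumptions, not part of WSO2 SP.

```python
import os
import tempfile


def list_revisions(app_dir):
    """Return revision file names in app_dir, oldest first.

    Assumption (hypothetical layout): one regular file per revision, named
    with a fixed-width timestamp prefix such as '1572076800000_MyApp'.
    """
    names = [n for n in os.listdir(app_dir)
             if os.path.isfile(os.path.join(app_dir, n))]
    return sorted(names)


def excess_revisions(app_dir, revisions_to_keep):
    """Revisions that should already have been pruned: all but the newest N."""
    revisions = list_revisions(app_dir)
    if revisions_to_keep <= 0:
        # Guard: revisions[:-0] would wrongly return an empty list.
        return revisions
    return revisions[:-revisions_to_keep]


def total_size_bytes(app_dir):
    """Sum of the sizes of all revision files in app_dir."""
    return sum(os.path.getsize(os.path.join(app_dir, n))
               for n in list_revisions(app_dir))
```

Run once per checkpoint interval (1 min here) against the app's checkpoint directory with `revisions_to_keep=3`. If retention works as configured, the result should be an empty list. A list that keeps growing, together with a rising total size, would support the suspicion. Incremental checkpointing may store additional files, so treat the result as a signal rather than proof.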

Affected Product Version:
Distributed deployment of wso2sp-4.3.0

Additional Information:
I used the following Siddhi application:

```
@app:name("EmailProcessor-checkpointing")
@app:description("Email Processor benchmark for WSO2 Stream Processor 4.x.x")

@source(type = 'kafka', partition.no.list='0', threading.option='single.thread', group.id="group", bootstrap.servers='localhost:9092', topic.list = 'test15',
    @map(type = 'json'))
define stream inputEmailsStream (iij_timestamp long, groupID int, fromAddress string, toAddresses string, ccAddresses string, bccAddresses string, subject string, body string, regexstr string);

@export('outputEmailStream:1.0.0')
define stream outputEmailStream (iij_timestamp long, fromAddress string, toAdds string, ccAdds string, bccAdds string, updatedSubject string, bodyObfuscated string);

@sink(type='log')
define stream filter1(subject string);

@sink(type='log')
define stream AlertStream(iij_timestamp long, fromAddress string);

@name('query 1')
@dist(parallel='9', execGroup='group3')
partition with (groupID of inputEmailsStream)
begin
    from inputEmailsStream
    select iij_timestamp, groupID, fromAddress, toAddresses, ccAddresses, bccAddresses, subject, body, regexstr, convert(time:timestampInMilliseconds(), 'long') as injected_iijtimestamp, str:length(body) as body_size
    insert into #inputEmailsStream1;

    from #inputEmailsStream1[regex:find(regexstr, fromAddress) == true]
    select iij_timestamp, groupID, fromAddress, toAddresses, ccAddresses, bccAddresses, subject, body, injected_iijtimestamp, body_size
    insert into #filteredEmailStream1;

    from #filteredEmailStream1
    select iij_timestamp, groupID, fromAddress, emailProcessorBenchmark:filter(toAddresses) as toAdds, emailProcessorBenchmark:filter(ccAddresses) as ccAdds, emailProcessorBenchmark:filter(bccAddresses) as bccAdds, subject, body, injected_iijtimestamp, body_size
    insert into #filteredEmailStream2;

    from #filteredEmailStream2
    select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, emailProcessorBenchmark:modify(body) as bodyObfuscated1, injected_iijtimestamp, body_size
    insert into #modifiedEmailStream1;

    from #modifiedEmailStream1
    select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, str:replaceAll(bodyObfuscated1, 'Kenneth Lay', 'Person1') as bodyObfuscated2, injected_iijtimestamp, body_size
    insert into #modifiedEmailStream2;

    from #modifiedEmailStream2
    select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, str:replaceAll(bodyObfuscated2, 'Jeffrey Skilling', 'Person2') as bodyObfuscated3, injected_iijtimestamp, body_size
    insert into #modifiedEmailStream3;

    from #modifiedEmailStream3
    select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, str:replaceAll(bodyObfuscated3, 'Andrew Fastow', 'Person3') as bodyObfuscated4, injected_iijtimestamp, body_size
    insert into #modifiedEmailStream4;

    from #modifiedEmailStream4
    select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, emailProcessorBenchmark:mostFrequentWord(bodyObfuscated4, subject) as updatedSubject, bodyObfuscated4 as bodyObfuscated, injected_iijtimestamp, body_size
    insert into #modifiedEmailStream5;

    from #modifiedEmailStream5#throughput:throughput(injected_iijtimestamp, "throughput", 3, 9, "filter", 5)
    select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, updatedSubject, bodyObfuscated, injected_iijtimestamp, body_size
    insert into #modifiedEmailStream;

    -- metrics
    from #modifiedEmailStream
    select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, updatedSubject, bodyObfuscated, emailProcessorBenchmark:metrics(bodyObfuscated) as metrics, body_size
    insert into #metricsEmailStream;

    from #metricsEmailStream#window.length(1000)
    select iij_timestamp, fromAddress
    insert into AlertStream;
end;
```

@pcnfernando
Contributor

Is the groupID of inputEmailsStream a set of static values or are these dynamic?

@WinmaH
Author

WinmaH commented Oct 29, 2019

The groupID attribute of inputEmailsStream takes fixed values from 1 to 9 (9 distinct values, equal to the number of partial Siddhi applications).
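The question about groupID matters because a Siddhi partition keeps separate state for each distinct partition key. As a result, the state behind the `#window.length(1000)` query grows with the number of keys seen. The Python toy model below illustrates this under simplifying assumptions: every key gets its own independent length window, and a snapshot serializes all of them. The class `PartitionedLengthWindow` is hypothetical and ignores the other state Siddhi actually persists.

```python
from collections import defaultdict, deque


class PartitionedLengthWindow:
    """Toy model (hypothetical): one length window per partition key.

    Assumes each distinct key gets an independent window of `length` events
    and that a snapshot must serialize every window.
    """

    def __init__(self, length):
        self.length = length
        # Bounded buffer per key, created the first time the key is seen.
        self.windows = defaultdict(lambda: deque(maxlen=self.length))

    def send(self, key, event):
        # deque(maxlen=...) drops the oldest event once the window is full.
        self.windows[key].append(event)

    def snapshot_event_count(self):
        """Events a full snapshot would hold across all partition keys."""
        return sum(len(w) for w in self.windows.values())


# Static keys: groupID cycles through 1..9, as described in the issue.
static = PartitionedLengthWindow(1000)
for i in range(100_000):
    static.send(i % 9 + 1, i)
print(static.snapshot_event_count())   # 9000 (9 keys x 1000 events)

# Dynamic keys: every event carries a new key, so state keeps growing.
dynamic = PartitionedLengthWindow(1000)
for i in range(100_000):
    dynamic.send(i, i)
print(dynamic.snapshot_event_count())  # 100000
```

With only nine fixed groupID values, the window state in this model stays bounded at 9 × 1000 events. Unbounded growth of the checkpoint would then have to come from somewhere else, such as revisions that are not being pruned.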
