Skip to content

Commit

Permalink
zmq receiver changed from blocking to nonblocking
Browse files Browse the repository at this point in the history
  • Loading branch information
jontio committed Jul 29, 2021
1 parent 71389d7 commit 8c7ec7e
Showing 1 changed file with 29 additions and 26 deletions.
55 changes: 29 additions & 26 deletions JAERO/zmq_audioreceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ void ZMQAudioReceiver::Start(QString address, QString topic)
}
}

void ZMQAudioReceiver::Stop()
{
running = false;
if(!future.isFinished())future.waitForFinished();
emit finished();
}

void ZMQAudioReceiver::process()
{
// allocate enough for 96Khz sampling with 1 buffer per second
Expand All @@ -32,34 +39,44 @@ void ZMQAudioReceiver::process()
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, _topic.toStdString().c_str(), 5);

char buf [recsize];
char topic[20];
unsigned char rate[4];
quint32 sampleRate;
int received;

running = true;

while(running)
{
zmq_recv(subscriber, topic, 20, 0);
int received = zmq_recv(subscriber, rate, 4, ZMQ_DONTWAIT);

//blocking call alternative for topic, here we wait for the topic.
//I guess the sleep has to be less then the idle period between messages???
while(((received = zmq_recv(subscriber, nullptr, 0, ZMQ_DONTWAIT))<0)&&running)
{
usleep(10000);
}
if(!running)break;

//rate message is next
received = zmq_recv(subscriber, rate, 4, ZMQ_DONTWAIT);
if(!running)break;
memcpy(&sampleRate, rate, 4);

//then audio data
received = zmq_recv(subscriber, buf, recsize, ZMQ_DONTWAIT);
if(!running)break;
if(received>=0)
{
QByteArray qdata(buf, received);
emit recAudio(qdata,sampleRate);
}
else
{
if(running)
{
qDebug()<<"zmq_recv error!!!";
usleep(100000);
}
}

}
}

if(subscriber)zmq_close(subscriber);
if(context)zmq_ctx_destroy(context);
subscriber=nullptr;
context=nullptr;
}

ZMQAudioReceiver::ZMQAudioReceiver(QObject *parent):
QObject(parent),
Expand All @@ -81,18 +98,4 @@ void ZMQAudioReceiver::setParameters(QString address, QString topic)
_topic = topic;
}

void ZMQAudioReceiver::Stop()
{

running = false;

if(subscriber)zmq_close(subscriber);
if(context)zmq_ctx_destroy(context);
if(!future.isFinished())future.waitForFinished();

subscriber=nullptr;
context=nullptr;

emit finished();

}

0 comments on commit 8c7ec7e

Please sign in to comment.