Skip to content

Commit

Permalink
#23 Finished integrating the RTP handler with the RTP gateway, comple…
Browse files Browse the repository at this point in the history
…tely decoupling it from the media mixer components.
  • Loading branch information
hrosa committed Jul 9, 2015
1 parent f8d8f4f commit d389504
Show file tree
Hide file tree
Showing 5 changed files with 487 additions and 513 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,62 +29,61 @@
*/
public interface PacketHandler extends Comparable<PacketHandler> {

/**
* Checks whether the handler can process the incoming packet or not.
*
* @param packet
* The packet to be processed.
* @return <code>true</code>, if the packet can be handled.
* <code>false</code>, otherwise.
*/
boolean canHandle(byte[] packet);
/**
* Checks whether the handler can process the incoming packet or not.
*
* @param packet The packet to be processed.
* @return <code>true</code>, if the packet can be handled. <code>false</code>, otherwise.
*/
boolean canHandle(byte[] packet);

boolean canHandle(byte[] packet, int dataLength, int offset);
boolean canHandle(byte[] packet, int dataLength, int offset);

/**
* Processes the packet and provides a suitable answer.
*
* @param packet
* The packet to be processed.
* @param localPeer
* The local peer who received the packet
* @param remotePeer
* The remote peer who sent the packet
* @return The answer to be sent to the remote peer as response to the
* incoming packet.
* @throws PacketHandlerException
* When the handler cannot process the packet.
*/
byte[] handle(byte[] packet, InetSocketAddress localPeer, InetSocketAddress remotePeer) throws PacketHandlerException;
/**
* Processes the packet and provides a suitable answer.
*
* @param packet The packet to be processed.
* @param localPeer The local peer who received the packet
* @param remotePeer The remote peer who sent the packet
* @return The answer to be sent to the remote peer as response to the incoming packet.
* @throws PacketHandlerException When the handler cannot process the packet.
*/
byte[] handle(byte[] packet, InetSocketAddress localPeer, InetSocketAddress remotePeer) throws PacketHandlerException;

/**
* Processes the packet and provides a suitable answer.
*
* @param packet
* The packet to be processed.
* @param dataLength
* The length of the data to be read.
* @param offset
* The initial position to start reading data from.
* @param localPeer
* The local peer who received the packet
* @param remotePeer
* The remote peer who sent the packet
* @return The answer to be sent to the remote peer as response to the
* incoming packet.
* @throws PacketHandlerException
* When the handler cannot process the packet.
*/
byte[] handle(byte[] packet, int dataLength, int offset, InetSocketAddress localPeer, InetSocketAddress remotePeer) throws PacketHandlerException;
/**
* Processes the packet and provides a suitable answer.
*
* @param packet The packet to be processed.
* @param dataLength The length of the data to be read.
* @param offset The initial position to start reading data from.
* @param localPeer The local peer who received the packet
* @param remotePeer The remote peer who sent the packet
* @return The answer to be sent to the remote peer as response to the incoming packet.
* @throws PacketHandlerException When the handler cannot process the packet.
*/
byte[] handle(byte[] packet, int dataLength, int offset, InetSocketAddress localPeer, InetSocketAddress remotePeer)
throws PacketHandlerException;

/**
* Gets the priority of the handler in the pipeline.<br>
* The priority affects the place of the handler in the pipeline. This can
* affect performance as handlers with higher priority will be queried first
* when a packet arrives.
*
* @return The priority of the handler
*/
int getPipelinePriority();
/**
* Gets the priority of the handler in the pipeline.
* <p>
* The priority affects the place of the handler in the pipeline. This can affect performance as handlers with higher
* priority will be queried first when a packet arrives.
* </p>
*
* @return The priority of the handler
*/
int getPipelinePriority();

/**
* Sets the priority of the handler in the pipeline.
* <p>
* The priority affects the place of the handler in the pipeline. This can affect performance as handlers with higher
* priority will be queried first when a packet arrives.
* </p>
*
* @param priority The priority of the handler in the pipeline.
*/
void setPipelinePriority(int priority);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@

import org.apache.log4j.Logger;
import org.mobicents.media.server.impl.rtcp.RtcpHeader;
import org.mobicents.media.server.impl.rtp.rfc2833.DtmfInput;
import org.mobicents.media.server.impl.rtp.sdp.RTPFormat;
import org.mobicents.media.server.impl.rtp.sdp.RTPFormats;
import org.mobicents.media.server.impl.rtp.statistics.RtpStatistics;
import org.mobicents.media.server.impl.srtp.DtlsHandler;
import org.mobicents.media.server.io.network.channel.PacketHandler;
import org.mobicents.media.server.io.network.channel.PacketHandlerException;
import org.mobicents.media.server.scheduler.Scheduler;

/**
* Handles incoming RTP packets.
Expand All @@ -45,46 +43,43 @@ public class RtpHandler implements PacketHandler {

private static final Logger logger = Logger.getLogger(RtpHandler.class);

// Packet handler properties
private int pipelinePriority;
private DtlsHandler dtlsHandler;

private RTPFormats rtpFormats;
private final RtpClock rtpClock;
private final RtpClock oobClock;

// Channel properties
private boolean loopable;
private boolean receivable;

private final RtpStatistics statistics;
private final RtpPacket rtpPacket;

// Secure RTP
private boolean secure;
private DtlsHandler dtlsHandler;

// RTP processing
// RTP components
private RTPFormats rtpFormats;
private final RtpPacket rtpPacket;
private final RtpGateway rtpGateway;
private final RtpStatistics statistics;

public RtpHandler(RtpClock clock, RtpClock oobClock, RtpStatistics statistics, RtpGateway rtpGateway) {
public RtpHandler(RtpStatistics statistics, RtpGateway rtpGateway) {
// Packet handler properties
this.pipelinePriority = 0;

this.rtpClock = clock;
this.oobClock = oobClock;

this.rtpFormats = new RTPFormats();
this.statistics = statistics;
this.rtpPacket = new RtpPacket(RtpPacket.RTP_PACKET_MAX_SIZE, true);
// Channel properties
this.receivable = false;
this.loopable = false;

this.secure = false;

// RTP components
this.rtpFormats = new RTPFormats();
this.rtpPacket = new RtpPacket(RtpPacket.RTP_PACKET_MAX_SIZE, true);
this.rtpGateway = rtpGateway;
this.statistics = statistics;
}

@Override
public int getPipelinePriority() {
return pipelinePriority;
}

@Override
public void setPipelinePriority(int pipelinePriority) {
this.pipelinePriority = pipelinePriority;
}
Expand Down Expand Up @@ -112,7 +107,6 @@ public void setReceivable(boolean receivable) {
*/
public void setFormatMap(final RTPFormats rtpFormats) {
this.rtpFormats = rtpFormats;
this.jitterBuffer.setFormats(rtpFormats);
}

public RTPFormats getFormatMap() {
Expand All @@ -135,10 +129,12 @@ public void reset() {
}
}

@Override
public boolean canHandle(byte[] packet) {
return canHandle(packet, packet.length, 0);
}


@Override
public boolean canHandle(byte[] packet, int dataLength, int offset) {
/*
* The RTP header has the following format:
Expand All @@ -155,10 +151,10 @@ public boolean canHandle(byte[] packet, int dataLength, int offset) {
* | contributing source (CSRC) identifiers |
* | .... |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*
*
* The first twelve octets are present in every RTP packet, while the
* list of CSRC identifiers is present only when inserted by a mixer.
*
*
* The version defined by RFC3550 specification is two.
*/
// Packet must be equal or greater than an RTP Packet Header
Expand Down Expand Up @@ -235,23 +231,22 @@ public byte[] handle(byte[] packet, int dataLength, int offset, InetSocketAddres
buffer.flip();
}

// For RTP keep-alive purposes
this.statistics.setLastHeartbeat(this.rtpClock.getWallClock().getTime());

// RTP v0 packets are used in some applications. Discarded since we do not handle them.
if (rtpPacket.getVersion() != 0 && (receivable || loopable)) {
// Queue packet into the jitter buffer
if (rtpPacket.getBuffer().limit() > 0) {
if (loopable) {
// Update statistics for RTCP
this.statistics.onRtpReceive(rtpPacket);
this.statistics.onRtpSent(rtpPacket);

// Return same packet (looping) so it can be transmitted
return packet;
} else {
// Update statistics for RTCP
this.statistics.onRtpReceive(rtpPacket);
// Write packet

// Send packet to the RTP gateway to be mixed or forwarded
// depending on the relay type defined for the channel
int payloadType = rtpPacket.getPayloadType();
RTPFormat format = rtpFormats.find(payloadType);
if (format != null) {
Expand All @@ -270,11 +265,12 @@ public byte[] handle(byte[] packet, int dataLength, int offset, InetSocketAddres
}
return null;
}

public int compareTo(PacketHandler o) {

@Override
public int compareTo(PacketHandler o) {
if (o == null) {
return 1;
}
return this.getPipelinePriority() - o.getPipelinePriority();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public RtpMixerComponent(int connectionId, Scheduler scheduler, DspFactory dspFa
public void setRtpFormats(RTPFormats formats) {
this.jitterBuffer.setFormats(formats);
}

public void processRtpPacket(RtpPacket packet, RTPFormat format) {
if (this.rxPackets == 0) {
logger.info("Restarting jitter buffer");
Expand Down
Loading

0 comments on commit d389504

Please sign in to comment.