From d3895047d6f6b59d2946fe18ea24a183526d83ca Mon Sep 17 00:00:00 2001 From: hrosa Date: Thu, 9 Jul 2015 15:06:21 +0100 Subject: [PATCH] #23 Finished integrating the RTP handler with the RTP gateway, completely decoupling it from the media mixer components. --- .../io/network/channel/PacketHandler.java | 105 ++- .../media/server/impl/rtp/RtpHandler.java | 62 +- .../server/impl/rtp/RtpMixerComponent.java | 2 +- .../media/server/impl/rtp/RtpTransport.java | 828 +++++++++--------- .../impl/rtp/statistics/RtpStatistics.java | 3 + 5 files changed, 487 insertions(+), 513 deletions(-) diff --git a/io/network/src/main/java/org/mobicents/media/server/io/network/channel/PacketHandler.java b/io/network/src/main/java/org/mobicents/media/server/io/network/channel/PacketHandler.java index a97ce6247..7a69f9b8f 100644 --- a/io/network/src/main/java/org/mobicents/media/server/io/network/channel/PacketHandler.java +++ b/io/network/src/main/java/org/mobicents/media/server/io/network/channel/PacketHandler.java @@ -29,62 +29,61 @@ */ public interface PacketHandler extends Comparable { - /** - * Checks whether the handler can process the incoming packet or not. - * - * @param packet - * The packet to be processed. - * @return true, if the packet can be handled. - * false, otherwise. - */ - boolean canHandle(byte[] packet); + /** + * Checks whether the handler can process the incoming packet or not. + * + * @param packet The packet to be processed. + * @return true, if the packet can be handled. false, otherwise. + */ + boolean canHandle(byte[] packet); - boolean canHandle(byte[] packet, int dataLength, int offset); + boolean canHandle(byte[] packet, int dataLength, int offset); - /** - * Processes the packet and provides a suitable answer. - * - * @param packet - * The packet to be processed. - * @param localPeer - * The local peer who received the packet - * @param remotePeer - * The remote peer who sent the packet - * @return The answer to be sent to the remote peer as response to the - * incoming packet. - * @throws PacketHandlerException - * When the handler cannot process the packet. - */ - byte[] handle(byte[] packet, InetSocketAddress localPeer, InetSocketAddress remotePeer) throws PacketHandlerException; + /** + * Processes the packet and provides a suitable answer. + * + * @param packet The packet to be processed. + * @param localPeer The local peer who received the packet + * @param remotePeer The remote peer who sent the packet + * @return The answer to be sent to the remote peer as response to the incoming packet. + * @throws PacketHandlerException When the handler cannot process the packet. + */ + byte[] handle(byte[] packet, InetSocketAddress localPeer, InetSocketAddress remotePeer) throws PacketHandlerException; - /** - * Processes the packet and provides a suitable answer. - * - * @param packet - * The packet to be processed. - * @param dataLength - * The length of the data to be read. - * @param offset - * The initial position to start reading data from. - * @param localPeer - * The local peer who received the packet - * @param remotePeer - * The remote peer who sent the packet - * @return The answer to be sent to the remote peer as response to the - * incoming packet. - * @throws PacketHandlerException - * When the handler cannot process the packet. - */ - byte[] handle(byte[] packet, int dataLength, int offset, InetSocketAddress localPeer, InetSocketAddress remotePeer) throws PacketHandlerException; + /** + * Processes the packet and provides a suitable answer. + * + * @param packet The packet to be processed. + * @param dataLength The length of the data to be read. + * @param offset The initial position to start reading data from. + * @param localPeer The local peer who received the packet + * @param remotePeer The remote peer who sent the packet + * @return The answer to be sent to the remote peer as response to the incoming packet. + * @throws PacketHandlerException When the handler cannot process the packet. + */ + byte[] handle(byte[] packet, int dataLength, int offset, InetSocketAddress localPeer, InetSocketAddress remotePeer) + throws PacketHandlerException; - /** - * Gets the priority of the handler in the pipeline.
- * The priority affects the place of the handler in the pipeline. This can - * affect performance as handlers with higher priority will be queried first - * when a packet arrives. - * - * @return The priority of the handler - */ - int getPipelinePriority(); + /** + * Gets the priority of the handler in the pipeline. + *

+ * The priority affects the place of the handler in the pipeline. This can affect performance as handlers with higher + * priority will be queried first when a packet arrives. + *

+ * + * @return The priority of the handler + */ + int getPipelinePriority(); + + /** + * Sets the priority of the handler in the pipeline. + *

+ * The priority affects the place of the handler in the pipeline. This can affect performance as handlers with higher + * priority will be queried first when a packet arrives. + *

+ * + * @param priority The priority of the handler in the pipeline. + */ + void setPipelinePriority(int priority); } diff --git a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpHandler.java b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpHandler.java index 9c1b2a536..14b5ffd99 100644 --- a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpHandler.java +++ b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpHandler.java @@ -25,14 +25,12 @@ import org.apache.log4j.Logger; import org.mobicents.media.server.impl.rtcp.RtcpHeader; -import org.mobicents.media.server.impl.rtp.rfc2833.DtmfInput; import org.mobicents.media.server.impl.rtp.sdp.RTPFormat; import org.mobicents.media.server.impl.rtp.sdp.RTPFormats; import org.mobicents.media.server.impl.rtp.statistics.RtpStatistics; import org.mobicents.media.server.impl.srtp.DtlsHandler; import org.mobicents.media.server.io.network.channel.PacketHandler; import org.mobicents.media.server.io.network.channel.PacketHandlerException; -import org.mobicents.media.server.scheduler.Scheduler; /** * Handles incoming RTP packets. @@ -45,46 +43,43 @@ public class RtpHandler implements PacketHandler { private static final Logger logger = Logger.getLogger(RtpHandler.class); + // Packet handler properties private int pipelinePriority; + private DtlsHandler dtlsHandler; - private RTPFormats rtpFormats; - private final RtpClock rtpClock; - private final RtpClock oobClock; - + // Channel properties private boolean loopable; private boolean receivable; - - private final RtpStatistics statistics; - private final RtpPacket rtpPacket; - - // Secure RTP private boolean secure; - private DtlsHandler dtlsHandler; - // RTP processing + // RTP components + private RTPFormats rtpFormats; + private final RtpPacket rtpPacket; private final RtpGateway rtpGateway; + private final RtpStatistics statistics; - public RtpHandler(RtpClock clock, RtpClock oobClock, RtpStatistics statistics, RtpGateway rtpGateway) { + public RtpHandler(RtpStatistics statistics, RtpGateway rtpGateway) { + // Packet handler properties this.pipelinePriority = 0; - this.rtpClock = clock; - this.oobClock = oobClock; - - this.rtpFormats = new RTPFormats(); - this.statistics = statistics; - this.rtpPacket = new RtpPacket(RtpPacket.RTP_PACKET_MAX_SIZE, true); + // Channel properties this.receivable = false; this.loopable = false; - this.secure = false; + // RTP components + this.rtpFormats = new RTPFormats(); + this.rtpPacket = new RtpPacket(RtpPacket.RTP_PACKET_MAX_SIZE, true); this.rtpGateway = rtpGateway; + this.statistics = statistics; } + @Override public int getPipelinePriority() { return pipelinePriority; } + @Override public void setPipelinePriority(int pipelinePriority) { this.pipelinePriority = pipelinePriority; } @@ -112,7 +107,6 @@ public void setReceivable(boolean receivable) { */ public void setFormatMap(final RTPFormats rtpFormats) { this.rtpFormats = rtpFormats; - this.jitterBuffer.setFormats(rtpFormats); } public RTPFormats getFormatMap() { @@ -135,10 +129,12 @@ public void reset() { } } + @Override public boolean canHandle(byte[] packet) { return canHandle(packet, packet.length, 0); } - + + @Override public boolean canHandle(byte[] packet, int dataLength, int offset) { /* * The RTP header has the following format: @@ -155,10 +151,10 @@ public boolean canHandle(byte[] packet, int dataLength, int offset) { * | contributing source (CSRC) identifiers | * | .... | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * + * * The first twelve octets are present in every RTP packet, while the * list of CSRC identifiers is present only when inserted by a mixer. - * + * * The version defined by RFC3550 specification is two. */ // Packet must be equal or greater than an RTP Packet Header @@ -235,23 +231,22 @@ public byte[] handle(byte[] packet, int dataLength, int offset, InetSocketAddres buffer.flip(); } - // For RTP keep-alive purposes - this.statistics.setLastHeartbeat(this.rtpClock.getWallClock().getTime()); - // RTP v0 packets are used in some applications. Discarded since we do not handle them. if (rtpPacket.getVersion() != 0 && (receivable || loopable)) { - // Queue packet into the jitter buffer if (rtpPacket.getBuffer().limit() > 0) { if (loopable) { // Update statistics for RTCP this.statistics.onRtpReceive(rtpPacket); this.statistics.onRtpSent(rtpPacket); + // Return same packet (looping) so it can be transmitted return packet; } else { // Update statistics for RTCP this.statistics.onRtpReceive(rtpPacket); - // Write packet + + // Send packet to the RTP gateway to be mixed or forwarded + // depending on the relay type defined for the channel int payloadType = rtpPacket.getPayloadType(); RTPFormat format = rtpFormats.find(payloadType); if (format != null) { @@ -270,11 +265,12 @@ public byte[] handle(byte[] packet, int dataLength, int offset, InetSocketAddres } return null; } - - public int compareTo(PacketHandler o) { + + @Override + public int compareTo(PacketHandler o) { if (o == null) { return 1; } return this.getPipelinePriority() - o.getPipelinePriority(); - } + } } diff --git a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpMixerComponent.java b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpMixerComponent.java index 5c1c21e01..b12c68762 100644 --- a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpMixerComponent.java +++ b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpMixerComponent.java @@ -86,7 +86,7 @@ public RtpMixerComponent(int connectionId, Scheduler scheduler, DspFactory dspFa public void setRtpFormats(RTPFormats formats) { this.jitterBuffer.setFormats(formats); } - + public void processRtpPacket(RtpPacket packet, RTPFormat format) { if (this.rxPackets == 0) { logger.info("Restarting jitter buffer"); diff --git a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpTransport.java b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpTransport.java index 080db3de1..8b8ba09bf 100644 --- a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpTransport.java +++ b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/RtpTransport.java @@ -55,433 +55,409 @@ * */ public class RtpTransport extends MultiplexedChannel implements DtlsListener { - - private static final Logger logger = Logger.getLogger(RtpTransport.class); - - /** Tells UDP manager to choose port to bind this channel to */ - private final static int PORT_ANY = -1; - - // Channel attributes - private final int channelId; - private boolean bound; - private RtpStatistics statistics; - - // Core elements - private final UdpManager udpManager; - private final Scheduler scheduler; - private final RtpClock clock; - private final RtpClock oobClock; - private final int jitterBufferSize; - - // Heart beat - private final HeartBeat heartBeat; - - // Remote peer - private SocketAddress remotePeer; - - // Protocol handlers pipeline - private static final int RTP_PRIORITY = 3; // a packet each 20ms - private static final int STUN_PRIORITY = 2; // a packet each 400ms - private static final int RTCP_PRIORITY = 1; // a packet each 5s - - private RtpHandler rtpHandler; - private DtlsHandler dtlsHandler; - private StunHandler stunHandler; - private RtcpHandler rtcpHandler; // only used when rtcp-mux is enabled - - // Media formats - protected final static AudioFormat LINEAR_FORMAT = FormatFactory.createAudioFormat("LINEAR", 8000, 16, 1); - protected final static AudioFormat DTMF_FORMAT = FormatFactory.createAudioFormat("telephone-event", 8000); - static { - DTMF_FORMAT.setOptions(new Text("0-15")); - } - - // WebRTC - private boolean secure; - private boolean rtcpMux; - - // Listeners - private RtpListener rtpListener; - - protected RtpTransport(int channelId, int jitterBufferSize, RtpStatistics statistics, RtpClock clock, RtpClock oobClock, Scheduler scheduler, UdpManager udpManager) { - // Initialize MultiplexedChannel elements - super(); - - // Core and network elements - this.scheduler = scheduler; - this.udpManager = udpManager; - this.clock = clock; - this.oobClock = oobClock; - - // Channel attributes - this.channelId = channelId; - this.jitterBufferSize = jitterBufferSize; - this.statistics = statistics; - this.bound = false; - - // Protocol Handlers - this.rtpHandler = new RtpHandler(clock, oobClock, statistics); - - // WebRTC - this.secure = false; - this.rtcpMux = false; - - // Heartbeat - this.heartBeat = new HeartBeat(); - } - - public int getChannelId() { + + private static final Logger logger = Logger.getLogger(RtpTransport.class); + + protected final static AudioFormat LINEAR_FORMAT = FormatFactory.createAudioFormat("LINEAR", 8000, 16, 1); + protected final static AudioFormat DTMF_FORMAT = FormatFactory.createAudioFormat("telephone-event", 8000); + static { + DTMF_FORMAT.setOptions(new Text("0-15")); + } + + private final static int PORT_ANY = -1; + + // Core elements + private final UdpManager udpManager; + private final Scheduler scheduler; + private final HeartBeat heartBeat; + + // Channel attributes + private final int channelId; + private SocketAddress remotePeer; + private boolean bound; + private boolean secure; + private boolean rtcpMux; + + // RTP elements + private final RtpClock clock; + private final RtpClock oobClock; + private RtpListener rtpListener; + private RtpStatistics statistics; + + // Protocol handlers pipeline + private static final int RTP_PRIORITY = 3; // a packet each 20ms + private static final int STUN_PRIORITY = 2; // a packet each 400ms + private static final int RTCP_PRIORITY = 1; // a packet each 5s + + private RtpHandler rtpHandler; + private DtlsHandler dtlsHandler; + private StunHandler stunHandler; + private RtcpHandler rtcpHandler; // only used when rtcp-mux is enabled + + protected RtpTransport(int channelId, RtpStatistics statistics, RtpClock clock, RtpClock oobClock, Scheduler scheduler, + UdpManager udpManager) { + super(); + + // Core elements + this.scheduler = scheduler; + this.udpManager = udpManager; + this.heartBeat = new HeartBeat(); + + // Channel attributes + this.channelId = channelId; + this.bound = false; + this.secure = false; + this.rtcpMux = false; + + // RTP elements + this.clock = clock; + this.oobClock = oobClock; + this.statistics = statistics; + this.rtpHandler = new RtpHandler(clock, statistics); + } + + public int getChannelId() { return channelId; } - - public void setRtpListener(RtpListener listener) { - this.rtpListener = listener; - } - - public long getPacketsReceived() { - return this.statistics.getRtpPacketsReceived(); - } - - public long getPacketsTransmitted() { - return this.statistics.getRtpPacketsSent(); - } - - /** - * Modifies the map between format and RTP payload number - * - * @param rtpFormats - * the format map - */ - public void setFormatMap(RTPFormats rtpFormats) { - flush(); - this.rtpHandler.setFormatMap(rtpFormats); - this.transmitter.setFormatMap(rtpFormats); - } - - public RTPFormats getFormatMap() { - return this.rtpHandler.getFormatMap(); - } - - /** - * Sets the connection mode of the channel.
- * Possible modes: send_only, recv_only, inactive, send_recv, conference, network_loopback. - * - * @param connectionMode - * the new connection mode adopted by the channel - */ - public void updateMode(ConnectionMode connectionMode) { - switch (connectionMode) { - case SEND_ONLY: - this.rtpHandler.setReceivable(false); - this.rtpHandler.setLoopable(false); - this.rtpHandler.deactivate(); - this.transmitter.activate(); - break; - case RECV_ONLY: - this.rtpHandler.setReceivable(true); - this.rtpHandler.setLoopable(false); - this.rtpHandler.activate(); - this.transmitter.deactivate(); - break; - case INACTIVE: - this.rtpHandler.setReceivable(false); - this.rtpHandler.setLoopable(false); - this.rtpHandler.deactivate(); - this.transmitter.deactivate(); - break; - case SEND_RECV: - case CONFERENCE: - this.rtpHandler.setReceivable(true); - this.rtpHandler.setLoopable(false); - this.rtpHandler.activate(); - this.transmitter.activate(); - break; - case NETWORK_LOOPBACK: - this.rtpHandler.setReceivable(false); - this.rtpHandler.setLoopable(true); - this.rtpHandler.deactivate(); - this.transmitter.deactivate(); - break; - default: - break; - } - - boolean connectImmediately = false; - if (this.remotePeer != null) { - connectImmediately = udpManager.connectImmediately((InetSocketAddress) this.remotePeer); - } - - if (udpManager.getRtpTimeout() > 0 && this.remotePeer != null && !connectImmediately) { - if (this.rtpHandler.isReceivable()) { - this.statistics.setLastHeartbeat(scheduler.getClock().getTime()); - scheduler.submitHeatbeat(heartBeat); - } else { - heartBeat.cancel(); - } - } - } - - private void onBinding(boolean useJitterBuffer) { - // Set protocol handler priorities - this.rtpHandler.setPipelinePriority(RTP_PRIORITY); - if(this.rtcpMux) { - this.rtcpHandler.setPipelinePriority(RTCP_PRIORITY); - } - if(this.secure) { - this.stunHandler.setPipelinePriority(STUN_PRIORITY); - } - - // Configure protocol handlers - this.transmitter.setChannel(this.dataChannel); - this.rtpHandler.useJitterBuffer(useJitterBuffer); - this.handlers.addHandler(this.rtpHandler); - - if(this.rtcpMux) { - this.rtcpHandler.setChannel(this.dataChannel); - this.handlers.addHandler(this.rtcpHandler); - } - - if(this.secure) { - this.dtlsHandler.setChannel(this.dataChannel); - this.dtlsHandler.addListener(this); - this.handlers.addHandler(this.stunHandler); - - // Start DTLS handshake - this.dtlsHandler.handshake(); - } - } - - public void bind(boolean isLocal) throws IOException { - // Open this channel with UDP Manager on first available address - this.selectionKey = udpManager.open(this); - this.dataChannel = (DatagramChannel) this.selectionKey.channel(); - - // activate media elements - onBinding(!isLocal); - - // bind data channel - this.udpManager.bind(this.dataChannel, PORT_ANY, isLocal); - this.bound = true; - } - - public void bind(DatagramChannel channel) throws IOException, SocketException { - try { - // Register the channel on UDP Manager - this.selectionKey = udpManager.open(channel, this); - this.dataChannel = channel; - } catch (IOException e) { - throw new SocketException(e.getMessage()); - } - - // activate media elements - onBinding(true); - - // Only bind channel if necessary - if(!channel.socket().isBound()) { - this.udpManager.bind(channel, PORT_ANY); - } - this.bound = true; - } - - public boolean isBound() { - return this.bound; - } - - public boolean isAvailable() { - // The channel is available is is connected - boolean available = this.dataChannel != null && this.dataChannel.isConnected(); - // In case of WebRTC calls the DTLS handshake must be completed - if(this.secure) { - available = available && this.dtlsHandler.isHandshakeComplete(); - } - return available; - } - - public void setRemotePeer(SocketAddress address) { - this.remotePeer = address; - boolean connectImmediately = false; - if (this.dataChannel != null) { - if (this.dataChannel.isConnected()) { - try { - disconnect(); - } catch (IOException e) { - logger.error(e); - } - } - - connectImmediately = udpManager.connectImmediately((InetSocketAddress) address); - if (connectImmediately) { - try { - this.dataChannel.connect(address); - } catch (IOException e) { - logger.info("Can not connect to remote address , please check that you are not using local address - 127.0.0.X to connect to remote"); - logger.error(e.getMessage(), e); - } - } - } - - if (udpManager.getRtpTimeout() > 0 && !connectImmediately) { - if (this.rtpHandler.isReceivable()) { - this.statistics.setLastHeartbeat(scheduler.getClock().getTime()); - scheduler.submitHeatbeat(heartBeat); - } else { - heartBeat.cancel(); - } - } - } - - public String getExternalAddress() { - return this.udpManager.getExternalAddress(); - } - - public boolean hasExternalAddress() { - return notEmpty(this.udpManager.getExternalAddress()); - } - - private boolean notEmpty(String text) { - return text != null && !text.isEmpty(); - } - - public void enableSRTP(String hashFunction, String remotePeerFingerprint, IceAuthenticator authenticator) { - this.secure = true; - - // setup the DTLS handler - if (this.dtlsHandler == null) { - this.dtlsHandler = new DtlsHandler(); - } - this.dtlsHandler.setRemoteFingerprint(hashFunction, remotePeerFingerprint); - - // setup the STUN handler - if (this.stunHandler == null) { - this.stunHandler = new StunHandler(authenticator); - } - - // Setup the RTP handler - this.transmitter.enableSrtp(this.dtlsHandler); - this.rtpHandler.enableSrtp(this.dtlsHandler); - - // Setup the RTCP handler. RTCP-MUX channels only! - if(this.rtcpMux) { - this.rtcpHandler.enableSRTCP(this.dtlsHandler); - } - } - - public void disableSRTP() { - this.secure = false; - - // setup the DTLS handler - if(this.dtlsHandler != null) { - this.dtlsHandler.setRemoteFingerprint("", ""); - this.dtlsHandler.resetLocalFingerprint(); - } - - // setup the STUN handler - if(this.stunHandler != null) { - this.handlers.removeHandler(this.stunHandler); - } - - // Setup the RTP handler - this.transmitter.disableSrtp(); - this.rtpHandler.disableSrtp(); - - // Setup the RTCP handler - if(this.rtcpMux) { - this.rtcpHandler.disableSRTCP(); - } - } - - /** - * Configures whether rtcp-mux is active in this channel or not. - * @param enable decides whether rtcp-mux is to be enabled - */ - public void enableRtcpMux(boolean enable) { - this.rtcpMux = enable; - - // initialize handler if necessary - if(enable && this.rtcpHandler == null) { - this.rtcpHandler = new RtcpHandler(this.statistics); - } - } - - public Text getWebRtcLocalFingerprint() { - if(this.dtlsHandler != null) { - return this.dtlsHandler.getLocalFingerprint(); - } - return new Text(); - } - - public void close() { - if(rtcpMux) { - this.rtcpHandler.leaveRtpSession(); - reset(); - } else { - super.close(); - reset(); - } - this.bound = false; - } - - private void reset() { - // Heartbeat reset - heartBeat.cancel(); - - // RTP reset - this.rtpHandler.reset(); - this.transmitter.reset(); - - // RTCP reset - if(this.rtcpMux) { - this.rtcpHandler.reset(); - } - - // DTLS reset - if(this.secure) { - this.dtlsHandler.reset(); - } - } - - public void onDtlsHandshakeComplete() { - logger.info("DTLS handshake completed for RTP candidate."); - if(this.rtcpMux) { - this.rtcpHandler.joinRtpSession(); - } - } - - public void onDtlsHandshakeFailed(Throwable e) { - this.rtpListener.onRtpFailure(e); - } - - private class HeartBeat extends Task { - - public int getQueueNumber() { - return Scheduler.HEARTBEAT_QUEUE; - } - - @Override - public long perform() { - long elapsedTime = scheduler.getClock().getTime() - statistics.getLastHeartbeat(); - if (elapsedTime > udpManager.getRtpTimeout() * 1000000000L) { - if (rtpListener != null) { - rtpListener.onRtpFailure("RTP timeout! Elapsed time since last heartbeat: " + elapsedTime); - } - } else { - scheduler.submitHeatbeat(this); - } - return 0; - } - } - - public void send(RtpPacket packet) { - if(this.dataChannel.isConnected()) { - // TODO send packet - } - } - - public void sendDtmf(RtpPacket packet) { - if(this.dtmfSupported) { - send(packet); - } - } - + + public void setRtpListener(RtpListener listener) { + this.rtpListener = listener; + } + + public long getPacketsReceived() { + return this.statistics.getRtpPacketsReceived(); + } + + public long getPacketsTransmitted() { + return this.statistics.getRtpPacketsSent(); + } + + /** + * Modifies the map between format and RTP payload number + * + * @param rtpFormats the format map + */ + public void setFormatMap(RTPFormats rtpFormats) { + flush(); + this.rtpHandler.setFormatMap(rtpFormats); + this.transmitter.setFormatMap(rtpFormats); + } + + public RTPFormats getFormatMap() { + return this.rtpHandler.getFormatMap(); + } + + /** + * Sets the connection mode of the channel.
+ * Possible modes: send_only, recv_only, inactive, send_recv, conference, network_loopback. + * + * @param connectionMode the new connection mode adopted by the channel + */ + public void updateMode(ConnectionMode connectionMode) { + switch (connectionMode) { + case SEND_ONLY: + this.rtpHandler.setReceivable(false); + this.rtpHandler.setLoopable(false); + break; + case RECV_ONLY: + this.rtpHandler.setReceivable(true); + this.rtpHandler.setLoopable(false); + break; + case INACTIVE: + this.rtpHandler.setReceivable(false); + this.rtpHandler.setLoopable(false); + break; + case SEND_RECV: + case CONFERENCE: + this.rtpHandler.setReceivable(true); + this.rtpHandler.setLoopable(false); + break; + case NETWORK_LOOPBACK: + this.rtpHandler.setReceivable(false); + this.rtpHandler.setLoopable(true); + break; + default: + break; + } + + boolean connectImmediately = false; + if (this.remotePeer != null) { + connectImmediately = udpManager.connectImmediately((InetSocketAddress) this.remotePeer); + } + + if (udpManager.getRtpTimeout() > 0 && this.remotePeer != null && !connectImmediately) { + if (this.rtpHandler.isReceivable()) { + this.statistics.setLastHeartbeat(scheduler.getClock().getTime()); + scheduler.submitHeatbeat(heartBeat); + } else { + heartBeat.cancel(); + } + } + } + + private void onBinding(boolean useJitterBuffer) { + // Set protocol handler priorities + this.rtpHandler.setPipelinePriority(RTP_PRIORITY); + if (this.rtcpMux) { + this.rtcpHandler.setPipelinePriority(RTCP_PRIORITY); + } + if (this.secure) { + this.stunHandler.setPipelinePriority(STUN_PRIORITY); + } + + // Configure protocol handlers + this.transmitter.setChannel(this.dataChannel); + this.handlers.addHandler(this.rtpHandler); + + if (this.rtcpMux) { + this.rtcpHandler.setChannel(this.dataChannel); + this.handlers.addHandler(this.rtcpHandler); + } + + if (this.secure) { + this.dtlsHandler.setChannel(this.dataChannel); + this.dtlsHandler.addListener(this); + this.handlers.addHandler(this.stunHandler); + + // Start DTLS handshake + this.dtlsHandler.handshake(); + } + } + + public void bind(boolean isLocal) throws IOException { + // Open this channel with UDP Manager on first available address + this.selectionKey = udpManager.open(this); + this.dataChannel = (DatagramChannel) this.selectionKey.channel(); + + // activate media elements + onBinding(!isLocal); + + // bind data channel + this.udpManager.bind(this.dataChannel, PORT_ANY, isLocal); + this.bound = true; + } + + public void bind(DatagramChannel channel) throws IOException, SocketException { + try { + // Register the channel on UDP Manager + this.selectionKey = udpManager.open(channel, this); + this.dataChannel = channel; + } catch (IOException e) { + throw new SocketException(e.getMessage()); + } + + // activate media elements + onBinding(true); + + // Only bind channel if necessary + if (!channel.socket().isBound()) { + this.udpManager.bind(channel, PORT_ANY); + } + this.bound = true; + } + + public boolean isBound() { + return this.bound; + } + + public boolean isAvailable() { + // The channel is available is is connected + boolean available = this.dataChannel != null && this.dataChannel.isConnected(); + // In case of WebRTC calls the DTLS handshake must be completed + if (this.secure) { + available = available && this.dtlsHandler.isHandshakeComplete(); + } + return available; + } + + public void setRemotePeer(SocketAddress address) { + this.remotePeer = address; + boolean connectImmediately = false; + if (this.dataChannel != null) { + if (this.dataChannel.isConnected()) { + try { + disconnect(); + } catch (IOException e) { + logger.error(e); + } + } + + connectImmediately = udpManager.connectImmediately((InetSocketAddress) address); + if (connectImmediately) { + try { + this.dataChannel.connect(address); + } catch (IOException e) { + logger.info("Can not connect to remote address , please check that you are not using local address - 127.0.0.X to connect to remote"); + logger.error(e.getMessage(), e); + } + } + } + + if (udpManager.getRtpTimeout() > 0 && !connectImmediately) { + if (this.rtpHandler.isReceivable()) { + this.statistics.setLastHeartbeat(scheduler.getClock().getTime()); + scheduler.submitHeatbeat(heartBeat); + } else { + heartBeat.cancel(); + } + } + } + + public String getExternalAddress() { + return this.udpManager.getExternalAddress(); + } + + public boolean hasExternalAddress() { + return notEmpty(this.udpManager.getExternalAddress()); + } + + private boolean notEmpty(String text) { + return text != null && !text.isEmpty(); + } + + public void enableSRTP(String hashFunction, String remotePeerFingerprint, IceAuthenticator authenticator) { + this.secure = true; + + // setup the DTLS handler + if (this.dtlsHandler == null) { + this.dtlsHandler = new DtlsHandler(); + } + this.dtlsHandler.setRemoteFingerprint(hashFunction, remotePeerFingerprint); + + // setup the STUN handler + if (this.stunHandler == null) { + this.stunHandler = new StunHandler(authenticator); + } + + // Setup the RTP handler + this.transmitter.enableSrtp(this.dtlsHandler); + this.rtpHandler.enableSrtp(this.dtlsHandler); + + // Setup the RTCP handler. RTCP-MUX channels only! + if (this.rtcpMux) { + this.rtcpHandler.enableSRTCP(this.dtlsHandler); + } + } + + public void disableSRTP() { + this.secure = false; + + // setup the DTLS handler + if (this.dtlsHandler != null) { + this.dtlsHandler.setRemoteFingerprint("", ""); + this.dtlsHandler.resetLocalFingerprint(); + } + + // setup the STUN handler + if (this.stunHandler != null) { + this.handlers.removeHandler(this.stunHandler); + } + + // Setup the RTP handler + this.transmitter.disableSrtp(); + this.rtpHandler.disableSrtp(); + + // Setup the RTCP handler + if (this.rtcpMux) { + this.rtcpHandler.disableSRTCP(); + } + } + + /** + * Configures whether rtcp-mux is active in this channel or not. + * + * @param enable decides whether rtcp-mux is to be enabled + */ + public void enableRtcpMux(boolean enable) { + this.rtcpMux = enable; + + // initialize handler if necessary + if (enable && this.rtcpHandler == null) { + this.rtcpHandler = new RtcpHandler(this.statistics); + } + } + + public Text getWebRtcLocalFingerprint() { + if (this.dtlsHandler != null) { + return this.dtlsHandler.getLocalFingerprint(); + } + return new Text(); + } + + public void close() { + if (rtcpMux) { + this.rtcpHandler.leaveRtpSession(); + reset(); + } else { + super.close(); + reset(); + } + this.bound = false; + } + + private void reset() { + // Heartbeat reset + heartBeat.cancel(); + + // RTP reset + this.rtpHandler.reset(); + this.transmitter.reset(); + + // RTCP reset + if (this.rtcpMux) { + this.rtcpHandler.reset(); + } + + // DTLS reset + if (this.secure) { + this.dtlsHandler.reset(); + } + } + + @Override + public void onDtlsHandshakeComplete() { + logger.info("DTLS handshake completed for RTP candidate."); + if (this.rtcpMux) { + this.rtcpHandler.joinRtpSession(); + } + } + + @Override + public void onDtlsHandshakeFailed(Throwable e) { + this.rtpListener.onRtpFailure(e); + } + + private class HeartBeat extends Task { + + @Override + public int getQueueNumber() { + return Scheduler.HEARTBEAT_QUEUE; + } + + @Override + public long perform() { + long elapsedTime = scheduler.getClock().getTime() - statistics.getLastHeartbeat(); + if (elapsedTime > udpManager.getRtpTimeout() * 1000000000L) { + if (rtpListener != null) { + rtpListener.onRtpFailure("RTP timeout! Elapsed time since last heartbeat: " + elapsedTime); + } + } else { + scheduler.submitHeatbeat(this); + } + return 0; + } + } + + public void send(RtpPacket packet) { + if (this.dataChannel.isConnected()) { + // TODO send packet + } + } + + public void sendDtmf(RtpPacket packet) { + if (this.dtmfSupported) { + send(packet); + } + } } diff --git a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/statistics/RtpStatistics.java b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/statistics/RtpStatistics.java index 980cb238a..8c99e7145 100644 --- a/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/statistics/RtpStatistics.java +++ b/io/rtp/src/main/java/org/mobicents/media/server/impl/rtp/statistics/RtpStatistics.java @@ -577,6 +577,9 @@ public void onRtpReceive(RtpPacket packet) { this.rtpRxOctets += packet.getPayloadLength(); this.rtpReceivedOn = this.wallClock.getTime(); + // For RTP keep-alive mechanism + this.rtpLastHeartbeat = this.rtpReceivedOn; + // Note that there is no point in registering new members if RTCP handler has scheduled a BYE if(RtcpPacketType.RTCP_REPORT.equals(this.rtcpNextPacketType)) { long syncSource = packet.getSyncSource();