Skip to content

Commit

Permalink
Added a new SendRequest method to send SMPP requests synchronouslly (…
Browse files Browse the repository at this point in the history
…waiting for a response).
  • Loading branch information
pruiz committed Aug 22, 2011
1 parent b93223a commit 800ee8b
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 14 deletions.
2 changes: 2 additions & 0 deletions AberrantSMPP/AberrantSMPP.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
<Compile Include="EventObject\SubmitSmRespEventArgs.cs" />
<Compile Include="EventObject\UnbindEventArgs.cs" />
<Compile Include="EventObject\UnbindRespEventArgs.cs" />
<Compile Include="Exceptions\SmppRemoteException.cs" />
<Compile Include="Exceptions\SmppTimeoutException.cs" />
<Compile Include="Helper.cs" />
<Compile Include="Packet\CommandId.cs" />
<Compile Include="Packet\DataCoding.cs" />
Expand Down
35 changes: 35 additions & 0 deletions AberrantSMPP/Exceptions/SmppRemoteException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace AberrantSMPP.Exceptions
{
/// <summary>
/// Remote party reported an error to our request.
/// </summary>
[Serializable]
public class SmppRemoteException : Exception
{
/// <summary>
/// Gets or sets the command status/error code indicated by remote party.
/// </summary>
/// <value>The error code.</value>
public uint ErrorCode { get; private set; }

protected SmppRemoteException() { }
protected SmppRemoteException(
System.Runtime.Serialization.SerializationInfo info,
System.Runtime.Serialization.StreamingContext context)
: base(info, context) { }

public SmppRemoteException(string message, uint errorCode) : base(message)
{
ErrorCode = errorCode;
}

public SmppRemoteException(string message, Exception inner) : base(message, inner)
{
}
}
}
22 changes: 22 additions & 0 deletions AberrantSMPP/Exceptions/SmppTimeoutException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace AberrantSMPP.Exceptions
{
/// <summary>
/// Communication timeout during an SMPP transaction.
/// </summary>
[Serializable]
public class SmppTimeoutException : Exception
{
public SmppTimeoutException() { }
public SmppTimeoutException(string message) : base(message) { }
public SmppTimeoutException(string message, Exception inner) : base(message, inner) { }
protected SmppTimeoutException(
System.Runtime.Serialization.SerializationInfo info,
System.Runtime.Serialization.StreamingContext context)
: base(info, context) { }
}
}
133 changes: 119 additions & 14 deletions AberrantSMPP/SMPPCommunicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
using System.ComponentModel;
using System.Runtime.CompilerServices;

using AberrantSMPP.Packet;
using AberrantSMPP.Packet.Request;
using AberrantSMPP.Utility;
using AberrantSMPP.Packet.Response;
using AberrantSMPP.Packet;
using AberrantSMPP.Exceptions;
using AberrantSMPP.EventObjects;
using AberrantSMPP.Utility;

namespace AberrantSMPP
{
Expand All @@ -44,6 +45,20 @@ namespace AberrantSMPP
/// </summary>
public class SMPPCommunicator : Component, IDisposable
{
private class RequestState
{
public readonly uint SequenceNumber;
public readonly ManualResetEvent EventHandler;
public SmppResponse Response { get; set; }

public RequestState(uint seqno)
{
SequenceNumber = seqno;
EventHandler = new ManualResetEvent(false);
Response = null;
}
}

private static readonly global::Common.Logging.ILog _Log = global::Common.Logging.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

private readonly object _bindingLock = new object();
Expand Down Expand Up @@ -269,6 +284,15 @@ public int ReBindInterval
}
}
/// <summary>
/// Gets or sets the response timeout (in miliseconds)
/// </summary>
/// <value>The response timeout.</value>
public int ResponseTimeout
{
get { return _ResponseTimeout; }
set { _ResponseTimeout = value; }
}
/// <summary>
/// Gets a value indicating whether this <see cref="SMPPCommunicator"/> is connected.
/// </summary>
/// <value><c>true</c> if connected; otherwise, <c>false</c>.</value>
Expand Down Expand Up @@ -561,9 +585,10 @@ public uint SendPdu(Pdu packet)
var bytes = packet.GetEncodedPdu();

if (asClient == null || !asClient.Connected)
{
throw new InvalidOperationException("Session not connected to remote party.");
}

if (!(packet is SmppBind) && !_Bound)
throw new InvalidOperationException("Session not bound to remote party.");

try
{
Expand All @@ -582,6 +607,39 @@ public uint SendPdu(Pdu packet)
}
}

private IDictionary<uint, RequestState> _RequestsAwaitingResponse = new Dictionary<uint, RequestState>();

/// <summary>
/// Sends a request and waits for the appropiate response.
/// If no response is received before RequestTimeout seconds, an
/// SmppTimeoutException is thrown.
/// </summary>
/// <param name="request">The request.</param>
public SmppResponse SendRequest(SmppRequest request)
{
RequestState state;

lock (_RequestsAwaitingResponse)
{
state = new RequestState(SendPdu(request));
_RequestsAwaitingResponse.Add(state.SequenceNumber, state);
}

var signalled = state.EventHandler.WaitOne(_ResponseTimeout);

lock (_RequestsAwaitingResponse)
{
_RequestsAwaitingResponse.Remove(state.SequenceNumber);

if (signalled)
{
return state.Response;
} else {
throw new SmppTimeoutException("Timeout while waiting for a response from remote side.");
}
}
}

/// <summary>
/// Sends a long message possibly by splitting it on multiple SMPP PDUs.
/// </summary>
Expand Down Expand Up @@ -733,7 +791,10 @@ public bool Bind()

asClient.Connect(Host, Port);

lock (this) _SequenceNumber = 1; // re-initialize seq. numbers.
// re-initialize seq. numbers.
lock (this) _SequenceNumber = 1;
// SequenceNumbers are per-session, so reset waiting list..
lock (_RequestsAwaitingResponse) _RequestsAwaitingResponse.Clear();

SmppBind request = new SmppBind();
request.SystemId = SystemId;
Expand All @@ -745,7 +806,11 @@ public bool Bind()
request.AddressRange = AddressRange;
request.BindType = BindType;

SendPdu(request);
var response = SendRequest(request);

if (response.CommandStatus != 0)
throw new SmppRemoteException("Bind request failed.", response.CommandStatus);

_SentUnbindPacket = false;

// Enable/Disable enquire timer.
Expand Down Expand Up @@ -894,15 +959,35 @@ private void Unbind(bool disableReBind)
/// <param name="queueStateObj">The queue of byte packets.</param>
private void ProcessPduQueue(object queueStateObj)
{
Queue responseQueue = queueStateObj as Queue;

foreach(Pdu response in responseQueue)
foreach(Pdu packet in (queueStateObj as Queue))
{
if (packet == null) continue;

_Log.DebugFormat("Recived PDU: {0}", packet);

try
{
//based on each Pdu, fire off an event
if (response != null)
FireEvents(response);
// Handle packets related to a request awaiting response.
if (packet is SmppResponse || packet is SmppGenericNack)
{
lock (_RequestsAwaitingResponse)
{
if (_RequestsAwaitingResponse.ContainsKey(packet.SequenceNumber))
{
var state = _RequestsAwaitingResponse[packet.SequenceNumber];

// Save response at bucket..
state.Response = packet is SmppGenericNack ?
new SmppGenericNackResp(packet.PacketBytes) : packet as SmppResponse;
// Signal response reception..
state.EventHandler.Set();
continue;
}
}
}

// based on each Pdu, fire off an event
FireEvents(packet);
}
catch (Exception exception)
{
Expand All @@ -918,13 +1003,24 @@ private void ProcessPduQueue(object queueStateObj)
/// <param name="ea"></param>
private void EnquireLinkTimerElapsed(object sender, ElapsedEventArgs ea)
{
bool locked = false;

if (!_Bound)
{
_Log.Warn("Cannot send enquire request over an unbound session!");
return;
}

SendPdu(new SmppEnquireLink());
try
{
locked = Monitor.TryEnter(_EnquireLinkTimer);

SendPdu(new SmppEnquireLink());
}
finally
{
if (locked) Monitor.Exit(_EnquireLinkTimer);
}
}
/// <summary>
/// Performs a re-bind if current connection was lost.
Expand All @@ -933,8 +1029,12 @@ private void EnquireLinkTimerElapsed(object sender, ElapsedEventArgs ea)
/// <param name="ea">The <see cref="System.Timers.ElapsedEventArgs"/> instance containing the event data.</param>
private void ReBindTimerElapsed(object sender, ElapsedEventArgs ea)
{
lock (_bindingLock)
var locked = false;

try
{
locked = Monitor.TryEnter(_bindingLock);

if (!_ReBindRequired)
return;

Expand All @@ -952,6 +1052,10 @@ private void ReBindTimerElapsed(object sender, ElapsedEventArgs ea)
}
catch { }
}
finally
{
if (locked) Monitor.Exit(_bindingLock);
}
}

/// <summary>
Expand Down Expand Up @@ -1235,6 +1339,7 @@ private void InitCommunicator()
SystemType = null;
EnquireLinkInterval = 0;
ReBindInterval = 10;
ResponseTimeout = 2500;

// Initialize timers..
_EnquireLinkTimer = new System.Timers.Timer() { Enabled = false };
Expand Down
2 changes: 2 additions & 0 deletions TODO
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ TODO
- Review concurrency issues..
- Change ReBindTimer by a AutoResetEvent which also allows checking periodically
if we need to reconnect, but which can be signalled explicitly to try to reconnect now.
- Change EnquireLinkTimer & ReBindTimer to AutoReset=false, so we avoid events from queuing
when handler take too long to finish.

0 comments on commit 800ee8b

Please sign in to comment.