Skip to content

Commit

Permalink
Log dropped sends in ReliableMulticast/Unicast
Browse files Browse the repository at this point in the history
This change logs a warning whenever ReliableMulticast or ReliableUnicast drops a send because its send buffer is full, and counts each dropped send in a new num_sends_dropped statistic.
  • Loading branch information
cfredri4 committed Jan 20, 2025
1 parent ace28d1 commit 15c1d99
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 12 deletions.
32 changes: 24 additions & 8 deletions src/org/jgroups/protocols/ReliableMulticast.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ public abstract class ReliableMulticast extends Protocol implements DiagnosticsH
@ManagedAttribute(description="Number of messages received",type=SCALAR)
protected final LongAdder num_messages_received=new LongAdder();

@ManagedAttribute(description="Number of sends dropped",type=SCALAR)
protected final LongAdder num_sends_dropped=new LongAdder();

protected static final Message DUMMY_OOB_MSG=new EmptyMessage().setFlag(OOB);

// Accepts messages which are (1) non-null, (2) no DUMMY_OOB_MSGs and (3) not OOB_DELIVERED
Expand Down Expand Up @@ -240,6 +243,7 @@ public ReliableMulticast clearCachedBatches() {
public ReliableMulticast sendsCanBlock(boolean s) {this.sends_can_block=s; return this;}
public long getNumMessagesSent() {return num_messages_sent.sum();}
public long getNumMessagesReceived() {return num_messages_received.sum();}
public long getNumSendsDropped() {return num_sends_dropped.sum();}
public boolean reuseMessageBatches() {return reuse_message_batches;}
public ReliableMulticast reuseMessageBatches(boolean b) {this.reuse_message_batches=b; return this;}
public boolean sendAtomically() {return send_atomically;}
Expand Down Expand Up @@ -353,6 +357,7 @@ protected Entry sendEntry() {
public void resetStats() {
num_messages_sent.reset();
num_messages_received.reset();
num_sends_dropped.reset();
xmit_reqs_received.reset();
xmit_reqs_sent.reset();
xmit_rsps_received.reset();
Expand Down Expand Up @@ -518,7 +523,7 @@ public Object down(Message msg) {
return down_prot.down(msg); // unicast address: not null and not mcast, pass down unchanged

if(!running) {
log.trace("%s: discarded message as we're not in the 'running' state, message: %s", local_addr, msg);
log.trace("%s: discarded message as start() has not yet been called, message: %s", local_addr, msg);
return null;
}
Entry send_entry=sendEntry();
Expand All @@ -529,11 +534,17 @@ public Object down(Message msg) {
msg.setSrc(local_addr); // this needs to be done so we can check whether the message sender is the local_addr
boolean dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK);
Buffer<Message> win=send_entry.buf();
send(msg, win, dont_loopback_set);
num_messages_sent.increment();
boolean sent=send(msg, win, dont_loopback_set);
last_seqno_resender.skipNext();
if(dont_loopback_set && needToSendAck(send_entry, 1))
handleAck(local_addr, win.highestDelivered()); // https://issues.redhat.com/browse/JGRP-2829
if (sent) {
num_messages_sent.increment();
if(dont_loopback_set && needToSendAck(send_entry, 1))
handleAck(local_addr, win.highestDelivered()); // https://issues.redhat.com/browse/JGRP-2829
}
else {
num_sends_dropped.increment();
log.warn("%s: discarded message due to full send buffer, message: %s", local_addr, msg);
}
return null; // don't pass down the stack
}

Expand Down Expand Up @@ -703,7 +714,7 @@ protected void unknownMember(Address sender, Object message) {
}
}

protected void send(Message msg, Buffer<Message> win, boolean dont_loopback_set) {
protected boolean send(Message msg, Buffer<Message> win, boolean dont_loopback_set) {
long msg_id=seqno.incrementAndGet();
if(is_trace)
log.trace("%s --> [all]: #%d", local_addr, msg_id);
Expand All @@ -718,8 +729,10 @@ protected void send(Message msg, Buffer<Message> win, boolean dont_loopback_set)
lock.lock();
}
try {
if(addToSendBuffer(win, msg_id, msg, dont_loopback_set? remove_filter : null, msg.isFlagSet(DONT_BLOCK)))
boolean added=addToSendBuffer(win, msg_id, msg, dont_loopback_set? remove_filter : null, msg.isFlagSet(DONT_BLOCK));
if (added)
down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted
return added;
}
finally {
if(lock != null)
Expand All @@ -729,14 +742,17 @@ protected void send(Message msg, Buffer<Message> win, boolean dont_loopback_set)

/** Adds the message to the send buffer. The loop tries to handle temporary OOMEs by retrying if add() failed */
protected boolean addToSendBuffer(Buffer<Message> win, long seq, Message msg, Predicate<Message> filter, boolean dont_block) {
Buffer.Options opts = sendOptions();
long sleep=10;
boolean rc=false;
do {
try {
rc=win.add(seq, msg, filter, sendOptions(), dont_block);
rc=win.add(seq, msg, filter, opts, dont_block);
break;
}
catch(Throwable t) {
if (!opts.block() || dont_block)
break;
if(running) {
Util.sleep(sleep);
sleep=Math.min(5000, sleep*2);
Expand Down
20 changes: 16 additions & 4 deletions src/org/jgroups/protocols/ReliableUnicast.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public abstract class ReliableUnicast extends Protocol implements AgeOutCache.Ha
protected final LongAdder num_msgs_sent=new LongAdder();
@ManagedAttribute(description="Number of message received",type=SCALAR)
protected final LongAdder num_msgs_received=new LongAdder();
@ManagedAttribute(description="Number of sends dropped",type=SCALAR)
protected final LongAdder num_sends_dropped=new LongAdder();
@ManagedAttribute(description="Number of acks sent",type=SCALAR)
protected final LongAdder num_acks_sent=new LongAdder();
@ManagedAttribute(description="Number of acks received",type=SCALAR)
Expand Down Expand Up @@ -287,6 +289,7 @@ public ReliableUnicast trimCachedBatches() {
public long getNumMessagesReceived() {return num_msgs_received.sum();}


public long getNumSendsDropped() {return num_sends_dropped.sum();}
public long getNumAcksSent() {return num_acks_sent.sum();}
public long getNumAcksReceived() {return num_acks_received.sum();}
public long getNumXmits() {return num_xmits.sum();}
Expand Down Expand Up @@ -370,8 +373,8 @@ public String printSendWindowMessages() {

public void resetStats() {
avg_delivery_batch_size.clear();
Stream.of(num_msgs_sent, num_msgs_received, num_acks_sent, num_acks_received, num_xmits, xmit_reqs_received,
xmit_reqs_sent, xmit_rsps_sent, num_loopbacks).forEach(LongAdder::reset);
Stream.of(num_msgs_sent, num_msgs_received, num_sends_dropped, num_acks_sent, num_acks_received, num_xmits,
xmit_reqs_received, xmit_reqs_sent, xmit_rsps_sent, num_loopbacks).forEach(LongAdder::reset);
send_table.values().stream().map(e -> e.buf).forEach(Buffer::resetStats);
recv_table.values().stream().map(e -> e.buf).forEach(Buffer::resetStats);
}
Expand Down Expand Up @@ -693,8 +696,14 @@ public Object down(Message msg) {
SenderEntry entry=getSenderEntry(dst);
boolean dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK) && dst.equals(local_addr),
dont_block=msg.isFlagSet(DONT_BLOCK);
if(send(msg, entry, dont_loopback_set, dont_block))
boolean sent=send(msg, entry, dont_loopback_set, dont_block);
if(sent) {
num_msgs_sent.increment();
}
else {
num_sends_dropped.increment();
log.warn("%s: discarded message due to full send buffer, message: %s", local_addr, msg);
}
return null; // the message was already sent down the stack in send()
}

Expand Down Expand Up @@ -1132,14 +1141,17 @@ protected boolean send(Message msg, SenderEntry entry, boolean dont_loopback_set
*/
protected boolean addToSendBuffer(Buffer<Message> win, long seq, Message msg,
Predicate<Message> filter, boolean dont_block) {
Buffer.Options opts = sendOptions();
long sleep=10;
boolean rc=false;
do {
try {
rc=win.add(seq, msg, filter, sendOptions(), dont_block);
rc=win.add(seq, msg, filter, opts, dont_block);
break;
}
catch(Throwable t) {
if (!opts.block() || dont_block)
break;
if(running) {
Util.sleep(sleep);
sleep=Math.min(5000, sleep*2);
Expand Down

0 comments on commit 15c1d99

Please sign in to comment.