diff --git a/src/org/jgroups/protocols/ReliableMulticast.java b/src/org/jgroups/protocols/ReliableMulticast.java
index c1a030815c1..455876e72e8 100644
--- a/src/org/jgroups/protocols/ReliableMulticast.java
+++ b/src/org/jgroups/protocols/ReliableMulticast.java
@@ -105,6 +105,9 @@ public abstract class ReliableMulticast extends Protocol implements DiagnosticsH
     @ManagedAttribute(description="Number of messages received",type=SCALAR)
     protected final LongAdder num_messages_received=new LongAdder();
 
+    @ManagedAttribute(description="Number of sends dropped",type=SCALAR)
+    protected final LongAdder num_sends_dropped=new LongAdder();
+
     protected static final Message DUMMY_OOB_MSG=new EmptyMessage().setFlag(OOB);
 
     // Accepts messages which are (1) non-null, (2) no DUMMY_OOB_MSGs and (3) not OOB_DELIVERED
@@ -240,6 +243,7 @@ public ReliableMulticast clearCachedBatches() {
     public ReliableMulticast sendsCanBlock(boolean s) {this.sends_can_block=s; return this;}
     public long getNumMessagesSent() {return num_messages_sent.sum();}
     public long getNumMessagesReceived() {return num_messages_received.sum();}
+    public long getNumSendsDropped() {return num_sends_dropped.sum();}
     public boolean reuseMessageBatches() {return reuse_message_batches;}
     public ReliableMulticast reuseMessageBatches(boolean b) {this.reuse_message_batches=b; return this;}
     public boolean sendAtomically() {return send_atomically;}
@@ -353,6 +357,7 @@ protected Entry sendEntry() {
     public void resetStats() {
         num_messages_sent.reset();
         num_messages_received.reset();
+        num_sends_dropped.reset();
         xmit_reqs_received.reset();
         xmit_reqs_sent.reset();
         xmit_rsps_received.reset();
@@ -518,7 +523,7 @@ public Object down(Message msg) {
             return down_prot.down(msg); // unicast address: not null and not mcast, pass down unchanged
 
         if(!running) {
-            log.trace("%s: discarded message as we're not in the 'running' state, message: %s", local_addr, msg);
+            log.trace("%s: discarded message as start() has not yet been called, message: %s", local_addr, msg);
             return null;
         }
         Entry send_entry=sendEntry();
@@ -529,11 +534,17 @@ public Object down(Message msg) {
             msg.setSrc(local_addr); // this needs to be done so we can check whether the message sender is the local_addr
         boolean dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK);
         Buffer<Message> win=send_entry.buf();
-        send(msg, win, dont_loopback_set);
-        num_messages_sent.increment();
+        boolean sent=send(msg, win, dont_loopback_set);
         last_seqno_resender.skipNext();
-        if(dont_loopback_set && needToSendAck(send_entry, 1))
-            handleAck(local_addr, win.highestDelivered()); // https://issues.redhat.com/browse/JGRP-2829
+        if(sent) {
+            num_messages_sent.increment();
+            if(dont_loopback_set && needToSendAck(send_entry, 1))
+                handleAck(local_addr, win.highestDelivered()); // https://issues.redhat.com/browse/JGRP-2829
+        }
+        else {
+            num_sends_dropped.increment();
+            log.warn("%s: discarded message due to full send buffer, message: %s", local_addr, msg);
+        }
         return null; // don't pass down the stack
     }
 
@@ -703,7 +714,7 @@ protected void unknownMember(Address sender, Object message) {
         }
     }
 
-    protected void send(Message msg, Buffer<Message> win, boolean dont_loopback_set) {
+    protected boolean send(Message msg, Buffer<Message> win, boolean dont_loopback_set) {
         long msg_id=seqno.incrementAndGet();
         if(is_trace)
             log.trace("%s --> [all]: #%d", local_addr, msg_id);
@@ -718,8 +729,10 @@ protected void send(Message msg, Buffer<Message> win, boolean dont_loopback_set
             lock.lock();
         }
         try {
-            if(addToSendBuffer(win, msg_id, msg, dont_loopback_set? remove_filter : null, msg.isFlagSet(DONT_BLOCK)))
+            boolean added=addToSendBuffer(win, msg_id, msg, dont_loopback_set? remove_filter : null, msg.isFlagSet(DONT_BLOCK));
+            if(added)
                 down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted
+            return added;
         }
         finally {
             if(lock != null)
@@ -729,14 +742,17 @@ protected void send(Message msg, Buffer<Message> win, boolean dont_loopback_set
 
     /** Adds the message to the send buffer. The loop tries to handle temporary OOMEs by retrying if add() failed */
     protected boolean addToSendBuffer(Buffer<Message> win, long seq, Message msg, Predicate<Message> filter, boolean dont_block) {
+        Buffer.Options opts=sendOptions();
         long sleep=10;
         boolean rc=false;
         do {
             try {
-                rc=win.add(seq, msg, filter, sendOptions(), dont_block);
+                rc=win.add(seq, msg, filter, opts, dont_block);
                 break;
             }
             catch(Throwable t) {
+                if(!opts.block() || dont_block)
+                    break;
                 if(running) {
                     Util.sleep(sleep);
                     sleep=Math.min(5000, sleep*2);
diff --git a/src/org/jgroups/protocols/ReliableUnicast.java b/src/org/jgroups/protocols/ReliableUnicast.java
index 4db0665736c..b7f6191d058 100644
--- a/src/org/jgroups/protocols/ReliableUnicast.java
+++ b/src/org/jgroups/protocols/ReliableUnicast.java
@@ -102,6 +102,8 @@ public abstract class ReliableUnicast extends Protocol implements AgeOutCache.Ha
     protected final LongAdder num_msgs_sent=new LongAdder();
     @ManagedAttribute(description="Number of message received",type=SCALAR)
     protected final LongAdder num_msgs_received=new LongAdder();
+    @ManagedAttribute(description="Number of sends dropped",type=SCALAR)
+    protected final LongAdder num_sends_dropped=new LongAdder();
     @ManagedAttribute(description="Number of acks sent",type=SCALAR)
     protected final LongAdder num_acks_sent=new LongAdder();
     @ManagedAttribute(description="Number of acks received",type=SCALAR)
@@ -287,6 +289,7 @@ public ReliableUnicast trimCachedBatches() {
 
     public long getNumMessagesReceived() {return num_msgs_received.sum();}
+    public long getNumSendsDropped() {return num_sends_dropped.sum();}
     public long getNumAcksSent() {return num_acks_sent.sum();}
     public long getNumAcksReceived() {return num_acks_received.sum();}
     public long getNumXmits() {return num_xmits.sum();}
 
@@ -370,8 +373,8 @@ public String printSendWindowMessages() {
 
     public void resetStats() {
         avg_delivery_batch_size.clear();
-        Stream.of(num_msgs_sent, num_msgs_received, num_acks_sent, num_acks_received, num_xmits, xmit_reqs_received,
-                  xmit_reqs_sent, xmit_rsps_sent, num_loopbacks).forEach(LongAdder::reset);
+        Stream.of(num_msgs_sent, num_msgs_received, num_sends_dropped, num_acks_sent, num_acks_received, num_xmits,
+                  xmit_reqs_received, xmit_reqs_sent, xmit_rsps_sent, num_loopbacks).forEach(LongAdder::reset);
         send_table.values().stream().map(e -> e.buf).forEach(Buffer::resetStats);
         recv_table.values().stream().map(e -> e.buf).forEach(Buffer::resetStats);
     }
@@ -693,8 +696,14 @@ public Object down(Message msg) {
         SenderEntry entry=getSenderEntry(dst);
         boolean dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK) && dst.equals(local_addr),
           dont_block=msg.isFlagSet(DONT_BLOCK);
-        if(send(msg, entry, dont_loopback_set, dont_block))
+        boolean sent=send(msg, entry, dont_loopback_set, dont_block);
+        if(sent) {
             num_msgs_sent.increment();
+        }
+        else {
+            num_sends_dropped.increment();
+            log.warn("%s: discarded message due to full send buffer, message: %s", local_addr, msg);
+        }
         return null; // the message was already sent down the stack in send()
     }
 
@@ -1132,14 +1141,17 @@ protected boolean send(Message msg, SenderEntry entry, boolean dont_loopback_set
      */
     protected boolean addToSendBuffer(Buffer<Message> win, long seq, Message msg, Predicate<Message> filter, boolean dont_block) {
+        Buffer.Options opts=sendOptions();
         long sleep=10;
         boolean rc=false;
         do {
             try {
-                rc=win.add(seq, msg, filter, sendOptions(), dont_block);
+                rc=win.add(seq, msg, filter, opts, dont_block);
                 break;
             }
             catch(Throwable t) {
+                if(!opts.block() || dont_block)
+                    break;
                 if(running) {
                     Util.sleep(sleep);
                     sleep=Math.min(5000, sleep*2);
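
Note: besides the warn log, drops are only visible through the new num_sends_dropped attribute. The sketch below is one way the counter could be exercised and read from application code; it is illustrative only and not part of the patch. The stack file "nak4.xml", the cluster name and the message count are made up; it assumes a stack whose multicast protocol is NAKACK4 (a ReliableMulticast subclass) with a bounded send window, and that DONT_BLOCK in the patch refers to Message.TransientFlag.DONT_BLOCK.

import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ObjectMessage;
import org.jgroups.protocols.NAKACK4;

// Hypothetical smoke test for the new drop accounting
public class DroppedSendsCheck {
    public static void main(String[] args) throws Exception {
        // "nak4.xml" is assumed to configure NAKACK4 with a small send window,
        // so that a burst of non-blocking sends can overflow it
        try(JChannel ch=new JChannel("nak4.xml").name("A").connect("drop-demo")) {
            NAKACK4 nak=ch.getProtocolStack().findProtocol(NAKACK4.class);
            for(int i=0; i < 10_000; i++) {
                // DONT_BLOCK makes addToSendBuffer() fail fast on a full buffer
                // instead of blocking; such sends are now counted as dropped
                Message msg=new ObjectMessage(null, i).setFlag(Message.TransientFlag.DONT_BLOCK);
                ch.send(msg);
            }
            // with this patch, sent + dropped accounts for every multicast down() call
            System.out.printf("sent=%d dropped=%d%n", nak.getNumMessagesSent(), nak.getNumSendsDropped());
        }
    }
}

The same attribute exists on ReliableUnicast subclasses, so the equivalent check for unicast traffic would read the counter from that protocol instead.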