Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix p2p taskStatus disappear issue #881

Merged
merged 2 commits into from
Apr 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public TaskRequestActiveNodes(final IP2pMgr _mgr, final Logger p2pLOG) {
public void run() {
INode node = mgr.getRandom();
if (node != null) {
Thread.currentThread().setName("p2p-reqNodes");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we have to set the thread's name, would it make sense for TaskRequestActiveNodes to extend Thread instead of implement Runnable? I feel like we're assuming things about which thread this Runnable will be in, so why not just make it a Thread?

alternatively, make the thread-name-setting happen in P2pMgr line 231 instead (set the name before starting threadStatus)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the purpose to use runnable is for saving the memory overhead, cause the task only executing run() method. you don't need to create a lot of objects by extending it to a Thread.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set the thead name mainly for more readable when you are using jstack to debug the thread info.

if (p2pLOG.isTraceEnabled()) {
p2pLOG.trace("TaskRequestActiveNodes: {}", node.toString());
}
Expand Down
13 changes: 7 additions & 6 deletions modP2pImpl/src/org/aion/p2p/impl1/P2pMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,9 @@ public void run() {
}

if (p2pLOG.isInfoEnabled()) {
scheduledWorkers.scheduleWithFixedDelay(
getStatusInstance(), 2, PERIOD_SHOW_STATUS, TimeUnit.MILLISECONDS);
Thread threadStatus = new Thread(getStatusInstance(), "p2p-ts");
threadStatus.setPriority(Thread.NORM_PRIORITY);
threadStatus.start();
}

if (!syncSeedsOnly) {
Expand Down Expand Up @@ -461,19 +462,19 @@ private TaskInbound getInboundInstance() {
}

private TaskSend getSendInstance(int i) {
return new TaskSend(this, i, this.sendMsgQue, this.start, this.nodeMgr, this.selector);
return new TaskSend(this, i, sendMsgQue, start, nodeMgr, selector);
}

private TaskReceive getReceiveInstance() {
return new TaskReceive(this.start, this.receiveMsgQue, this.handlers);
return new TaskReceive(start, receiveMsgQue, handlers);
}

private TaskStatus getStatusInstance() {
return new TaskStatus(this.nodeMgr, this.selfShortId, this.sendMsgQue, this.receiveMsgQue);
return new TaskStatus(start, nodeMgr, selfShortId, sendMsgQue, receiveMsgQue);
}

private TaskClear getClearInstance() {
return new TaskClear(this.nodeMgr, this.start);
return new TaskClear(nodeMgr, start);
}

private TaskConnectPeers getConnectPeersInstance() {
Expand Down
38 changes: 27 additions & 11 deletions modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.aion.p2p.impl1.P2pMgr.p2pLOG;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.aion.p2p.INodeMgr;

public class TaskStatus implements Runnable {
Expand All @@ -12,7 +13,11 @@ public class TaskStatus implements Runnable {
private final BlockingQueue<MsgOut> sendMsgQue;
private final BlockingQueue<MsgIn> receiveMsgQue;

private static final int PERIOD_STATUS = 10000;
private final AtomicBoolean start;

public TaskStatus(
final AtomicBoolean _start,
final INodeMgr _nodeMgr,
final String _selfShortId,
final BlockingQueue<MsgOut> _sendMsgQue,
Expand All @@ -21,21 +26,32 @@ public TaskStatus(
this.selfShortId = _selfShortId;
this.sendMsgQue = _sendMsgQue;
this.receiveMsgQue = _receiveMsgQue;
this.start = _start;
}

@Override
public void run() {
Thread.currentThread().setName("p2p-ts");
String status = this.nodeMgr.dumpNodeInfo(this.selfShortId, p2pLOG.isDebugEnabled());

if (p2pLOG.isDebugEnabled()) {
p2pLOG.debug(status);
p2pLOG.debug(
"recv queue[{}] send queue[{}]",
this.receiveMsgQue.size(),
this.sendMsgQue.size());
} else if (p2pLOG.isInfoEnabled()) {
p2pLOG.info(status);
p2pLOG.debug("P2p taskStatus start running.");
while (start.get()) {
try {
Thread.sleep(PERIOD_STATUS);
String status = nodeMgr.dumpNodeInfo(selfShortId, p2pLOG.isDebugEnabled());

if (p2pLOG.isDebugEnabled()) {
p2pLOG.debug(status);
p2pLOG.debug(
"recv queue[{}] send queue[{}]",
receiveMsgQue.size(),
sendMsgQue.size());
} else if (p2pLOG.isInfoEnabled()) {
p2pLOG.info(status);
}
} catch (InterruptedException e) {
p2pLOG.warn("P2p taskStatus InterruptedException! ", e);
} catch (Exception e) {
p2pLOG.warn("P2p taskStatus exception! ", e);
}
}
p2pLOG.info("P2p taskStatus has been shut down.");
}
}
6 changes: 5 additions & 1 deletion modP2pImpl/test/org/aion/p2p/impl1/tasks/TaskStatusTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.aion.log.AionLoggerFactory;
import org.aion.log.LogEnum;
import org.aion.log.LogLevel;
Expand Down Expand Up @@ -38,14 +39,17 @@ public void setup() {
@Test(timeout = 10_000)
public void testRun() throws InterruptedException {

TaskStatus ts = new TaskStatus(nodeMgr, "1", msgOutQue, msgInQue);
final AtomicBoolean ab = new AtomicBoolean(true);

TaskStatus ts = new TaskStatus(ab, nodeMgr, "1", msgOutQue, msgInQue);
assertNotNull(ts);
when(nodeMgr.dumpNodeInfo(anyString(), anyBoolean())).thenReturn("get Status");

Thread t = new Thread(ts);
t.start();
assertTrue(t.isAlive());

ab.set(false);
while (!t.getState().toString().contains("TERMINATED")) {
Thread.sleep(10);
}
Expand Down