Skip to content

Commit

Permalink
minor bugfix / improvements. Multithreading fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sboesebeck committed Oct 29, 2014
1 parent d9c580b commit e6a8c6c
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>de.caluga</groupId>
<artifactId>morphium</artifactId>
<version>2.2.14</version>
<version>2.2.15</version>
<packaging>jar</packaging>
<parent>
<groupId>org.sonatype.oss</groupId>
Expand Down
3 changes: 3 additions & 0 deletions src/de/caluga/morphium/Morphium.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ public Morphium(MorphiumConfig cfg) {

}

public ThreadPoolExecutor getAsyncOperationsThreadPool() {
return asyncOperationsThreadPool;
}
public void setConfig(MorphiumConfig cfg) {
if (config != null) {
throw new RuntimeException("Cannot change config!");
Expand Down
4 changes: 2 additions & 2 deletions src/de/caluga/morphium/query/QueryFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface QueryFactory {

public void setQueryImpl(Class<? extends Query> queryImpl);

void setExecutor(ThreadPoolExecutor ex);
public void setExecutor(ThreadPoolExecutor ex);

ThreadPoolExecutor getExecutor();
public ThreadPoolExecutor getExecutor(Morphium m);
}
9 changes: 4 additions & 5 deletions src/de/caluga/morphium/query/QueryFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
public class QueryFactoryImpl implements QueryFactory {
private Class<? extends Query> queryImpl;
ThreadPoolExecutor executor = null;
private ThreadPoolExecutor executor = null;

public QueryFactoryImpl() {
}
Expand All @@ -30,9 +30,9 @@ public QueryFactoryImpl(Class<? extends Query> qi) {
}

@Override
public ThreadPoolExecutor getExecutor() {
public ThreadPoolExecutor getExecutor(Morphium m) {
if (executor == null) {
executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
executor = new ThreadPoolExecutor(m.getConfig().getMaxConnections() / 2, (int) (m.getConfig().getMaxConnections() * m.getConfig().getBlockingThreadsMultiplier() * 0.9),
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Expand All @@ -55,8 +55,7 @@ public <T> Query<T> createQuery(Morphium m, Class<? extends T> type) {
Query<T> q = queryImpl.newInstance();
q.setMorphium(m);
q.setType(type);
q.setExecutor(getExecutor());

q.setExecutor(getExecutor(m));
return q;
} catch (InstantiationException e) {
throw new RuntimeException(e);
Expand Down
7 changes: 0 additions & 7 deletions src/de/caluga/morphium/query/QueryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import java.lang.reflect.Field;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* User: Stpehan Bösebeck
Expand Down Expand Up @@ -83,11 +81,6 @@ public ServerAddress getServer() {
}

public ThreadPoolExecutor getExecutor() {
if (executor == null) {
executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
return executor;
}

Expand Down
11 changes: 5 additions & 6 deletions src/de/caluga/morphium/writer/MorphiumWriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.lang.reflect.Method;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -33,9 +33,7 @@ public class MorphiumWriterImpl implements MorphiumWriter {
private Morphium morphium;
private int maximumRetries = 10;
private int pause = 250;
private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1000, true));
private ThreadPoolExecutor executor = null;

@Override
public void setMaximumQueingTries(int n) {
Expand All @@ -51,8 +49,9 @@ public void setPauseBetweenTries(int p) {
public void setMorphium(Morphium m) {
morphium = m;
if (m != null) {
executor.setCorePoolSize(m.getConfig().getMaxConnections() / 2);
executor.setMaximumPoolSize((int) (m.getConfig().getMaxConnections() * m.getConfig().getBlockingThreadsMultiplier() * 0.9));
executor = new ThreadPoolExecutor(m.getConfig().getMaxConnections() / 2, (int) (m.getConfig().getMaxConnections() * m.getConfig().getBlockingThreadsMultiplier() * 0.9),
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/de/caluga/test/mongo/suite/CacheSyncTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void idCacheTest() throws Exception {
dur = System.currentTimeMillis() - start;
log.info("Storing with synchronizer: " + dur + " ms");

Thread.sleep(10000);
Thread.sleep(15000);
start = System.currentTimeMillis();
int notFoundCounter = 0;
for (int i = 0; i < 100; i++) {
Expand Down
11 changes: 11 additions & 0 deletions test/de/caluga/test/mongo/suite/WriteBufferCountTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import de.caluga.morphium.query.Query;
import org.junit.Test;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -57,4 +59,13 @@ private int waitForWriteProcessToBeScheduled() {
}
return c;
}


@Test
public void threadNumberTest() throws Exception {
ThreadMXBean thbean = ManagementFactory.getThreadMXBean();
log.info("Running threads: " + thbean.getThreadCount());
assert (thbean.getThreadCount() < 1000);

}
}

0 comments on commit e6a8c6c

Please sign in to comment.