Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support to limit collection capacity #289

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package com.googlecode.cqengine;

import com.googlecode.cqengine.attribute.Attribute;
import com.googlecode.cqengine.persistence.Persistence;
import com.googlecode.cqengine.query.option.AttributeOrder;
import com.googlecode.cqengine.query.option.QueryOptions;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static com.googlecode.cqengine.query.QueryFactory.*;

/**
* @author wayne
* @since 0.1.0
*/
public class CapacityLimitedIndexedCollection<O, A extends Comparable<A>> extends ConcurrentIndexedCollection<O> {

private static final int DEF_CAPACITY_LIMIT = 1000;
private static final int DEF_RELEASE_SIZE = 3;

/**
* The capacity size of collection
*/
private final int capacityLimit;

/**
* The size of each release capacity
*/
private final int releaseSize;

/**
* Sort field
*/
private final AttributeOrder<O> attributeOrder;

/**
* Expiration strategy: LRU,LFU,FIFO,NONE
*/
private Eviction eviction;

public CapacityLimitedIndexedCollection() {
this(DEF_CAPACITY_LIMIT);
}

public CapacityLimitedIndexedCollection(int capacityLimit) {
this(capacityLimit, DEF_RELEASE_SIZE);
}

public CapacityLimitedIndexedCollection(int capacityLimit, int releaseSize) {
this(capacityLimit, 0, Eviction.NONE, null);
}

public CapacityLimitedIndexedCollection(
int capacityLimit,
int releaseSize,
Eviction eviction,
Attribute<O, A> orderControlAttribute
) {
this.capacityLimit = capacityLimit;
this.releaseSize = releaseSize;
this.eviction = eviction;
this.attributeOrder = new AttributeOrder<>(orderControlAttribute, true);
}

public CapacityLimitedIndexedCollection(
Persistence<O, ? extends Comparable> persistence,
int capacityLimit,
int releaseSize,
Attribute<O, A> orderControlAttribute
) {
super(persistence);
this.capacityLimit = capacityLimit;
this.releaseSize = releaseSize;
this.attributeOrder = new AttributeOrder<>(orderControlAttribute, true);
}

@Override
public boolean add(O o) {
return addAll(Collections.singleton(o));
}

@Override
@SuppressWarnings("unchecked")
public boolean addAll(Collection<? extends O> c) {
if (c == null || c.isEmpty()) {
return true;
}
if (isReachedCapacityLimit(c.size())) {
if (this.eviction == Eviction.NONE) {
throw new RuntimeException("Reached maximum capacity limit: " + capacityLimit);
}
int evictCount = getEvictCount(c.size());

Class<O> clazz = (Class<O>) c.stream().findFirst().get().getClass();
doEviction(evictCount, clazz);
}
return super.addAll(c);
}

private void doEviction(int evictCount, Class<O> clazz) {
List<O> toEvictEntries = retrieve(all(clazz), getQueryOptionsByEviction()).stream().limit(evictCount).collect(Collectors.toList());

if (toEvictEntries.size() > 0) {
toEvictEntries.forEach(this::remove);
}
}

private boolean isReachedCapacityLimit() {
return size() >= capacityLimit;
}

private boolean isReachedCapacityLimit(int increment) {
return (size() + increment) > capacityLimit;
}

private int getEvictCount(int increment) {
int overSize = (size() + increment) - capacityLimit;
return overSize + releaseSize;
}

public static class OrderedEntry<O> {
private O o;
private long timestamp;

public OrderedEntry(O o) {
this(o, System.currentTimeMillis());
}

public OrderedEntry(O o, long timestamp) {
this.o = o;
this.timestamp = timestamp;
}

public O getO() {
return o;
}

public void setO(O o) {
this.o = o;
}

public long getTimestamp() {
return timestamp;
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
}

public QueryOptions getQueryOptionsByEviction() {
if (this.eviction == Eviction.NONE) {
return null;
}
return queryOptions(orderBy(attributeOrder));
}

/**
* Expiration strategy
*/
public enum Eviction {
LRU,
LFU,
FIFO,
NONE
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package com.googlecode.cqengine;

import com.google.common.collect.testing.TestStringSetGenerator;
import com.googlecode.cqengine.attribute.Attribute;
import com.googlecode.cqengine.attribute.SimpleAttribute;
import com.googlecode.cqengine.index.hash.HashIndex;
import com.googlecode.cqengine.persistence.offheap.OffHeapPersistence;
import com.googlecode.cqengine.persistence.onheap.OnHeapPersistence;
import com.googlecode.cqengine.query.Query;
import com.googlecode.cqengine.query.QueryFactory;
import com.googlecode.cqengine.query.option.QueryOptions;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.junit.Assert;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;

import static com.googlecode.cqengine.query.QueryFactory.equal;
import static java.util.Arrays.asList;

/**
* @author wayne
* @since 0.1.0
*/
public class CapacityLimitedIndexedCollectionTest extends TestCase {

public static junit.framework.Test suite() {
TestSuite suite = new TestSuite();
suite.addTestSuite(CapacityLimitedIndexedCollectionTest.class);
return suite;
}

private static TestStringSetGenerator onHeapIndexedCollectionGenerator() {
return new TestStringSetGenerator() {
@Override protected Set<String> create(String[] elements) {
IndexedCollection<String> indexedCollection = new ConcurrentIndexedCollection<>(OnHeapPersistence.onPrimaryKey(QueryFactory.selfAttribute(String.class)));
indexedCollection.addAll(asList(elements));
return indexedCollection;
}
};
}

private static TestStringSetGenerator offHeapIndexedCollectionGenerator() {
return new TestStringSetGenerator() {
@Override protected Set<String> create(String[] elements) {
IndexedCollection<String> indexedCollection = new ConcurrentIndexedCollection<>(OffHeapPersistence.onPrimaryKey(QueryFactory.selfAttribute(String.class)));
indexedCollection.addAll(asList(elements));
return indexedCollection;
}
};
}

public void testAdd() {
IndexedCollection<TestEntry> coll = new CapacityLimitedIndexedCollection<>(5);

coll.addIndex(HashIndex.onAttribute(TestEntry.NAME));

for (int i = 0; i < 5; i++) {
coll.add(new TestEntry(String.valueOf(i), (long) i));
}

Query<TestEntry> q = equal(TestEntry.NAME, "3");
coll.retrieve(q).forEach(System.out::println);

Exception ex = Assert.assertThrows(RuntimeException.class, () -> coll.add(new TestEntry("5", 5L)));
Assert.assertTrue(ex.getMessage().startsWith("Reached maximum"));

System.out.println("Expected error occurred: " + ex.getMessage());
}

public void testAddWithFIFO() {
IndexedCollection<TestEntry> coll = new CapacityLimitedIndexedCollection<>(
5,
3,
CapacityLimitedIndexedCollection.Eviction.FIFO,
TestEntry.ORDER
);

coll.addIndex(HashIndex.onAttribute(TestEntry.NAME));

for (int i = 0; i <= 100; i++) {
coll.add(new TestEntry(String.valueOf(i), (long) i));
}

Assert.assertTrue(coll.size() <= 5);
}

public void testAddAll() {
IndexedCollection<TestEntry> coll = new CapacityLimitedIndexedCollection<>(5);

coll.addIndex(HashIndex.onAttribute(TestEntry.NAME));

for (int i = 0; i < 3; i++) {
coll.add(new TestEntry(String.valueOf(i), (long) i));
}

Query<TestEntry> q = equal(TestEntry.NAME, "2");
coll.retrieve(q).forEach(System.out::println);

Collection<TestEntry> list = new ArrayList<>();
list.add(new TestEntry("3", 3L));
list.add(new TestEntry("4", 4L));
list.add(new TestEntry("5", 5L));

Exception ex = Assert.assertThrows(RuntimeException.class, () -> coll.addAll(list));
Assert.assertTrue(ex.getMessage().startsWith("Reached maximum"));

System.out.println("Expected error occurred: " + ex.getMessage());
}

static class TestEntry {
private final String name;
private final Long order;

public TestEntry(String name, Long order) {
this.name = name;
this.order = order;
}

public String getName() {
return name;
}

public Long getOrder() {
return order;
}

// -------------------------- Attributes --------------------------
public static final Attribute<TestEntry, String> NAME = new SimpleAttribute<TestEntry, String>("name") {
public String getValue(TestEntry entry, QueryOptions queryOptions) { return entry.name; }
};

public static final Attribute<TestEntry, Long> ORDER = new SimpleAttribute<TestEntry, Long>("order") {
public Long getValue(TestEntry entry, QueryOptions queryOptions) { return entry.order; }
};

@Override
public String toString() {
return "TestEntry{" +
"name='" + name + '\'' +
'}';
}
}
}