Ash/Selva - Added full and prefix scan support to all the stores #61
Changed ConcurrentHashMap of InMemoryStore to ConcurrentSkipListMap to preserve ordering of keys.
Added tests to RocksDB and InMemory stores.
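For readers following along, here is a minimal sketch of why a sorted map helps with prefix scans. It is not the code from this PR: `SortedScanSketch`, `SortedInMemoryStore` and the local `KV` case class are hypothetical names, and the `tailMap` plus `takeWhile` approach is an assumption about one way to use the ordering. Keys are wrapped in `ByteBuffer`, which compares lexicographically (by signed byte), so all keys sharing a prefix sit next to each other.

```scala
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentSkipListMap
import java.util.{Arrays => JArrays}

object SortedScanSketch {
  // Hypothetical stand-in for the project's KV type.
  final case class KV(key: Array[Byte], value: Array[Byte])

  class SortedInMemoryStore {
    // ByteBuffer is Comparable, so the skip list keeps keys in lexicographic order.
    private val store = new ConcurrentSkipListMap[ByteBuffer, Array[Byte]]()

    def put(key: Array[Byte], value: Array[Byte]): Unit = {
      store.put(ByteBuffer.wrap(key), value)
      ()
    }

    // Full scan: every entry, in key order.
    def scan(): Iterator[KV] = toKVs(store.entrySet().iterator())

    // Prefix scan: jump to the first key >= prefix, stop at the first non-matching key.
    def scan(prefix: Array[Byte]): Iterator[KV] =
      toKVs(store.tailMap(ByteBuffer.wrap(prefix), true).entrySet().iterator())
        .takeWhile(kv => hasPrefix(kv.key, prefix))

    private def toKVs(
        entries: java.util.Iterator[java.util.Map.Entry[ByteBuffer, Array[Byte]]]
    ): Iterator[KV] =
      new Iterator[KV] {
        def hasNext: Boolean = entries.hasNext
        def next(): KV = {
          val e = entries.next()
          KV(e.getKey.array(), e.getValue)
        }
      }

    private def hasPrefix(key: Array[Byte], prefix: Array[Byte]): Boolean =
      key.length >= prefix.length && JArrays.equals(key.take(prefix.length), prefix)
  }
}
```

With a hash map the same prefix scan would have to visit every entry, and the results would come back in no particular order.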
import org.scalatest.{BeforeAndAfter, FlatSpec}

class RocksDbStoreSpec extends FlatSpec with BeforeAndAfter {
  val ROCKSDB_TEST_LOCATION = "/tmp/suuchi-rocks-test"

  val ROCKSDB_TEST_LOCATION = "/tmp/suuchi-rocks-test/"
@gsriram7 Instead of using this hard-coded path, see if we can use Files.createTempDirectory. This would ensure we automatically choose a temp directory based on the platform.
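A minimal sketch of that suggestion, assuming a small helper is acceptable in the spec. `TempDirSketch` and `newTestLocation` are hypothetical names; only `java.nio.file.Files.createTempDirectory` is the real JDK call.

```scala
import java.nio.file.{Files, Path}

object TempDirSketch {
  // Creates a fresh, uniquely named directory under the platform's temp location
  // (the java.io.tmpdir system property), so no path needs to be hard-coded.
  def newTestLocation(prefix: String = "suuchi-rocks-test"): Path =
    Files.createTempDirectory(prefix)
}
```

The resulting path could then be passed on as a string, for example `TempDirSketch.newTestLocation().toString`, wherever the spec currently uses ROCKSDB_TEST_LOCATION.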
before {
  FileUtils.deleteDirectory(new File(ROCKSDB_TEST_LOCATION))
  db = new RocksDbStore(RocksDbConfiguration.apply(ROCKSDB_TEST_LOCATION))
@gsriram7 I thought we reverted this change. Do we really need to extract this out as a var db and use it? Or was it breaking the tests otherwise?
Yes it was breaking because of the LOCK.
    key.length >= prefix.length && JArrays.equals(key.take(prefix.length), prefix)
  }
}
iterator
@gsriram7 See if we can inline this variable.
  store.entrySet().map(kv => KV(kv.getKey.array(), kv.getValue)).iterator
}

private def hasPrefix(key: Array[Byte], prefix: Array[Byte]) = {
@gsriram7 This looks identical to the one in RocksDBStore. Can you see if you can extract this out as a reusable method inside an Object?
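One possible shape for that extraction, as a sketch only. The object name `ByteArrayUtilsSketch` is hypothetical; the body reuses the length check and `JArrays.equals` comparison already visible in the diff above.

```scala
import java.util.{Arrays => JArrays}

object ByteArrayUtilsSketch {
  // True when `key` starts with `prefix`. The length check comes first so that
  // a prefix longer than the key is rejected without comparing any bytes.
  def hasPrefix(key: Array[Byte], prefix: Array[Byte]): Boolean =
    key.length >= prefix.length && JArrays.equals(key.take(prefix.length), prefix)
}
```

Both stores could then call `ByteArrayUtilsSketch.hasPrefix(key, prefix)` instead of carrying their own private copy.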
Current coverage is 74.82% (diff: 81.48%)
Added specs to VersionedStore. Extracted out ByteArrayUtils.

class ByteArrayUtilsSpec extends FlatSpec {

  "ByteArrayUtils" should "say whether a given byte array starts with a specified byte array prefix" in {
@gsriram7 Given we have a length check, it would be good to add a case where the prefix is greater in length than the key.
@ashwanthkumar I have that test already. The coverage was 100% for ByteArrayUtils.
ByteArrayUtils.hasPrefix("string".getBytes, prefix = "longerString".getBytes) should be(false)
@gsriram7 Sorry - I missed it, my bad. Good job!
@ashwanthkumar I guess the test description wasn't self-explanatory. I'll put that into a new test and push.
    case _ => false
  }

  override def toString: String = new String(key) + " " + new String(value)
@gsriram7 If we're not using this - let's remove this method.
Yes. I just added it to debug
@gsriram7 The changes LGTM. Let me know if you have anything more to add before we merge.
Added a utility to check whether hash of a given key is within specified range
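A rough sketch of what such a utility might look like, under stated assumptions. The `Hash` trait here is a hypothetical stand-in with a single `hash` method, and treating both bounds as inclusive is a guess; the spec lines quoted later in this review only show that hashes outside the range return false.

```scala
object HashRangeSketch {
  // Hypothetical stand-in for the project's hash function interface.
  trait Hash {
    def hash(key: Array[Byte]): Int
  }

  // True when the hash of `key` falls within [start, end].
  // Assumption: both bounds are inclusive; the real utility may differ.
  def isHashKeyWithinRange(start: Int, end: Int, key: Array[Byte], hashFn: Hash): Boolean = {
    val h = hashFn.hash(key)
    start <= h && h <= end
  }
}
```

Taking the hash function as a parameter keeps the check easy to test with a stub that returns a fixed value.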
    .build()
}

private def buildResponse(response: KV): ScanResponse = {
Please prefer to push all private defs to the bottom of the class.
private def populateStore(num: Int, store: Store) = 1 to num foreach (i => store.put(i.toString.getBytes, (i * 100).toString.getBytes))

private def extractKey(response: ScanResponse) = new String(response.getKv.getKey.toByteArray).toInt
As above - please push the private methods to the bottom, below the tests.
"SuuchiScanService" should "support scan for a given token range" in {

  val store = new InMemoryStore
  lazy val service = new SuuchiScanService(store)
Is there a reason for marking the service as lazy here?
  rpc Scan (ScanRequest) returns (stream ScanResponse);
}
message ScanRequest {
  int32 start = 1;
It would really help to extract start and end as a TokenRange, given we had plans to use the same thing with other store methods too.
TokenRange has nodes to which keys within that range will be routed. This info isn't needed for scans so just going with start and end as of now.
val hashFn = mock(classOf[Hash])
val key = "1".getBytes

when(hashFn.hash(key)).thenReturn(1).thenReturn(100)
Okay, I was confused by this. Can we please not use multiple thenReturn calls? Instead, explicitly do:
when(hashFn.hash(key)).thenReturn(1)
ByteArrayUtils.isHashKeyWithinRange(10, 100, key, hashFn) should be(false)
when(hashFn.hash(key)).thenReturn(100)
ByteArrayUtils.isHashKeyWithinRange(1, 10, key, hashFn) should be(false)
It helps readability a lot for just 1 extra line, no? What do you think?
Yes, it is more explicit. I'll make the change.
val key = "1".getBytes

when(hashFn.hash(key)).thenReturn(10)

Nit picking - Please remove this empty line.
I left it intentionally. For projects like zeus we follow a convention of separating these groups with a blank line:
- Variable inits
- Mock interactions
- Actual action and assertion
Didn't like it?
@@ -107,4 +111,8 @@ class VersionedStore(store: Store, versionedBy: VersionedBy, numVersions: Int, c

  private def removeData(key: Array[Byte], version: Long) = store.remove(dkey(key, version))
  private def removeVersion(key: Array[Byte]) = store.remove(vkey(key))

Selva - we talked about adding a new method scanVersions (with 2 overloaded variants, like scan) in VersionedStore that does a prefix scan based on VersionedStore.vkey. I find that missing. Can you please add that too?
We don't have an immediate need for it, but it keeps VersionedStore symmetric, given we can then scan both just for versions and for records.
Yes then we agreed that scan should blindly give away all the versions right?
Ohh, it's the version store. Yes, I'll add an overloaded method.
@gsriram7 This LGTM. I'm going ahead with the merge. Please let's take further PR comments as separate PRs.
def toVRecord(kv: KV) = VRecord(kv.key, Versions.fromBytes(kv.value))

def scanVersions(prefix: Array[Byte]): Iterator[VRecord] = store.scan(vkey(prefix)).map(toVRecord)
Please also add the scanVersions method without the prefix as well.
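To illustrate the request, here is a sketch of how the two overloads could sit side by side. Everything in it is an assumption made for the example: `Store`, `KV`, `VersionScans` and in particular the `"V_"` marker inside `vkey` are hypothetical, and the actual key layout in VersionedStore is not shown in this PR. The idea is simply that the prefix-less variant delegates with an empty prefix, so the scan covers every version key.

```scala
object ScanVersionsSketch {
  // Hypothetical stand-ins for the project's KV and Store types.
  final case class KV(key: Array[Byte], value: Array[Byte])

  trait Store {
    def scan(prefix: Array[Byte]): Iterator[KV]
  }

  // Hypothetical marker that namespaces version records; the real vkey layout may differ.
  private val VersionMarker: Array[Byte] = "V_".getBytes("UTF-8")

  def vkey(key: Array[Byte]): Array[Byte] = VersionMarker ++ key

  class VersionScans(store: Store) {
    // Version records whose original key starts with `prefix`.
    def scanVersions(prefix: Array[Byte]): Iterator[KV] = store.scan(vkey(prefix))

    // All version records: an empty prefix reduces vkey to the bare marker.
    def scanVersions(): Iterator[KV] = scanVersions(Array.emptyByteArray)
  }
}
```

This mirrors the pair of scan variants on the stores, and mapping each result through toVRecord would work the same way for both overloads.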
Changed ConcurrentHashMap of InMemoryStore to ConcurrentSkipListMap to preserve ordering of keys
Added tests to RocksDB and InMemory stores.
@gsriram7 Creating a PR out of the branch - so it's easy to track the work we do and review changes easily.