Removing Integer.MAX column size limit. #3743
Conversation
@@ -82,6 +132,11 @@ public void write(T objectToWrite) throws IOException
valuesOut.write(bytesToWrite);

headerOut.write(Ints.toByteArray((int) valuesOut.getCount()));
headerOutLong.write(Longs.toByteArray(valuesOut.getCount()));
Can we avoid writing the headerOutLong file in cases where it's not needed? Maybe we can write headerOutLong on demand at the point when fileSizeLimit is crossed (and stop writing headerOut afterwards)? In most use cases, headerOutLong would not need to be written at all.
For that I would need to build headerOutLong from the headerOut values during the switch. I decided to keep the same info in headerOutLong to avoid rebuilding it from headerOut, and to delete it if not required.
But I can avoid writing into headerOut after the switch entirely; will update the PR not to write into headerOut after the switch.
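A minimal in-memory sketch of the on-demand switch being discussed: the long header is created lazily the first time an offset no longer fits in an int, rebuilt by replaying the int header written so far, and headerOut is no longer written afterwards. The field and constant names mirror this conversation and are assumptions, not the actual Druid code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Sketch only: lazily create headerOutLong when the limit is crossed,
// rebuild it from the 4-byte offsets written so far, then stop writing headerOut.
class HeaderWriterSketch {
    static final long FILE_SIZE_LIMIT = Integer.MAX_VALUE; // assumed limit
    final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
    ByteArrayOutputStream headerOutLong; // null until the limit is crossed

    void writeOffset(long offset) {
        if (headerOutLong == null && offset > FILE_SIZE_LIMIT) {
            // Switch point: replay the existing 4-byte offsets as 8-byte offsets.
            headerOutLong = new ByteArrayOutputStream();
            ByteBuffer ints = ByteBuffer.wrap(headerOut.toByteArray());
            while (ints.hasRemaining()) {
                headerOutLong.writeBytes(ByteBuffer.allocate(Long.BYTES).putLong(ints.getInt()).array());
            }
        }
        if (headerOutLong != null) {
            // After the switch, only the long header is appended to.
            headerOutLong.writeBytes(ByteBuffer.allocate(Long.BYTES).putLong(offset).array());
        } else {
            headerOut.writeBytes(ByteBuffer.allocate(Integer.BYTES).putInt((int) offset).array());
        }
    }
}
```

In the common case where no offset ever exceeds the limit, headerOutLong is never created at all, which is the point of the suggestion.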
{
headerOut.close();
headerOutLong.close();
valuesOut.close();
These 3 lines are common to the closeXXX() methods and can be moved to close().
headerOutLong.close();
valuesOut.close();

// revisit this check.
Why this comment?
Removed it.
}
finally {
metaOut.close();
throw new ISE(
Say a Druid user got this error during batch/realtime ingestion; it would continue to happen on retries. What would be the resolution?
This can happen if the column values total more than 2^31 bytes and each value split has only one element. But yes, for this kind of extreme case it is a design limitation, and retries will continue to throw this exception.
);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
Can't this just be read(buffer, strategy, null), with the error message in the other read updated if necessary?
The other read (the one with the smoosher) would again read the version from the buffer, and in that case it would error out.
Force-pushed from 5c2b3b8 to bd90ff3
@himanshug Updated the PR to create headerLong only after hitting the limit; also changed the version number to use
👍
Before going on with the review I want to ask you (also @himanshug and @cheddar): the reason to introduce this complexity is the 2 GB limitation of ByteBuffers. What about overcoming that limitation by replacing ByteBuffers with Memory, which is beneficial for other reasons as well? #3716 (comment)
@@ -58,18 +59,23 @@
this.valueSupplier = valueSupplier;
}

public static CompressedVSizeIndexedV3Supplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
public static CompressedVSizeIndexedV3Supplier fromByteBuffer(
ByteBuffer buffer, ByteOrder order,
Each param should be on a separate line
@@ -52,7 +51,7 @@ public static int skip(IntIterator it, int n)
* It isn't checked if the given source iterators are actually ascending, if they are not, the order of values in the
* returned iterator is undefined.
* <p>
* This is similar to what {@link MergeIterator} does with simple {@link java.util.Iterator}s.
* This is similar to what {@link io.druid.java.util.common.guava.MergeIterator} does with simple {@link java.util.Iterator}s.
Line longer than 120 columns
@@ -68,7 +67,7 @@ public static IntIterator mergeAscending(List<IntIterator> iterators)
}

/**
* This class is designed mostly after {@link MergeIterator}. {@code MergeIterator} uses a priority queue of wrapper
* This class is designed mostly after {@link io.druid.java.util.common.guava.MergeIterator}. {@code MergeIterator} uses a priority queue of wrapper
Line longer than 120 columns
@@ -36,12 +37,13 @@

public BlockLayoutIndexedFloatSupplier(
int totalSize, int sizePer, ByteBuffer fromBuffer, ByteOrder order,
Each param should be on a separate line
{
byte versionFromBuffer = buffer.get();

if (versionFromBuffer == VERSION) {
final int totalSize = buffer.getInt();
final int sizePer = buffer.getInt();
final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(buffer.get());
final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(
buffer.get());
The ); should be on a separate line.
public byte[] toBytes(String val)
{
if (val == null) {
return new byte[]{};
ByteArrays.EMPTY_ARRAY
if (val == null) {
return new byte[]{};
}
return io.druid.java.util.common.StringUtils.toUtf8(val);
Why fully qualified?
return ordering.compare(o1, o2);
}
};
private static final byte version_one = 0x1;
I suggest keeping the versions at the very top of the file.
Either CAP_CASE or camelCase
}
};
private static final byte version_one = 0x1;
private static final byte version_two = 0X2;
Small x
this.theBuffer = buffer;
this.strategy = strategy;
this.allowReverseLookup = allowReverseLookup;
size = theBuffer.getInt();
Suggested numElements, to match the format description in the header. The same goes for the other fields.
The Indexed interface exposes int size(). Using the variable name size, in parity with this function, makes more sense to me. Also, some of the exceptions thrown from this class use size in the description:
throw new IAE(String.format("Index[%s] >= size[%s]", index, size));
Force-pushed from 5ab8861 to d43c39e
@leventov Thanks for reviewing this PR. I updated the PR for the code-formatting related changes.
@leventov yes, it is to introduce larger columns with the existing ByteBuffer-based GenericIndexed. Replacing ByteBuffer with Memory would achieve the same, but that would take some time before getting materialized. @cheddar any other thoughts?
@akashdw @himanshug If we add a more complex format now, we have to support it for a very long time or forever, even though this complexity could be avoided if we replace
@leventov I understand that replacing ByteBuffer with Memory would achieve the same, but the Memory changes are not ready yet. @niketh and @leerho are working on the Memory changes, and it might take some time to replace ByteBuffer with Memory. I think these changes are completely independent of the Memory changes and allow us to create large columns using ByteBuffer. As I mentioned earlier, the Memory changes can be introduced later with a new GenericIndexed version.
@niketh thanks for letting us know. @akashdw IMO being able to go beyond the 2 GB limit one or two public Druid releases earlier isn't worth introducing complexity that would then have to be supported for years. At Yahoo you need it and you are the author of the patch, so you can apply it and use private builds (if you don't do this already).
@leventov I'm not sure I understand the complexity here. There is an explicit need to support large columns; we have users who are actually running into this problem, and you are asking us not to merge it because of a PR that isn't even ready yet? We chose to do these even though we knew that the I don't think it's a good idea to set a precedent of not accepting patches simply because you, personally, don't need the fix. And the argument about support can be used to say that we should not accept any PR whatsoever. This change is incremental, opt-in only, and can clearly and cleanly be migrated forward. I don't think there is any reason to block the PR.
@leventov if you are volunteering to complete the move to the
@cheddar the fact that other people need this, besides Yahoo, changes the picture. I'm not blocking it, if any other committer not from Yahoo is OK with it.
@leventov even if it is only Yahoo at this point in time, I don't see why that's a reason to block it. Btw, as I've calmed down a little bit, let me add that this PR introduces an interface change to In general, Druid has and continues to maintain versioned binary storage to enable us to iterate and make changes as needed. This has served us well and is the mechanism by which we handle complexity and support for features that might initially seem orthogonal to what others also need out of the project. But, in the end, we are running into this problem because we are pushing the limits of Druid and need to resolve those issues. The changes are all within the normal mechanisms that Druid uses to make incremental changes, and they are purely opt-in. If there is some technical reason that you think we are making changes to the code that will have long-lasting, negative implications, please bring them up and we can try to address them. But if the pushback is merely "it's more code and more code increases the maintenance burden," I do not believe that argument is valid.
Stopped on GenericIndexedWriter.java
* Meta File:
* byte 1: version (0x2)
* byte 2 == 0x1 => allowReverseLookup
* bytes 3-6: numberOfElementsPerValueFile expressed as power of 2. That means all the value files contain the same number of items
Line longer than 120 cols
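The layout quoted in the diff can be sketched as a tiny writer/reader pair. The version byte and field order follow the javadoc above; the class and method names are assumptions for illustration, not Druid's actual API.

```java
import java.nio.ByteBuffer;

// Sketch of the 6-byte V2 meta header: version, allowReverseLookup flag,
// and the power of 2 giving the number of elements per value file.
final class MetaHeaderSketch {
    static final byte VERSION_TWO = 0x2;

    static byte[] write(boolean allowReverseLookup, int bagSizePower) {
        return ByteBuffer.allocate(6)
                .put(VERSION_TWO)                             // byte 1: version
                .put((byte) (allowReverseLookup ? 0x1 : 0x0)) // byte 2: reverse-lookup flag
                .putInt(bagSizePower)                         // bytes 3-6: power of 2
                .array();
    }

    static int elementsPerValueFile(byte[] meta) {
        ByteBuffer buf = ByteBuffer.wrap(meta);
        buf.get(); // skip version
        buf.get(); // skip flag
        // every value file holds the same number of items: 1 << bagSizePower
        return 1 << buf.getInt();
    }
}
```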
@Override
public T get(int index)
{
if (index < 0) {
Lines 143-148 replaceable with Preconditions.checkElementIndex()
I won't be able to throw the exception as IAE; my intent here is to throw an IAE with a custom error message.
The error message of the IndexOutOfBoundsException produced by Preconditions is just as informative.
One of the primary things I considered while making these changes was not to change the existing behavior of V1 GenericIndexed. The current V1 in master throws IAE with a custom error message and I'm keeping the same behavior; V2 just follows the same convention as V1. Maybe there can be another PR to replace the IAE with Preconditions checks.
Ok, then could you please extract this block (repeated several times in GenericIndexed) as a helper method in GenericIndexed, and leave a comment on the method explaining why it is used instead of Preconditions?
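A sketch of the helper being requested, keeping the V1-compatible exception type and message quoted elsewhere in this thread. The method name is an assumption, and plain IllegalArgumentException stands in for Druid's IAE.

```java
// Sketch: a single bounds-check helper, kept instead of
// Preconditions.checkElementIndex so that V2 throws the same exception
// type and message as V1 GenericIndexed (IAE, not IndexOutOfBoundsException).
final class GenericIndexedChecks {
    static void checkIndex(int index, int size) {
        if (index < 0) {
            throw new IllegalArgumentException(String.format("Index[%s] < 0", index));
        }
        if (index >= size) {
            throw new IllegalArgumentException(String.format("Index[%s] >= size[%s]", index, size));
        }
    }
}
```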
numElementsPerValueFile = size;

int indexOffset = theBuffer.position();
int valuesOffset = theBuffer.position() + (size << 2);
Suggested * Ints.BYTES
Isn't << 2 more optimal?
No, the HotSpot JIT (if not javac) replaces multiplication or division by a static compile-time constant which is a power of 2 with a shift. Even if it weren't so, this constructor is not a hot place where every CPU cycle counts.
Ints.BYTES is more readable; will change the code to use it.
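For reference, the two spellings being compared are interchangeable (Ints.BYTES equals Integer.BYTES, i.e. 4), so only readability differs:

```java
// Both expressions compute the same header offset in bytes; HotSpot
// strength-reduces the constant multiply to a shift during JIT compilation.
class OffsetSpelling {
    static int byShift(int size) {
        return size << 2;
    }

    static int byMultiply(int size) {
        return size * Integer.BYTES; // Ints.BYTES in the Guava-using code
    }
}
```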
throw new IAE(String.format("Index[%s] >= size[%s]", index, size));
}

ByteBuffer copyHeaderBuffer = headerBuffer.asReadOnlyBuffer();
Suggested not to create this copy, and instead use a few positioned getInt() calls below on the original headerBuffer.
From the javadoc I don't see any copying of data while creating a read-only buffer; it just ensures write protection on the original buffer.
So I suggest removing line 150, and then:
if (index == 0) {
startOffset = 4;
endOffset = headerBuffer.getInt(0);
} else {
int headerPosition = (index - 1) * Ints.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + 4;
endOffset = headerBuffer.getInt(headerPosition + 4);
}
return _get(valueBuffer.asReadOnlyBuffer(), startOffset, endOffset);
2 lines less in total, less garbage created.
How will you ensure BufferedIndexed reads are write-protected? The BufferedIndexed contract is to fetch an element and guarantee no modification of the original ByteBuffer. Any bad implementation of an ObjectStrategy could corrupt the original ByteBuffer. I think protecting the ByteBuffer from writes has more benefit than less garbage creation.
Ah, my mistake. Sure, the suggested code ensures write protection. I'll update the PR.
final ByteBuffer copyBuffer = valueBuffers.get(fileNum).asReadOnlyBuffer();
final ByteBuffer copyHeaderBuffer = headerBuffer.asReadOnlyBuffer();

if (index < 0) {
Replaceable with Preconditions.checkElementIndex()
I won't be able to throw the exception as IAE; my intent here is to throw an IAE with a custom error message.
int fileNum = index / numElementsPerValueFile;
final ByteBuffer copyBuffer = copyValueBuffers.get(fileNum);

if (index < 0) {
Same as above
I won't be able to throw the exception as IAE; my intent here is to throw an IAE with a custom error message.
final int startOffset;
final int endOffset;

if (index % numElementsPerValueFile == 0) {
This block is identical to a block in BufferedIndexed above; consider extracting a method.
I kept the two blocks separate because version 1 checks if (index == 0) whereas the V2 check is if (index % numElementsPerValueFile == 0); we wanted to avoid the % operation for the version 1 Indexed.
}
};
} else {
final List<ByteBuffer> copyValueBuffers = Lists.newArrayList(
Creating an empty array list and filling it in a for-each loop over valueBuffers takes 4 lines; this block takes 13 lines.
👍
throws IOException
{
while (numBytesToPutInFile > 0) {
int bytesRead = is.read(buffer, 0, Math.min(buffer.length, (int) numBytesToPutInFile));
Unsafe cast to int; it may lead to an exception. Suggested Ints.saturatedCast(numBytesToPutInFile).
{
while (numBytesToPutInFile > 0) {
int bytesRead = is.read(buffer, 0, Math.min(buffer.length, (int) numBytesToPutInFile));
smooshChannel.write((ByteBuffer) ByteBuffer.wrap(buffer).limit(bytesRead));
It should be checked that bytesRead is not -1.
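A sketch folding in both review points: clamp the long byte count before the int-typed read (the effect of Ints.saturatedCast, written here with plain JDK math) and fail fast if the stream ends early. Names mirror the diff; the surrounding smoosh plumbing is elided and the exact method shape is an assumption.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Sketch: copy exactly numBytesToPutInFile bytes, clamping the long count
// to an int for read() and treating premature EOF as an error.
final class CopyLoopSketch {
    static void copyBytes(InputStream is, OutputStream out, byte[] buffer, long numBytesToPutInFile)
            throws IOException {
        while (numBytesToPutInFile > 0) {
            // saturated cast: counts above Integer.MAX_VALUE clamp rather than wrapping negative
            int toRead = (int) Math.min(buffer.length, Math.min(numBytesToPutInFile, Integer.MAX_VALUE));
            int bytesRead = is.read(buffer, 0, toRead);
            if (bytesRead == -1) {
                throw new IOException("Unexpected EOF with " + numBytesToPutInFile + " bytes still expected");
            }
            out.write(buffer, 0, bytesRead);
            numBytesToPutInFile -= bytesRead;
        }
    }
}
```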
Force-pushed from f1b4689 to b13ea5d
@leventov I don't think we are going to make the change to introduce Is that acceptable, or are you actively vetoing this PR?
@cheddar @akashdw could you please go through my arguments for separation:
We will likely need to reimplement for the changes for
This is not a significant storage cost (Parag had a PR out to resolve that and introduce a new version of Generic Indexed that sat without anyone looking at it for months). Introducing that change would also actually have performance implications on indexing, because we currently do not re-process the payloads when we notice that the column needs to become large. In order to switch to that implementation, we would need to start reprocessing then. Once we are in the
That's exactly why we version our binary formats. In the future we will introduce another version and move things towards that.
The code path that I'm talking about is the main contract of the If your issues are just generally with how The point of this PR is to introduce the ability to handle columns larger than 2GB in as minimally invasive of a way as possible. In order to do that we also ended up needing to expand the abstraction for
Ok, I agree not to change LargeGenericIndexed format (not removing the size field and changing ordering)
This abstraction, On the normal
What I'm currently displeased with is that Your argument against separation is that it's going to be "large C&P of the code", but I actually don't see any code that should be copied.
Or e.g.
@leventov if I can repeat what I think you are saying. You are annoyed that
and
If we can consolidate the fields into just one of those two sets for both uses, is that a reasonable compromise? Separating it out into separate classes seems like all it is doing is shuffling code around, and I'm not sure it's really enabling or solving something other than aligning with a specific aesthetic. While it might be nicer to look at each of those pieces individually, it will also mean that the currently relatively self-contained
@cheddar I'm annoyed not so much with the fields, but with the fact that the logic of the cases is interspersed. I'm ok with a compromise that the logic just inside I'll take another (hopefully final) look at the code in this PR later this week
@leventov can we finish this off? This is one of the largest PRs blocking the release.
@fjy will do before the end of this week
@@ -81,7 +154,17 @@ public void write(T objectToWrite) throws IOException
valuesOut.write(Ints.toByteArray(bytesToWrite.length));
valuesOut.write(bytesToWrite);

headerOut.write(Ints.toByteArray((int) valuesOut.getCount()));
if (!requireMultipleFiles) {
writeIntValueToOutputStream(buf, (int) valuesOut.getCount(), headerOut);
checkedCast()
Not sure why GitHub is not showing this line as outdated: 19b69f3#diff-37ba90c2e6d6b833ce38fc1ce76eecc3R158
// to current offset.
if ((pos & (numberOfElementsPerValueFile - 1)) == 0) {
relativeRefBytes = currentNumBytes;
numberOfElementsPerValueFile = 1 << bagSizePower;
This line should be removed
}
currentNumBytes = Long.reverseBytes(headerFile.readLong());
relativeNumBytes = currentNumBytes - relativeRefBytes;
helperBuffer.putInt(0, Ints.checkedCast(relativeNumBytes));
This and the next line are replaceable with:
writeIntValueToOutputStream(helperBuffer, Ints.checkedCast(relativeNumBytes), finalHeaderOut);
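An in-memory sketch of the rebasing step these hunks perform: absolute long offsets are rewritten relative to the start of each value file so they fit back into 4-byte header entries. Variable names mirror the diff, while the file I/O (readLong, writeIntValueToOutputStream) is elided and this standalone form is an assumption.

```java
// Sketch: rebase absolute offsets at every value-file boundary so each
// per-file offset fits in an int (Math.toIntExact plays the role of
// Ints.checkedCast, failing loudly on overflow).
final class OffsetRebaseSketch {
    static int[] rebase(long[] absoluteOffsets, int bagSizePower) {
        int numberOfElementsPerValueFile = 1 << bagSizePower;
        int[] relative = new int[absoluteOffsets.length];
        long relativeRefBytes = 0;
        for (int pos = 0; pos < absoluteOffsets.length; pos++) {
            // Whenever pos enters a new value file, rebase against the current offset.
            if ((pos & (numberOfElementsPerValueFile - 1)) == 0 && pos > 0) {
                relativeRefBytes = absoluteOffsets[pos - 1];
            }
            relative[pos] = Math.toIntExact(absoluteOffsets[pos] - relativeRefBytes);
        }
        return relative;
    }
}
```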
This PR addresses #3513.
Note: Introduced GenericIndexed version 3 instead of 2 because I see an open PR (#3069) by @pjain1 to introduce version 2.