-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[libbeat] Disk queue implementation #21176
Merged
Merged
Changes from 1 commit
Commits
Show all changes
97 commits
Select commit
Hold shift + click to select a range
e76a41b
Initial disk queue skeleton
faec 1d8bf65
Merge branch 'feature-disk-queue' into disk-queue
faec e8c8128
Sketching out some top-level disk queue data structures
faec 97a7ed5
add queue type registration
faec f5ad9a2
Merge branch 'feature-disk-queue' into disk-queue
faec a78b85c
use new registry helper
faec b12020c
connect external user config to the queue's Settings struct
faec 67540d6
Fill out more default settings
faec 94f125c
review comments
faec f29a96f
review comments (add panic to unimplemented functions)
faec 7ce01f9
Merge branch 'feature-disk-queue' into disk-queue
faec a04980e
some state file stuff
faec 26b4248
revising code to match new design
faec f30f30b
more state file handling
faec c312d69
reading data frames from segments
faec 1a40b06
fleshing out segment logic
faec ce65718
lots of partial work on reader and writer
faec 22ae148
reworking segments
faec 3f5f8fe
reworking reader code
faec 61fa5d7
working on writer loop
faec 0191dc6
Merge branch 'master' into disk-queue-0
faec 3bf35ff
fix most build errors
faec 50bd450
checksumType -> ChecksumType
faec 04c9b60
working on read / write loops
faec 7a2e09a
replace filebeat with a queue wrapper for testing
faec 132ba8e
adapting encoder stuff from the disk spool
faec e73f55f
add most of the api logic for the reader / writer loops
faec 6d2ca31
filling in segment-deletion api
faec 988cef6
connect consumer ack endpoints
faec 7774dc4
organize, delete dead code
faec 7146525
comment / delete more old code
faec 1001565
cleanup, plug in consumer acks
faec bb56b8f
finish reader loop api / logic
faec a99e869
make things build
faec 89da2b2
clean up naming, plug in queue initialization, start fixing writer loop
faec 6fa9d33
finish writer loop rework
faec e77dfa6
plug in writing to actual files
faec d2a65dc
make diskQueue.logger a top-level field
faec b93fe83
move segment allocation to the producer handler, clean up naming
faec 3a3bfcd
properly handle writer loop responses
faec cb1107d
implement readSegmentHeader, more cleanup / naming changes
faec 3020962
move queueSettings out of queueSegment, fix bugs
faec df31c0e
delete old code, reorganize some structures
faec 4a22ccd
move frame-wrapping responsibility to the writer loop
faec d4e1dcd
add event deserialization
faec 0e12495
properly assign frame ids after reading
faec 56fe5ba
remove obsolete code
faec eaf6e2b
compute / verify real checksums
faec 5f0376d
don't read everything twice
faec d63cbf0
Fix ACKing / deletion
faec 98e61f0
revert testing wrapper around filebeat
faec e48c815
filebeat ingestion is working!
faec 1575640
clean up capacity checking / add maybeUnblockProducers
faec d495aa8
move all dq.maybe... calls out of the handle... helpers
faec dd08e2c
simplify / clarify the writer loop response logic
faec 8db0acf
add retrying to file creation / writing
faec 26d226d
correct / clarify comments
faec 442a513
move segment read / write positions to the segments struct
faec 1a1742e
remove the separate 'core loop' struct and coalesce its helpers in di…
faec 5ca039d
set output channel buffer to a more reasonable size
faec fdd4be6
Expose some user configuration settings
faec f7a446d
send producer acks
faec 79a81de
remove checksum type as a configurable field
faec f88481c
Batch producer ACKs / fsync after writes
faec 8b61f5b
remove queueSegment.header which is no longer needed
faec 8c32477
reorganizing files a bit, starting the final pass of consumer acks
faec 0a6f9f4
initialize the read position properly, continue consumer ack revision
faec 91bc3b2
still rewriting consumer acks
faec aea4cad
plug consumer acks back in
faec a8ca56a
...increment the loop variable -.-
faec 6d91ab5
cleanups
faec 767aabb
remove unused variables
faec 0bf668e
shutdown cleanly
faec 857077c
delete the final segment on shutdown if everything is ACKed
faec d4cd0bb
turn on basic producer canceling
faec 91ce56f
propagate event flags thru the queue
faec f9dbfb1
clean up state_file / remove unused code
faec 93d3fff
save queue position as events are ACKed
faec 0ac493c
remove producer cancel boilerplate (not needed in v1)
faec 3af1421
various review comments
faec 333cb76
...save everything before commit
faec 0dfffe1
clarify the errors that "can't" fail
faec 43c4aaa
be more consistent about unsignedness
faec 0b33c4b
change the format of the deleter loop response
faec 7b2f4ba
make helper functions to initialize the helper loop structures
faec a626cec
clarify comments
faec 53da09a
minor tweaks
faec 565a99f
remove unused event field in writeFrame
faec 008fbe7
...save all files before commit...
faec b5ec926
don't initialize waitgroup to a nil pointer
faec 11e55a8
error message cleanups
faec ff04bb5
simplify position write-to-disk
faec cc49879
Merge branch 'master' into disk-queue-0
faec 2baf1d4
add some unit tests
faec 30cf56c
more verbose config errors
faec c619d6a
Merge branch 'master' into disk-queue-0
faec 5b41351
change queue acks back to a pointer since it can't be used w/o initia…
faec File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package diskqueue | ||
|
||
import "testing" | ||
|
||
func TestProducerWriteRequest(t *testing.T) { | ||
dq := &diskQueue{settings: DefaultSettings()} | ||
frame := &writeFrame{ | ||
serialized: make([]byte, 100), | ||
} | ||
request := producerWriteRequest{ | ||
frame: frame, | ||
shouldBlock: true, | ||
responseChan: make(chan bool, 1), | ||
} | ||
dq.handleProducerWriteRequest(request) | ||
|
||
// The request inserts 100 bytes into an empty queue, so it should succeed. | ||
// We expect: | ||
// - the response channel should contain the value true | ||
// - the frame should be added to pendingFrames and assigned to | ||
// segment 0. | ||
success, ok := <-request.responseChan | ||
if !ok { | ||
t.Error("Expected a response from the producer write request.") | ||
} | ||
if !success { | ||
t.Error("Expected write request to succeed") | ||
} | ||
|
||
if len(dq.pendingFrames) != 1 { | ||
t.Error("Expected 1 pending frame after a write request.") | ||
} | ||
if dq.pendingFrames[0].frame != frame { | ||
t.Error("Expected pendingFrames to contain the new frame.") | ||
} | ||
if dq.pendingFrames[0].segment.id != 0 { | ||
t.Error("Expected new frame to be assigned to segment 0.") | ||
} | ||
} | ||
|
||
func TestHandleWriterLoopResponse(t *testing.T) { | ||
// Initialize the queue with two writing segments only. | ||
dq := &diskQueue{ | ||
settings: DefaultSettings(), | ||
segments: diskQueueSegments{ | ||
writing: []*queueSegment{ | ||
&queueSegment{id: 1}, | ||
&queueSegment{id: 2}, | ||
}, | ||
}, | ||
} | ||
// This response says that the writer loop wrote 200 bytes to the first | ||
// segment and 100 bytes to the second. | ||
dq.handleWriterLoopResponse(writerLoopResponse{ | ||
bytesWritten: []int64{200, 100}, | ||
}) | ||
|
||
// After the response is handled, we expect: | ||
// - Each segment's endOffset should be incremented by the bytes written | ||
// - Segment 1 should be moved to the reading list (because all but the | ||
// last segment in a writer loop response has been closed) | ||
// - Segment 2 should remain in the writing list | ||
if len(dq.segments.reading) != 1 || dq.segments.reading[0].id != 1 { | ||
t.Error("Expected segment 1 to move to the reading list") | ||
} | ||
if len(dq.segments.writing) != 1 || dq.segments.writing[0].id != 2 { | ||
t.Error("Expected segment 2 to remain in the writing list") | ||
} | ||
if dq.segments.reading[0].endOffset != 200 { | ||
t.Errorf("Expected segment 1 endOffset 200, got %d", | ||
dq.segments.reading[0].endOffset) | ||
} | ||
if dq.segments.writing[0].endOffset != 100 { | ||
t.Errorf("Expected segment 2 endOffset 100, got %d", | ||
dq.segments.writing[0].endOffset) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: describing the expectations in the comment do not provide value as the messages passed to
t.Error
functions tell the same story.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's technically redundant, but the verbal description is shorter and easier to parse. I also like having a description of what I'm about to check, because the two listed invariants are logically equivalent to the 5 conditions in the code, but a reader seeing the test for the first time only knows what the code actually tests, and not what the author thought they were testing (which isn't always the same). This way if a test fails it's easier to recognize whether the problem is in the package or the test itself (i.e. whether the test is checking the invariants incorrectly, rather than the invariants actually failing).
(The secret long-term plan is to have verbal descriptions like this for every logically distinct state change, and to coalesce them into package documentation so it's easier to see how the pieces fit together.)