Skip to content

Commit

Permalink
Wait for all chunks being purged before deleting @queued_num items
Browse files Browse the repository at this point in the history
This commit resolves fluent#1999.

Signed-off-by: abicky <[email protected]>
  • Loading branch information
abicky committed Jun 10, 2018
1 parent aa848de commit d82c156
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def initialize
@queue = [] #=> Array (chunks) : already flushed (not written)
@dequeued = {} #=> Hash (unique_id -> chunk): already written (not purged)
@queued_num = {} # metadata => int (number of queued chunks)
@dequeued_num = {} # metadata => int (number of dequeued chunks)

@stage_size = @queue_size = 0
@metadata_list = [] # keys of @stage
Expand Down Expand Up @@ -462,6 +463,8 @@ def dequeue_chunk

@dequeued[chunk.unique_id] = chunk
@queued_num[chunk.metadata] -= 1 # BUG if nil, 0 or subzero
@dequeued_num[chunk.metadata] ||= 0
@dequeued_num[chunk.metadata] += 1
log.trace "chunk dequeued", instance: self.object_id, metadata: chunk.metadata
chunk
end
Expand All @@ -476,6 +479,7 @@ def takeback_chunk(chunk_id)
@queue.unshift(chunk)
log.trace "chunk taken back", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: chunk.metadata
@queued_num[chunk.metadata] += 1 # BUG if nil
@dequeued_num[chunk.metadata] -= 1
end
true
end
Expand All @@ -497,9 +501,11 @@ def purge_chunk(chunk_id)
log.error_backtrace
end

if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1)
@dequeued_num[chunk.metadata] -= 1
if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1) && @dequeued_num[metadata].zero?
@metadata_list.delete(metadata)
@queued_num.delete(metadata)
@dequeued_num.delete(metadata)
end
log.trace "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
end
Expand Down

0 comments on commit d82c156

Please sign in to comment.