Skip to content

Commit

Permalink
fix: fix kafka wal logs deletion and recovery logic (#1048)
Browse files Browse the repository at this point in the history
## Rationale
In the old Kafka log deletion and recovery logic, some corner cases
could lead to logs that can never be cleaned up.

## Detailed Changes
- Fix the logs deletion logic.
- Fix recovery logic.

## Test Plan
Test manually.
  • Loading branch information
Rachelint authored Jul 10, 2023
1 parent ed63767 commit 7c6ef04
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 109 deletions.
1 change: 1 addition & 0 deletions components/message_queue/src/kafka/kafka_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl MessageQueue for KafkaImpl {
})
}

// FIXME: consuming an empty topic may hang forever...
async fn consume(
&self,
topic_name: &str,
Expand Down
3 changes: 1 addition & 2 deletions components/message_queue/src/tests/cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ async fn test_consume_empty_topic<T: MessageQueue>(message_queue: &T) {
.await
.is_ok());

// Call produce to push messages at first, then call consume to pull back and
// compare.
// FIXME: consuming an empty topic may hang forever...
let mut iter = message_queue
.consume(&topic_name, StartOffset::Earliest)
.await
Expand Down
17 changes: 1 addition & 16 deletions wal/src/kv_encoder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Common Encoding for Wal logs
Expand Down Expand Up @@ -115,17 +115,14 @@ pub enum Namespace {
}

/// Log key in old wal design, map the `TableId` to `RegionId`
#[allow(unused)]
pub type LogKey = (u64, SequenceNumber);

#[allow(unused)]
#[derive(Debug, Clone)]
pub struct LogKeyEncoder {
pub version: u8,
pub namespace: Namespace,
}

#[allow(unused)]
impl LogKeyEncoder {
/// Create newest version encoder.
pub fn newest() -> Self {
Expand Down Expand Up @@ -202,13 +199,11 @@ impl Decoder<LogKey> for LogKeyEncoder {
}
}

#[allow(unused)]
#[derive(Debug, Clone)]
pub struct LogValueEncoder {
pub version: u8,
}

#[allow(unused)]
impl LogValueEncoder {
/// Create newest version encoder.
pub fn newest() -> Self {
Expand Down Expand Up @@ -240,12 +235,10 @@ impl<T: Payload> Encoder<T> for LogValueEncoder {
}
}

#[allow(unused)]
pub struct LogValueDecoder {
pub version: u8,
}

#[allow(unused)]
impl LogValueDecoder {
pub fn decode<'a>(&self, mut buf: &'a [u8]) -> Result<&'a [u8]> {
let version = buf.try_get_u8().context(DecodeLogValueHeader)?;
Expand Down Expand Up @@ -475,7 +468,6 @@ impl MaxSeqMetaEncoding {
}
}

#[allow(unused)]
#[derive(Debug, Clone)]
pub struct LogEncoding {
key_enc: LogKeyEncoder,
Expand All @@ -484,7 +476,6 @@ pub struct LogEncoding {
value_enc_version: u8,
}

#[allow(unused)]
impl LogEncoding {
pub fn newest() -> Self {
Self {
Expand Down Expand Up @@ -589,7 +580,6 @@ impl LogBatchEncoder {
}

/// Common log key used in multiple wal implementation
#[allow(unused)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
pub struct CommonLogKey {
/// Id of region which the table belongs to,
Expand All @@ -599,7 +589,6 @@ pub struct CommonLogKey {
pub sequence_num: SequenceNumber,
}

#[allow(unused)]
impl CommonLogKey {
pub fn new(region_id: u64, table_id: TableId, sequence_num: SequenceNumber) -> Self {
Self {
Expand All @@ -610,14 +599,12 @@ impl CommonLogKey {
}
}

#[allow(unused)]
#[derive(Debug, Clone)]
pub struct CommonLogKeyEncoder {
pub version: u8,
pub namespace: Namespace,
}

#[allow(unused)]
impl CommonLogKeyEncoder {
/// Create newest version encoder.
pub fn newest() -> Self {
Expand Down Expand Up @@ -697,7 +684,6 @@ impl Decoder<CommonLogKey> for CommonLogKeyEncoder {
}
}

#[allow(unused)]
#[derive(Debug, Clone)]
pub struct CommonLogEncoding {
key_enc: CommonLogKeyEncoder,
Expand All @@ -706,7 +692,6 @@ pub struct CommonLogEncoding {
value_enc_version: u8,
}

#[allow(unused)]
impl CommonLogEncoding {
pub fn newest() -> Self {
Self {
Expand Down
11 changes: 3 additions & 8 deletions wal/src/message_queue_impl/encoding.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Meta encoding of wal's message queue implementation
Expand Down Expand Up @@ -90,25 +90,21 @@ pub enum Error {
define_result!(Error);

/// Generate wal data topic name
#[allow(unused)]
pub fn format_wal_data_topic_name(namespace: &str, region_id: u64) -> String {
    // Topic layout: `<namespace>_data_<region_id>`.
    let region = region_id.to_string();
    [namespace, "data", region.as_str()].join("_")
}

/// Generate wal meta topic name
#[allow(unused)]
pub fn format_wal_meta_topic_name(namespace: &str, region_id: u64) -> String {
    // Topic layout: `<namespace>_meta_<region_id>`.
    let region = region_id.to_string();
    [namespace, "meta", region.as_str()].join("_")
}

#[allow(unused)]
#[derive(Clone, Debug)]
pub struct MetaEncoding {
key_enc: MetaKeyEncoder,
value_enc: MetaValueEncoder,
}

#[allow(unused)]
impl MetaEncoding {
pub fn encode_key(&self, buf: &mut BytesMut, meta_key: &MetaKey) -> manager::Result<()> {
buf.clear();
Expand Down Expand Up @@ -153,6 +149,7 @@ impl MetaEncoding {
Ok(meta_value.into())
}

#[allow(dead_code)]
pub fn is_meta_key(&self, mut buf: &[u8]) -> manager::Result<bool> {
self.key_enc
.is_valid(&mut buf)
Expand All @@ -174,20 +171,18 @@ impl MetaEncoding {
}

/// Message queue implementation's meta key
#[allow(unused)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MetaKey(pub u64);

#[allow(unused)]
#[derive(Clone, Debug)]
pub struct MetaKeyEncoder {
pub namespace: Namespace,
pub version: u8,
}

#[allow(unused)]
impl MetaKeyEncoder {
/// Determine whether the raw bytes is a valid meta key.
#[allow(dead_code)]
pub fn is_valid<B: Buf>(&self, buf: &mut B) -> Result<bool> {
let namespace = buf.try_get_u8().context(DecodeMetaKey)?;
let version = buf.try_get_u8().context(DecodeMetaKey)?;
Expand Down
41 changes: 9 additions & 32 deletions wal/src/message_queue_impl/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ use crate::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
"Failed to open region, namespace:{}, location:{:?}, err:{}",
namespace,
location,
source
"Failed to get sequence, namespace:{namespace}, location:{location:?}, err:{source}",
))]
GetSequence {
namespace: String,
Expand All @@ -43,10 +40,7 @@ pub enum Error {
},

#[snafu(display(
"Failed to open region, namespace:{}, request:{:?}, err:{}",
namespace,
request,
source
"Failed to read table logs, namespace:{namespace}, request:{request:?}, err:{source}",
))]
ReadWithCause {
namespace: String,
Expand All @@ -56,10 +50,7 @@ pub enum Error {
},

#[snafu(display(
"Failed to open region, namespace:{}, request:{:?}, \nBacktrace:\n{}",
namespace,
request,
backtrace,
"Failed to read table logs, namespace:{namespace}, request:{request:?}, \nBacktrace:\n{backtrace}",
))]
ReadNoCause {
namespace: String,
Expand All @@ -69,10 +60,7 @@ pub enum Error {
},

#[snafu(display(
"Failed to open region, namespace:{}, request:{:?}, err:{}",
namespace,
request,
source
"Failed to scan region logs, namespace:{namespace}, request:{request:?}, err:{source}",
))]
ScanWithCause {
namespace: String,
Expand All @@ -82,10 +70,7 @@ pub enum Error {
},

#[snafu(display(
"Failed to open region, namespace:{}, request:{:?}, \nBacktrace:\n{}",
namespace,
request,
backtrace,
"Failed to scan region logs, namespace:{namespace}, request:{request:?}, \nBacktrace:\n{backtrace}",
))]
ScanNoCause {
namespace: String,
Expand All @@ -95,11 +80,7 @@ pub enum Error {
},

#[snafu(display(
"Failed to open region, namespace:{}, location:{:?}, batch_size:{}, err:{}",
namespace,
location,
batch_size,
source
"Failed to write logs, namespace:{namespace}, location:{location:?}, batch_size:{batch_size}, err:{source}",
))]
Write {
namespace: String,
Expand All @@ -109,11 +90,7 @@ pub enum Error {
},

#[snafu(display(
"Failed to open region, namespace:{}, location:{:?}, sequence_num:{}, err:{}",
namespace,
location,
sequence_num,
source
"Failed to mark logs deleted, namespace:{namespace}, location:{location:?}, sequence_num:{sequence_num}, err:{source}",
))]
MarkDeleteTo {
namespace: String,
Expand All @@ -122,13 +99,13 @@ pub enum Error {
source: region::Error,
},

#[snafu(display("Failed to clean logs, namespace:{}, err:{}", namespace, source))]
#[snafu(display("Failed to clean logs, namespace:{namespace}, err:{source}"))]
CleanLogs {
namespace: String,
source: region::Error,
},

#[snafu(display("Failed to close namespace, namespace:{}, err:{}", namespace, source))]
#[snafu(display("Failed to close namespace, namespace:{namespace}, err:{source}"))]
Close {
namespace: String,
source: common_util::runtime::Error,
Expand Down
Loading

0 comments on commit 7c6ef04

Please sign in to comment.