Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix kafka wal logs deletion and recovery logic #1048

Merged
merged 6 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/message_queue/src/kafka/kafka_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl MessageQueue for KafkaImpl {
})
}

// FIXME: consume a empty topic may be hanged forever...
async fn consume(
&self,
topic_name: &str,
Expand Down
3 changes: 1 addition & 2 deletions components/message_queue/src/tests/cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ async fn test_consume_empty_topic<T: MessageQueue>(message_queue: &T) {
.await
.is_ok());

// Call produce to push messages at first, then call consume to pull back and
// compare.
// FIXME: consume a empty topic may be hanged forever...
let mut iter = message_queue
.consume(&topic_name, StartOffset::Earliest)
.await
Expand Down
17 changes: 1 addition & 16 deletions wal/src/kv_encoder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Common Encoding for Wal logs

Expand Down Expand Up @@ -115,17 +115,14 @@ pub enum Namespace {
}

/// Log key in old wal design, map the `TableId` to `RegionId`
#[allow(unused)]
pub type LogKey = (u64, SequenceNumber);

#[allow(unused)]
#[derive(Debug, Clone)]
pub struct LogKeyEncoder {
pub version: u8,
pub namespace: Namespace,
}

#[allow(unused)]
impl LogKeyEncoder {
/// Create newest version encoder.
pub fn newest() -> Self {
Expand Down Expand Up @@ -202,13 +199,11 @@ impl Decoder<LogKey> for LogKeyEncoder {
}
}

#[allow(unused)]
#[derive(Debug, Clone)]
pub struct LogValueEncoder {
pub version: u8,
}

#[allow(unused)]
impl LogValueEncoder {
/// Create newest version encoder.
pub fn newest() -> Self {
Expand Down Expand Up @@ -240,12 +235,10 @@ impl<T: Payload> Encoder<T> for LogValueEncoder {
}
}

#[allow(unused)]
pub struct LogValueDecoder {
pub version: u8,
}

#[allow(unused)]
impl LogValueDecoder {
pub fn decode<'a>(&self, mut buf: &'a [u8]) -> Result<&'a [u8]> {
let version = buf.try_get_u8().context(DecodeLogValueHeader)?;
Expand Down Expand Up @@ -475,7 +468,6 @@ impl MaxSeqMetaEncoding {
}
}

#[allow(unused)]
#[derive(Debug, Clone)]
pub struct LogEncoding {
key_enc: LogKeyEncoder,
Expand All @@ -484,7 +476,6 @@ pub struct LogEncoding {
value_enc_version: u8,
}

#[allow(unused)]
impl LogEncoding {
pub fn newest() -> Self {
Self {
Expand Down Expand Up @@ -589,7 +580,6 @@ impl LogBatchEncoder {
}

/// Common log key used in multiple wal implementation
#[allow(unused)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
pub struct CommonLogKey {
/// Id of region which the table belongs to,
Expand All @@ -599,7 +589,6 @@ pub struct CommonLogKey {
pub sequence_num: SequenceNumber,
}

#[allow(unused)]
impl CommonLogKey {
pub fn new(region_id: u64, table_id: TableId, sequence_num: SequenceNumber) -> Self {
Self {
Expand All @@ -610,14 +599,12 @@ impl CommonLogKey {
}
}

#[allow(unused)]
#[derive(Debug, Clone)]
pub struct CommonLogKeyEncoder {
pub version: u8,
pub namespace: Namespace,
}

#[allow(unused)]
impl CommonLogKeyEncoder {
/// Create newest version encoder.
pub fn newest() -> Self {
Expand Down Expand Up @@ -697,7 +684,6 @@ impl Decoder<CommonLogKey> for CommonLogKeyEncoder {
}
}

#[allow(unused)]
#[derive(Debug, Clone)]
pub struct CommonLogEncoding {
key_enc: CommonLogKeyEncoder,
Expand All @@ -706,7 +692,6 @@ pub struct CommonLogEncoding {
value_enc_version: u8,
}

#[allow(unused)]
impl CommonLogEncoding {
pub fn newest() -> Self {
Self {
Expand Down
11 changes: 3 additions & 8 deletions wal/src/message_queue_impl/encoding.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Meta encoding of wal's message queue implementation

Expand Down Expand Up @@ -90,25 +90,21 @@ pub enum Error {
define_result!(Error);

/// Generate wal data topic name
#[allow(unused)]
pub fn format_wal_data_topic_name(namespace: &str, region_id: u64) -> String {
format!("{namespace}_data_{region_id}")
}

/// Generate wal meta topic name
#[allow(unused)]
pub fn format_wal_meta_topic_name(namespace: &str, region_id: u64) -> String {
format!("{namespace}_meta_{region_id}")
}

#[allow(unused)]
#[derive(Clone, Debug)]
pub struct MetaEncoding {
key_enc: MetaKeyEncoder,
value_enc: MetaValueEncoder,
}

#[allow(unused)]
impl MetaEncoding {
pub fn encode_key(&self, buf: &mut BytesMut, meta_key: &MetaKey) -> manager::Result<()> {
buf.clear();
Expand Down Expand Up @@ -153,6 +149,7 @@ impl MetaEncoding {
Ok(meta_value.into())
}

#[allow(dead_code)]
pub fn is_meta_key(&self, mut buf: &[u8]) -> manager::Result<bool> {
self.key_enc
.is_valid(&mut buf)
Expand All @@ -174,20 +171,18 @@ impl MetaEncoding {
}

/// Message queue implementation's meta key
#[allow(unused)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MetaKey(pub u64);

#[allow(unused)]
#[derive(Clone, Debug)]
pub struct MetaKeyEncoder {
pub namespace: Namespace,
pub version: u8,
}

#[allow(unused)]
impl MetaKeyEncoder {
/// Determine whether the raw bytes is a valid meta key.
#[allow(dead_code)]
pub fn is_valid<B: Buf>(&self, buf: &mut B) -> Result<bool> {
let namespace = buf.try_get_u8().context(DecodeMetaKey)?;
let version = buf.try_get_u8().context(DecodeMetaKey)?;
Expand Down
41 changes: 9 additions & 32 deletions wal/src/message_queue_impl/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ use crate::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
"Failed to open region, namespace:{}, location:{:?}, err:{}",
namespace,
location,
source
"Failed to get sequence, namespace:{namespace}, location:{location:?}, err:{source}",
))]
GetSequence {
namespace: String,
Expand All @@ -43,10 +40,7 @@ pub enum Error {
},

#[snafu(display(
"Failed to open region, namespace:{}, request:{:?}, err:{}",
namespace,
request,
source
"Failed to read table logs, namespace:{namespace}, request:{request:?}, err:{source}",
))]
ReadWithCause {
namespace: String,
Expand All @@ -56,10 +50,7 @@ pub enum Error {
},

#[snafu(display(
"Failed to open region, namespace:{}, request:{:?}, \nBacktrace:\n{}",
namespace,
request,
backtrace,
"Failed to read table logs, namespace:{namespace}, request:{request:?}, \nBacktrace:\n{backtrace}",
))]
ReadNoCause {
namespace: String,
Expand All @@ -69,10 +60,7 @@ pub enum Error {
},

#[snafu(display(
"Failed to open region, namespace:{}, request:{:?}, err:{}",
namespace,
request,
source
"Failed to scan region logs, namespace:{namespace}, request:{request:?}, err:{source}",
))]
ScanWithCause {
namespace: String,
Expand All @@ -82,10 +70,7 @@ pub enum Error {
},

#[snafu(display(
"Failed to open region, namespace:{}, request:{:?}, \nBacktrace:\n{}",
namespace,
request,
backtrace,
"Failed to scan region logs, namespace:{namespace}, request:{request:?}, \nBacktrace:\n{backtrace}",
))]
ScanNoCause {
namespace: String,
Expand All @@ -95,11 +80,7 @@ pub enum Error {
},

#[snafu(display(
"Failed to open region, namespace:{}, location:{:?}, batch_size:{}, err:{}",
namespace,
location,
batch_size,
source
"Failed to write logs, namespace:{namespace}, location:{location:?}, batch_size:{batch_size}, err:{source}",
))]
Write {
namespace: String,
Expand All @@ -109,11 +90,7 @@ pub enum Error {
},

#[snafu(display(
"Failed to open region, namespace:{}, location:{:?}, sequence_num:{}, err:{}",
namespace,
location,
sequence_num,
source
"Failed to mark logs deleted, namespace:{namespace}, location:{location:?}, sequence_num:{sequence_num}, err:{source}",
))]
MarkDeleteTo {
namespace: String,
Expand All @@ -122,13 +99,13 @@ pub enum Error {
source: region::Error,
},

#[snafu(display("Failed to clean logs, namespace:{}, err:{}", namespace, source))]
#[snafu(display("Failed to clean logs, namespace:{namespace}, err:{source}"))]
CleanLogs {
namespace: String,
source: region::Error,
},

#[snafu(display("Failed to close namespace, namespace:{}, err:{}", namespace, source))]
#[snafu(display("Failed to close namespace, namespace:{namespace}, err:{source}"))]
Close {
namespace: String,
source: common_util::runtime::Error,
Expand Down
Loading