Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/publish events to rmq #2

Open
wants to merge 119 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
119 commits
Select commit Hold shift + click to select a range
1e56097
Add event server
rob-looksrare Apr 25, 2024
c2443a9
Publish events to rmq
rob-looksrare Apr 25, 2024
4487376
Minor Refactor RMQ Publish Code
RareBodhi Apr 25, 2024
3f4db61
Refactor
RareBodhi Apr 26, 2024
fd77c30
Refactor
RareBodhi Apr 26, 2024
803a33c
Refactor
RareBodhi Apr 26, 2024
918171c
Refactor
RareBodhi Apr 26, 2024
bfdaec4
Merge pull request #3 from LooksRare/bodhi/adjust-rmq-push
rob-looksrare Apr 26, 2024
1956cf3
Update src/event_publisher.rs
RareBodhi Apr 26, 2024
9399ac6
Log reduce
RareBodhi Apr 26, 2024
22ef211
Dont push index
RareBodhi Apr 26, 2024
d7db01b
Consume rmq events
rob-looksrare Apr 26, 2024
48d2b7c
Add settings flag
rob-looksrare Apr 26, 2024
17ee820
Add consumer as a subcommand
rob-looksrare Apr 26, 2024
f1ce192
Clean up
rob-looksrare Apr 26, 2024
6489034
Serialize back to event
rob-looksrare Apr 26, 2024
977d3ed
Consume inscription events
rob-looksrare Apr 26, 2024
a7d19c0
Fetch data via db
rob-looksrare Apr 26, 2024
3043639
Consume events and fetch details via api
rob-looksrare Apr 26, 2024
29ff371
Persist to db
rob-looksrare Apr 26, 2024
c5fe810
Persist to db
rob-looksrare Apr 26, 2024
a8408a6
Add todo
rob-looksrare Apr 26, 2024
8492288
Persist raw ord events
rob-looksrare May 1, 2024
87f94b5
Publish when blocks are committed
rob-looksrare May 1, 2024
739aee3
Add consumer for blocks-q
rob-looksrare May 1, 2024
0879aac
Publish block range not single block
rob-looksrare May 1, 2024
b5bddc3
Move db out of ordclient for shutdown
rob-looksrare May 1, 2024
5e7eb21
Graceful shutdown
rob-looksrare May 1, 2024
8d62582
Reject message if processing fails
rob-looksrare May 1, 2024
e458be9
Insert events only if they don't exist. fmt
rob-looksrare May 1, 2024
e9540e4
Fetch events per block
rob-looksrare May 2, 2024
cf60ac7
Fetch inscription for event
rob-looksrare May 2, 2024
75480fa
Handle inscription created event
rob-looksrare May 2, 2024
4b9f219
Handle inscription transfers, fix metadata
rob-looksrare May 3, 2024
a8cce0b
Update rmq config
rob-looksrare May 3, 2024
25523b3
Merge pull request #4 from LooksRare/feat/consume-rmq-events
rob-looksrare May 3, 2024
070a98b
Rename events to event
rob-looksrare May 3, 2024
f5950d1
Update README.md
rob-looksrare May 3, 2024
915dbcf
Fix warnings
rob-looksrare May 3, 2024
a02a496
Add GH actions
rob-looksrare May 3, 2024
edbdb31
Filter out empty json
rob-looksrare May 3, 2024
db0ff20
Runner
RareBodhi May 3, 2024
bcb574b
Runner only build
RareBodhi May 3, 2024
8fc8324
Increase timeout
rob-looksrare May 3, 2024
2543b1f
Fix lint
rob-looksrare May 3, 2024
869ef89
Fix lint
rob-looksrare May 3, 2024
f33d02e
Remove unnecessary check
rob-looksrare May 3, 2024
935c78d
Fix tests
rob-looksrare May 3, 2024
beb21b5
Fix tests
rob-looksrare May 3, 2024
4759dc9
Move to new api endpoint to not clash with tests
rob-looksrare May 3, 2024
b3dd4c1
Retry any error, it sometimes gives 404 even though data exists
rob-looksrare May 3, 2024
c7ce8da
Use amqps
rob-looksrare May 3, 2024
0dd8f4c
Pass amq conn param
rob-looksrare May 3, 2024
9841d0c
ignore amq certs
rob-looksrare May 3, 2024
cadfe5b
Revert "ignore amq certs"
rob-looksrare May 4, 2024
81f9688
Test connection
rob-looksrare May 4, 2024
8a945ab
Move event publisher to new connection way
rob-looksrare May 4, 2024
bb022a8
Change how we build uri
rob-looksrare May 5, 2024
aea3839
Test native tls
rob-looksrare May 5, 2024
597061d
Add rmq connection that supports secured and unsecured conn
rob-looksrare May 6, 2024
979ba2c
Merge branch 'feat/publish-events-to-rmq' into fix-lint
rob-looksrare May 6, 2024
a5411ac
Fix lint
rob-looksrare May 6, 2024
5237a3e
Merge pull request #5 from LooksRare/fix-lint
rob-looksrare May 6, 2024
7820fa0
Stop service if issues with rmq publishing
rob-looksrare May 6, 2024
c0dba66
fix lint
rob-looksrare May 6, 2024
cff889c
Make consumer fail if rmq error
rob-looksrare May 6, 2024
86ff185
Log db url
rob-looksrare May 6, 2024
51122ca
Encode password
rob-looksrare May 6, 2024
e6a0d15
Encode password
rob-looksrare May 6, 2024
cdf4c30
Log http errors
rob-looksrare May 6, 2024
79e4594
Format
rob-looksrare May 6, 2024
5af7c12
Fix error status propagation
rob-looksrare May 6, 2024
d5f53c1
Update .github/workflows/ci.yaml
RareBodhi May 7, 2024
30f8891
CI Adjustments
RareBodhi May 7, 2024
91fb534
Sync master
RareBodhi May 7, 2024
1fe8b1a
Test CI Change
RareBodhi May 7, 2024
b6c809e
Lint fix
RareBodhi May 7, 2024
b725268
Test fix
RareBodhi May 7, 2024
43734fe
Compile time checked queries
RareBodhi May 7, 2024
50691f3
fix sql
rob-looksrare May 7, 2024
09953d4
Redundant from
RareBodhi May 7, 2024
478a48f
Redundant from
RareBodhi May 7, 2024
ffe8297
Publish one block at a time
rob-looksrare May 7, 2024
817d1c9
Refactor
rob-looksrare May 7, 2024
2d663d5
Process one block at a time
rob-looksrare May 7, 2024
a59f81f
Process one block at a time
rob-looksrare May 7, 2024
799c6dc
Refactor
rob-looksrare May 7, 2024
6676e3c
Explicit conversions
RareBodhi May 8, 2024
13b1b64
Test fix
RareBodhi May 8, 2024
4168802
Merge
RareBodhi May 8, 2024
4292baf
Fix
RareBodhi May 8, 2024
bcd5738
Refactors
RareBodhi May 8, 2024
07e280d
Merge pull request #7 from LooksRare/bodhi/refactor-ord-changes
rob-looksrare May 8, 2024
174e057
Merge pull request #9 from LooksRare/bodhi/consumer-refactors
rob-looksrare May 8, 2024
b81ddbd
Merge pull request #8 from LooksRare/fix/split-consumers
rob-looksrare May 8, 2024
9bd812a
Refactors Continued
RareBodhi May 8, 2024
c100d2c
Refactors Continued
RareBodhi May 8, 2024
19be6f7
Return if no events
rob-looksrare May 8, 2024
19b1685
Publish block only if it's passed first inscription height
rob-looksrare May 8, 2024
a89d1f0
remove db_con.rs
rob-looksrare May 8, 2024
731cbad
Fix build
rob-looksrare May 8, 2024
f21018c
Add retries for rmq publish
rob-looksrare May 8, 2024
6250178
Add retries to rmq connection, increase retry amount
rob-looksrare May 8, 2024
8ecbb3e
Requeue failed consumption
rob-looksrare May 8, 2024
82cf880
Requeue failed consumption
rob-looksrare May 8, 2024
efe34c4
Simplified event_publisher retry
rob-looksrare May 8, 2024
655b085
Refactors Continued
RareBodhi May 9, 2024
958bb25
Refactors Continued
RareBodhi May 9, 2024
08fd6f4
Merge pull request #10 from LooksRare/bodhi/refactor-cont
RareBodhi May 9, 2024
4cfaca2
Merge with refactors
RareBodhi May 9, 2024
8567670
Persist inscription on transfer if not exists
rob-looksrare May 9, 2024
d5fb398
Merge with refactors
RareBodhi May 9, 2024
2690214
Merge with refactors
RareBodhi May 9, 2024
ff2661c
Set prefetch
rob-looksrare May 9, 2024
b468fca
Lint
RareBodhi May 9, 2024
df5640a
Merge
RareBodhi May 9, 2024
600bbf9
Merge
RareBodhi May 9, 2024
e06a30b
Merge pull request #11 from LooksRare/feat/event-publisher-retries
RareBodhi May 9, 2024
7a353a0
Revert quorom queue and roll own retry
rob-looksrare May 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
944 changes: 896 additions & 48 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ http = "0.2.6"
humantime = "2.1.0"
hyper = { version = "0.14.24", features = ["client", "http2"] }
indicatif = "0.17.1"
lapin = "2.3.3"
lazy_static = "1.4.0"
log = "0.4.14"
mime = "0.3.16"
Expand Down
32 changes: 32 additions & 0 deletions rmqconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"queues": [
{
"name": "ord-q",
"vhost": "/",
"durable": true,
"auto_delete": false,
"arguments": {}
}
],
"exchanges": [
{
"name": "ord-tx",
"vhost": "/",
"type": "topic",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
}
],
"bindings": [
{
"source": "ord-tx",
"vhost": "/",
"destination": "ord-q",
"destination_type": "queue",
"routing_key": "",
"arguments": {}
}
]
}
70 changes: 70 additions & 0 deletions src/event_publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use anyhow::{Context, Result};
use lapin::options::ConfirmSelectOptions;
use lapin::{options::BasicPublishOptions, BasicProperties, Connection, ConnectionProperties};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

use crate::index::event::Event;
use crate::settings::Settings;

pub struct EventPublisher {
pub(crate) sender: mpsc::Sender<Event>,
}

impl EventPublisher {
pub fn run(settings: &Settings) -> Result<Self, anyhow::Error> {
let addr = settings
.rabbitmq_addr()
.context("rabbitmq amqp credentials and url must be defined")?;

let exchange = settings
.rabbitmq_exchange()
.context("rabbitmq exchange path must be defined")?
.to_owned();

let (tx, mut rx) = mpsc::channel::<Event>(128);

let receiver = std::thread::spawn(move || {
Runtime::new().expect("runtime is setup").block_on(async {
let conn = Connection::connect(&addr, ConnectionProperties::default())
.await
.expect("connects to rabbitmq ok");

let channel = conn
.create_channel()
.await
.expect("creates rmq connection channel");

channel
.confirm_select(ConfirmSelectOptions::default())
.await
.expect("enable msg confirms");

while let Some(event) = rx.recv().await {
let message = serde_json::to_vec(&event).expect("failed to serialize event");

log::info!("publishing event: {:#?}", event);

let publish = channel
.basic_publish(
&exchange,
"",
BasicPublishOptions::default(),
&message,
BasicProperties::default(),
)
.await
.expect("published rmq msg")
.await
.expect("confirms rmq msg received");

assert!(publish.is_ack());
}
})
});

receiver.join().expect("spawn blocking event rx thread");
RareBodhi marked this conversation as resolved.
Show resolved Hide resolved

Ok(EventPublisher { sender: tx })
}
}
2 changes: 1 addition & 1 deletion src/index/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;

#[derive(Debug, Clone, PartialEq)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub enum Event {
InscriptionCreated {
block_height: u32,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ mod blocktime;
pub mod chain;
pub mod decimal;
mod deserialize_from_str;
mod event_publisher;
mod fee_rate;
pub mod index;
mod inscriptions;
Expand Down
8 changes: 8 additions & 0 deletions src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ pub struct Options {
help = "Require basic HTTP authentication with <SERVER_USERNAME>. Credentials are sent in cleartext. Consider using authentication in conjunction with HTTPS."
)]
pub(crate) server_username: Option<String>,
#[arg(long, help = "RMQ url.")]
pub(crate) rabbitmq_url: Option<String>,
#[arg(long, help = "RMQ username.")]
pub(crate) rabbitmq_username: Option<String>,
#[arg(long, help = "RMQ password.")]
pub(crate) rabbitmq_password: Option<String>,
#[arg(long, help = "RMQ exchange to publish index events.")]
pub(crate) rabbitmq_exchange: Option<String>,
#[arg(long, short, help = "Use regtest. Equivalent to `--chain regtest`.")]
pub(crate) regtest: bool,
#[arg(long, short, help = "Use signet. Equivalent to `--chain signet`.")]
Expand Down
53 changes: 52 additions & 1 deletion src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ pub struct Settings {
server_password: Option<String>,
server_url: Option<String>,
server_username: Option<String>,
rabbitmq_url: Option<String>,
rabbitmq_username: Option<String>,
rabbitmq_password: Option<String>,
rabbitmq_exchange: Option<String>,
}

impl Settings {
Expand Down Expand Up @@ -141,6 +145,10 @@ impl Settings {
server_password: self.server_password.or(source.server_password),
server_url: self.server_url.or(source.server_url),
server_username: self.server_username.or(source.server_username),
rabbitmq_url: self.rabbitmq_url.or(source.rabbitmq_url),
rabbitmq_username: self.rabbitmq_username.or(source.rabbitmq_username),
rabbitmq_password: self.rabbitmq_password.or(source.rabbitmq_password),
rabbitmq_exchange: self.rabbitmq_exchange.or(source.rabbitmq_exchange),
}
}

Expand Down Expand Up @@ -175,6 +183,10 @@ impl Settings {
server_password: options.server_password,
server_url: None,
server_username: options.server_username,
rabbitmq_url: options.rabbitmq_url,
rabbitmq_username: options.rabbitmq_username,
rabbitmq_password: options.rabbitmq_password,
rabbitmq_exchange: options.rabbitmq_exchange,
}
}

Expand Down Expand Up @@ -253,6 +265,10 @@ impl Settings {
server_password: get_string("SERVER_PASSWORD"),
server_url: get_string("SERVER_URL"),
server_username: get_string("SERVER_USERNAME"),
rabbitmq_url: get_string("RMQ_URL"),
rabbitmq_username: get_string("RMQ_USERNAME"),
rabbitmq_password: get_string("RMQ_PASSWORD"),
rabbitmq_exchange: get_string("RMQ_EXCHANGE"),
})
}

Expand Down Expand Up @@ -282,6 +298,10 @@ impl Settings {
server_password: None,
server_url: Some(server_url.into()),
server_username: None,
rabbitmq_url: None,
rabbitmq_username: None,
rabbitmq_password: None,
rabbitmq_exchange: None,
}
}

Expand Down Expand Up @@ -361,6 +381,10 @@ impl Settings {
server_password: self.server_password,
server_url: self.server_url,
server_username: self.server_username,
rabbitmq_url: self.rabbitmq_url,
rabbitmq_username: self.rabbitmq_username,
rabbitmq_password: self.rabbitmq_password,
rabbitmq_exchange: self.rabbitmq_exchange,
})
}

Expand Down Expand Up @@ -553,6 +577,18 @@ impl Settings {
pub(crate) fn server_url(&self) -> Option<&str> {
self.server_url.as_deref()
}

pub fn rabbitmq_exchange(&self) -> Option<&str> {
self.rabbitmq_exchange.as_deref()
}

pub fn rabbitmq_addr(&self) -> Option<String> {
let user = self.rabbitmq_username.as_ref()?;
let pass = self.rabbitmq_password.as_ref()?;
let url = self.rabbitmq_url.as_ref()?;

Some(format!("amqp://{}:{}@{}", user, pass, url))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1000,7 +1036,10 @@ mod tests {
("NO_INDEX_INSCRIPTIONS", "1"),
("SERVER_PASSWORD", "server password"),
("SERVER_URL", "server url"),
("SERVER_USERNAME", "server username"),
("RMQ_URL", "http://127.0.0.1"),
("RMQ_USERNAME", "rmq username"),
("RMQ_PASSWORD", "rmq password"),
("RMQ_EXCHANGE", "rmq exchange"),
]
.into_iter()
.map(|(key, value)| (key.into(), value.into()))
Expand Down Expand Up @@ -1044,6 +1083,10 @@ mod tests {
server_password: Some("server password".into()),
server_url: Some("server url".into()),
server_username: Some("server username".into()),
rabbitmq_url: Some("http://127.0.0.1".into()),
rabbitmq_username: Some("rmq username".into()),
rabbitmq_password: Some("rmq password".into()),
rabbitmq_exchange: Some("rmq exchange".into()),
}
);
}
Expand Down Expand Up @@ -1076,6 +1119,10 @@ mod tests {
"--no-index-inscriptions",
"--server-password=server password",
"--server-username=server username",
"--rabbitmq-url=http://127.0.0.1",
"--rabbitmq-username=rmq username",
"--rabbitmq-password=rmq password",
"--rabbitmq-exchange=rmq exchange",
])
.unwrap()
),
Expand Down Expand Up @@ -1104,6 +1151,10 @@ mod tests {
server_password: Some("server password".into()),
server_url: None,
server_username: Some("server username".into()),
rabbitmq_url: Some("http://127.0.0.1".into()),
rabbitmq_username: Some("rmq username".into()),
rabbitmq_password: Some("rmq password".into()),
rabbitmq_exchange: Some("rmq exchange".into()),
}
);
}
Expand Down
16 changes: 16 additions & 0 deletions src/subcommand.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::event_publisher::EventPublisher;

use super::*;

pub mod balances;
Expand Down Expand Up @@ -39,6 +41,8 @@ pub(crate) enum Subcommand {
Runes,
#[command(about = "Run the explorer server")]
Server(server::Server),
#[command(about = "Run the explorer server in event emit mode")]
EventServer(server::Server),
#[command(about = "Display settings")]
Settings,
#[command(about = "Display information about a block's subsidy")]
Expand Down Expand Up @@ -71,6 +75,18 @@ impl Subcommand {
LISTENERS.lock().unwrap().push(handle.clone());
server.run(settings, index, handle)
}
Self::EventServer(server) => {
log::info!("Starting event server");
let publisher = EventPublisher::run(&settings)?;
let handle = axum_server::Handle::new();
let index = Arc::new(Index::open_with_event_sender(
&settings,
Some(publisher.sender.clone()),
)?);

LISTENERS.lock().unwrap().push(handle.clone());
server.run(settings, index, handle)
}
Self::Settings => settings::run(settings),
Self::Subsidy(subsidy) => subsidy.run(),
Self::Supply => supply::run(),
Expand Down