Skip to content

Commit

Permalink
feat: add support for custom lattice prefix
Browse files Browse the repository at this point in the history
Signed-off-by: Connor Smith <[email protected]>
  • Loading branch information
connorsmith256 committed Aug 10, 2023
1 parent 77d663d commit c9fecb9
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 128 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ status = "actively-developed"

[dependencies]
anyhow = { workspace = true }
clap = { workspace = true, features = ["color", "derive", "error-context", "help", "std", "suggestions", "usage"] }
clap = { workspace = true, features = ["color", "derive", "env", "error-context", "help", "std", "suggestions", "usage"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }
tracing = { workspace = true, features = ["release_max_level_info"] }
tracing-subscriber = { workspace = true, features = ["ansi", "env-filter", "fmt", "json", "std"] }
Expand Down
14 changes: 9 additions & 5 deletions crates/host/src/wasmbus/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ use url::Url;
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// wasmCloud Host configuration
pub struct Host {
/// URL to connect to
pub url: Url,
/// Optional host seed
/// NATS URL to connect to for control interface connection
pub ctl_nats_url: Url,
/// The lattice the host belongs to
pub lattice_prefix: String,
/// The seed key (a printable 256-bit Ed25519 private key) used by this host to generate its public key
pub host_seed: Option<String>,
/// Optional cluster seed
/// The seed key (a printable 256-bit Ed25519 private key) used by this host to sign all invocations
pub cluster_seed: Option<String>,
}

impl Default for Host {
fn default() -> Self {
Self {
url: Url::parse("nats://localhost:4222").expect("failed to parse URL"),
ctl_nats_url: Url::parse("nats://localhost:4222")
.expect("failed to parse control NATS URL"),
lattice_prefix: "default".to_string(),
host_seed: None,
cluster_seed: None,
}
Expand Down
183 changes: 90 additions & 93 deletions crates/host/src/wasmbus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,23 +161,19 @@ impl Queue {
#[instrument]
async fn new(
nats: &async_nats::Client,
lattice_prefix: &str,
cluster_key: &KeyPair,
host_key: &KeyPair,
) -> anyhow::Result<Self> {
let host_id = host_key.public_key();
let (registries, pings, links, queries, auction, commands, inventory) = try_join!(
nats.subscribe("wasmbus.ctl.default.registries.put".into()),
nats.subscribe("wasmbus.ctl.default.ping.hosts".into()),
nats.subscribe("wasmbus.ctl.default.linkdefs.*".into()),
nats.subscribe("wasmbus.ctl.default.get.*".into()),
nats.subscribe("wasmbus.ctl.default.auction.>".into()),
nats.subscribe(format!(
"wasmbus.ctl.default.cmd.{}.*",
host_key.public_key()
)),
nats.subscribe(format!(
"wasmbus.ctl.default.get.{}.inv",
host_key.public_key()
)),
nats.subscribe(format!("wasmbus.ctl.{lattice_prefix}.registries.put",)),
nats.subscribe(format!("wasmbus.ctl.{lattice_prefix}.ping.hosts",)),
nats.subscribe(format!("wasmbus.ctl.{lattice_prefix}.linkdefs.*",)),
nats.subscribe(format!("wasmbus.ctl.{lattice_prefix}.get.*",)),
nats.subscribe(format!("wasmbus.ctl.{lattice_prefix}.auction.>",)),
nats.subscribe(format!("wasmbus.ctl.{lattice_prefix}.cmd.{host_id}.*",)),
nats.subscribe(format!("wasmbus.ctl.{lattice_prefix}.get.{host_id}.inv",)),
)
.context("failed to subscribe to queues")?;
Ok(Self {
Expand Down Expand Up @@ -313,6 +309,7 @@ struct InvocationResponse {
#[derive(Clone, Debug)]
struct Handler {
nats: async_nats::Client,
lattice_prefix: String,
cluster_key: Arc<KeyPair>,
origin: WasmCloudEntity,
interfaces: Arc<RwLock<HashMap<String, WasmCloudEntity>>>,
Expand Down Expand Up @@ -350,13 +347,12 @@ impl Handler {
let invocation = self.invocation(operation, request).await?;
let request =
rmp_serde::to_vec_named(&invocation).context("failed to encode invocation")?;
let lattice_prefix = &self.lattice_prefix;
let provider_id = invocation.target.public_key;
let res = self
.nats
.request(
format!(
"wasmbus.rpc.default.{}.default",
invocation.target.public_key
),
format!("wasmbus.rpc.{lattice_prefix}.{provider_id}.default",),
request.into(),
)
.await
Expand Down Expand Up @@ -416,6 +412,7 @@ impl Bus for Handler {
let (res_r, mut res_w) = socket_pair()?;

let nats = self.nats.clone();
let lattice_prefix = self.lattice_prefix.clone();
let provider_id = target.public_key.clone();
let origin = self.origin.clone();
let target = target.clone();
Expand All @@ -438,7 +435,7 @@ impl Bus for Handler {
.map_err(|e| e.to_string())?;
let res = nats
.request(
format!("wasmbus.rpc.default.{provider_id}.default"),
format!("wasmbus.rpc.{lattice_prefix}.{provider_id}.default"),
request.into(),
)
.await
Expand Down Expand Up @@ -838,6 +835,7 @@ pub struct Host {
event_builder: EventBuilderV10,
friendly_name: String,
heartbeat: AbortHandle,
host_config: HostConfig,
host_key: KeyPair,
labels: HashMap<String, String>,
nats: async_nats::Client,
Expand Down Expand Up @@ -904,24 +902,20 @@ impl Host {
/// Construct a new [Host] returning a tuple of its [Arc] and an async shutdown function.
#[instrument]
pub async fn new(
HostConfig {
url,
cluster_seed,
host_seed,
}: HostConfig,
config: HostConfig,
) -> anyhow::Result<(Arc<Self>, impl Future<Output = anyhow::Result<()>>)> {
let cluster_key = if let Some(cluster_seed) = cluster_seed {
let kp = KeyPair::from_seed(&cluster_seed)
let cluster_key = if let Some(cluster_seed) = config.cluster_seed.as_ref() {
let kp = KeyPair::from_seed(cluster_seed)
.context("failed to construct key pair from seed")?;
ensure!(kp.key_pair_type() == KeyPairType::Cluster);
kp
} else {
KeyPair::new(KeyPairType::Cluster)
};
let cluster_key = Arc::new(cluster_key);
let host_key = if let Some(host_seed) = host_seed {
let host_key = if let Some(host_seed) = config.host_seed.as_ref() {
let kp =
KeyPair::from_seed(&host_seed).context("failed to construct key pair from seed")?;
KeyPair::from_seed(host_seed).context("failed to construct key pair from seed")?;
ensure!(kp.key_pair_type() == KeyPairType::Server);
kp
} else {
Expand All @@ -948,11 +942,15 @@ impl Host {
"version": env!("CARGO_PKG_VERSION"),
});

let nats = async_nats::connect(url.as_str())
debug!(
ctl_nats_url = config.ctl_nats_url.as_str(),
"connecting to NATS control server"
);
let nats = async_nats::connect(config.ctl_nats_url.as_str())
.await
.context("failed to connect to NATS")?;
.context("failed to connect to NATS control server")?;

let queue = Queue::new(&nats, &cluster_key, &host_key)
let queue = Queue::new(&nats, &config.lattice_prefix, &cluster_key, &host_key)
.await
.context("failed to initialize queue")?;
nats.flush().await.context("failed to flush")?;
Expand All @@ -977,8 +975,7 @@ impl Host {
let event_builder = EventBuilderV10::new().source(host_key.public_key());

let jetstream = async_nats::jetstream::new(nats.clone());
// TODO: Use prefix
let bucket = format!("LATTICEDATA_{prefix}", prefix = "default");
let bucket = format!("LATTICEDATA_{}", config.lattice_prefix);
create_lattice_metadata_bucket(&jetstream, &bucket).await?;

let data = jetstream
Expand All @@ -996,6 +993,7 @@ impl Host {
event_builder,
friendly_name,
heartbeat: heartbeat_abort.clone(),
host_config: config,
host_key,
labels,
nats,
Expand Down Expand Up @@ -1167,7 +1165,10 @@ impl Host {
.context("failed to build cloud event")?;
let ev = serde_json::to_vec(&ev).context("failed to serialize event")?;
self.nats
.publish("wasmbus.evt.default".into(), ev.into())
.publish(
format!("wasmbus.evt.{}", self.host_config.lattice_prefix),
ev.into(),
)
.await
.with_context(|| format!("failed to publish `{name}` event"))
}
Expand All @@ -1188,56 +1189,57 @@ impl Host {
trace!(actor_ref = actor_ref.as_ref(), count, "instantiating actor");

let actor_ref = actor_ref.as_ref();
let instances = stream::repeat(format!("wasmbus.rpc.default.{}", claims.subject))
.take(count.into())
.then(|topic| {
let pool = pool.clone();
let handler = handler.clone();
async move {
let calls = self
.nats
.queue_subscribe(topic.clone(), topic)
.await
.context("failed to subscribe to actor call queue")?;

let (calls_abort, calls_abort_reg) = AbortHandle::new_pair();
let id = Ulid::new();
let instance = Arc::new(ActorInstance {
nats: self.nats.clone(),
pool,
id,
calls: calls_abort,
runtime: self.runtime.clone(),
handler: handler.clone(),
});

let _calls = spawn({
let instances = stream::repeat(format!(
"wasmbus.rpc.{lattice_prefix}.{subject}",
lattice_prefix = self.host_config.lattice_prefix,
subject = claims.subject
))
.take(count.into())
.then(|topic| {
let pool = pool.clone();
let handler = handler.clone();
async move {
let calls = self
.nats
.queue_subscribe(topic.clone(), topic)
.await
.context("failed to subscribe to actor call queue")?;

let (calls_abort, calls_abort_reg) = AbortHandle::new_pair();
let id = Ulid::new();
let instance = Arc::new(ActorInstance {
nats: self.nats.clone(),
pool,
id,
calls: calls_abort,
runtime: self.runtime.clone(),
handler: handler.clone(),
});

let _calls = spawn({
let instance = Arc::clone(&instance);
Abortable::new(calls, calls_abort_reg).for_each_concurrent(None, move |msg| {
let instance = Arc::clone(&instance);
Abortable::new(calls, calls_abort_reg).for_each_concurrent(
None,
move |msg| {
let instance = Arc::clone(&instance);
async move { instance.handle_message(msg).await }
},
)
});
async move { instance.handle_message(msg).await }
})
});

self.publish_event(
"actor_started",
event::actor_started(
claims,
annotations,
Uuid::from_u128(id.into()),
actor_ref,
),
)
.await?;
anyhow::Result::<_>::Ok(instance)
}
})
.try_collect()
.await
.context("failed to instantiate actor")?;
self.publish_event(
"actor_started",
event::actor_started(
claims,
annotations,
Uuid::from_u128(id.into()),
actor_ref,
),
)
.await?;
anyhow::Result::<_>::Ok(instance)
}
})
.try_collect()
.await
.context("failed to instantiate actor")?;
self.publish_event(
"actors_started",
event::actors_started(claims, annotations, host_id, count, actor_ref),
Expand Down Expand Up @@ -1324,6 +1326,7 @@ impl Host {
};
let handler = Handler {
nats: self.nats.clone(),
lattice_prefix: self.host_config.lattice_prefix.clone(),
origin,
cluster_key: Arc::clone(&self.cluster_key),
interfaces: Arc::new(RwLock::new(interfaces)),
Expand Down Expand Up @@ -1833,7 +1836,7 @@ impl Host {
.collect();
let data = serde_json::to_vec(&json!({
"host_id": self.host_key.public_key(),
"lattice_rpc_prefix": "default", // TODO: Support lattice prefix config
"lattice_rpc_prefix": self.host_config.lattice_prefix,
"link_name": link_name,
"lattice_rpc_user_jwt": "", // TODO: Support config
"lattice_rpc_user_seed": "", // TODO: Support config
Expand Down Expand Up @@ -2189,7 +2192,7 @@ impl Host {
"ctl_host": "TODO",
"prov_rpc_host": "TODO",
"rpc_host": "TODO",
"lattice_prefix": "default",
"lattice_prefix": self.host_config.lattice_prefix,
}))
.context("failed to encode reply")?;
Ok(buf.into())
Expand Down Expand Up @@ -2340,13 +2343,10 @@ impl Host {
.await?;

let msgp = rmp_serde::to_vec(ld).context("failed to encode link definition")?;
let lattice_prefix = &self.host_config.lattice_prefix;
self.nats
.publish(
// TODO: Set prefix
format!(
"wasmbus.rpc.{prefix}.{provider_id}.{link_name}.linkdefs.put",
prefix = "default"
),
format!("wasmbus.rpc.{lattice_prefix}.{provider_id}.{link_name}.linkdefs.put",),
msgp.into(),
)
.await
Expand Down Expand Up @@ -2388,13 +2388,10 @@ impl Host {
// TODO: Broadcast `linkdef_removed`

let msgp = rmp_serde::to_vec(ld).context("failed to encode link definition")?;
let lattice_prefix = &self.host_config.lattice_prefix;
self.nats
.publish(
// TODO: Set prefix
format!(
"wasmbus.rpc.{prefix}.{provider_id}.{link_name}.linkdefs.del",
prefix = "default"
),
format!("wasmbus.rpc.{lattice_prefix}.{provider_id}.{link_name}.linkdefs.del",),
msgp.into(),
)
.await
Expand Down
Loading

0 comments on commit c9fecb9

Please sign in to comment.