Skip to content

Commit

Permalink
Cleanup inpod test server (istio#1341)
Browse files Browse the repository at this point in the history
Basically this just avoids blocking the async runtime, so we don't need
to do our own abnormal workarounds for that. Also cleans up the message
type to be more idiomatic.
  • Loading branch information
howardjohn authored Oct 21, 2024
1 parent 5739a49 commit 1a475d9
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 136 deletions.
31 changes: 14 additions & 17 deletions examples/inpodserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::os::fd::AsRawFd;
use ztunnel::test_helpers::inpod::StartZtunnelMessage;
use ztunnel::test_helpers::inpod::{start_ztunnel_server, Message};

#[cfg(target_os = "linux")]
fn main() {
use std::os::fd::AsRawFd;
use ztunnel::test_helpers::inpod::start_ztunnel_server;

#[tokio::main]
async fn main() {
let uds = std::env::var("INPOD_UDS").unwrap();
let netns = std::env::args().nth(1).unwrap();
let mut netns_base_dir = std::path::PathBuf::from("/var/run/netns");
Expand All @@ -27,19 +27,16 @@ fn main() {

let fd = netns_file.as_raw_fd();

let mut sender = start_ztunnel_server(uds);
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
sender
.send(StartZtunnelMessage {
uid: "uid-0".to_string(),
workload_info: None,
fd,
})
.await
.unwrap();
sender.wait_forever().await.unwrap();
});
let mut sender = start_ztunnel_server(uds.into()).await;
sender
.send(Message::Start(StartZtunnelMessage {
uid: "uid-0".to_string(),
workload_info: None,
fd,
}))
.await
.unwrap();
sender.wait_forever().await.unwrap();
}

#[cfg(not(target_os = "linux"))]
Expand Down
120 changes: 57 additions & 63 deletions src/test_helpers/inpod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use crate::inpod::test_helpers::{
use crate::inpod::istio::zds::WorkloadInfo;
use crate::test_helpers;
use crate::test_helpers::MpscAckSender;
use std::path::Path;
use std::path::PathBuf;
use tokio::io::AsyncReadExt;
use tracing::info;
use tracing::{debug, info, instrument};

#[derive(Debug)]
pub struct StartZtunnelMessage {
Expand All @@ -30,78 +30,72 @@ pub struct StartZtunnelMessage {
pub fd: i32,
}

pub fn start_ztunnel_server<P: AsRef<Path> + Send + 'static>(
bind_path: P,
) -> MpscAckSender<StartZtunnelMessage> {
info!("starting server {}", bind_path.as_ref().display());
#[derive(Debug)]
pub enum Message {
Start(StartZtunnelMessage),
Stop(String),
}

#[instrument]
pub async fn start_ztunnel_server(bind_path: PathBuf) -> MpscAckSender<Message> {
info!("starting server");

// remove file if exists
if bind_path.as_ref().exists() {
info!(
"removing existing server socket file {}",
bind_path.as_ref().display()
);
if bind_path.exists() {
info!("removing existing server socket file",);
std::fs::remove_file(&bind_path).expect("remove file failed");
}
let (tx, mut rx) = test_helpers::mpsc_ack::<StartZtunnelMessage>(1);
let (tx, mut rx) = test_helpers::mpsc_ack::<Message>(1);

// these tests are structured in an unusual way - async operations are done in a different thread,
// that is joined. This blocks asyncs done here. thus the need run the servers in a different thread
info!("spawning server {}", bind_path.as_ref().display());
std::thread::spawn(move || {
info!("starting server thread {}", bind_path.as_ref().display());
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
let listener = crate::inpod::packet::bind(bind_path.as_ref()).expect("bind failed");
info!(
"waiting for connection from ztunnel server {}",
bind_path.as_ref().display()
);
let (mut ztun_sock, _) = listener.accept().await.expect("accept failed");
info!(
"accepted connection from ztunnel server {}",
bind_path.as_ref().display()
);
info!("spawning server");
tokio::task::spawn(async move {
let listener = crate::inpod::packet::bind(&bind_path).expect("bind failed");
info!("waiting for connection from ztunnel server");
let (mut ztun_sock, _) = listener.accept().await.expect("accept failed");
info!("accepted connection from ztunnel server");

// read the hello message:
let hello = read_hello(&mut ztun_sock).await;
info!("hello received, {:?}", hello);
// read the hello message:
let hello = read_hello(&mut ztun_sock).await;
info!(?hello, "hello received");

// send snapshot done msg:
send_snap_sent(&mut ztun_sock).await;
info!(
"initial snapshot sent from ztun server {}",
bind_path.as_ref().display()
);
// send snapshot done msg:
send_snap_sent(&mut ztun_sock).await;
info!("sent initial snapshot",);

// receive ack from ztunnel
let mut buf: [u8; 100] = [0u8; 100];
let read_amount = ztun_sock.read(&mut buf).await.unwrap();
info!("ack received, len {}", read_amount);
// Now await for FDs
while let Some(StartZtunnelMessage {
uid,
fd,
workload_info,
}) = rx.recv().await
{
let orig_uid = uid.clone();
let uid = crate::inpod::WorkloadUid::new(uid);
if fd >= 0 {
// receive ack from ztunnel
let mut buf: [u8; 100] = [0u8; 100];
let read_amount = ztun_sock.read(&mut buf).await.unwrap();
info!("ack received, len {}", read_amount);
// Now await for FDs
while let Some(msg) = rx.recv().await {
let uid = match msg {
Message::Start(StartZtunnelMessage {
uid,
workload_info,
fd,
}) => {
let orig_uid = uid.clone();
debug!(uid, %fd, "sending start message");
let uid = crate::inpod::WorkloadUid::new(uid);
send_workload_added(&mut ztun_sock, uid, workload_info, fd).await;
} else {
orig_uid
}
Message::Stop(uid) => {
let orig_uid = uid.clone();
debug!(uid, "sending delete message");
let uid = crate::inpod::WorkloadUid::new(uid);
send_workload_del(&mut ztun_sock, uid).await;
};

// receive ack from ztunnel
let _ = read_msg(&mut ztun_sock).await;
info!(uid=orig_uid, %fd, "ack received");
rx.ack().await.expect("ack failed");
orig_uid
}
};
// receive ack from ztunnel
let _ = read_msg(&mut ztun_sock).await;
info!(uid, "ack received");
if rx.ack().await.is_err() {
// Server shut down
break;
}
});
}
});
tx
}
114 changes: 58 additions & 56 deletions src/test_helpers/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct WorkloadManager {

#[derive(Debug)]
pub struct LocalZtunnel {
fd_sender: Option<MpscAckSender<inpod::StartZtunnelMessage>>,
fd_sender: Option<MpscAckSender<inpod::Message>>,
config_sender: MpscAckSender<LocalConfig>,
namespace: Namespace,
}
Expand Down Expand Up @@ -124,7 +124,7 @@ impl WorkloadManager {
let mut inpod_uds: PathBuf = "/dev/null".into();
let ztunnel_server = if self.mode == Shared {
inpod_uds = self.tmp_dir.join(node);
Some(start_ztunnel_server(inpod_uds.clone()))
Some(start_ztunnel_server(inpod_uds.clone()).await)
} else {
None
};
Expand Down Expand Up @@ -169,60 +169,66 @@ impl WorkloadManager {
let (tx, rx) = std::sync::mpsc::sync_channel(0);
// Setup the ztunnel...
let cloned_ns = ns.clone();
ns.run_ready(move |ready| async move {
if proxy_mode == ProxyMode::Dedicated {
// not needed in "inpod" (shared proxy) mode. In shared mode we run `ztunnel-redirect-inpod.sh`
// inside the pod's netns
helpers::run_command("scripts/ztunnel-redirect.sh")?;
}
let cert_manager = identity::mock::new_secret_manager(Duration::from_secs(10));
let app = crate::app::build_with_cert(Arc::new(cfg), cert_manager.clone()).await?;
let shutdown = app.shutdown.trigger();

// inpod mode doesn't have ore need these, so just put bogus values.
let proxy_addresses = app.proxy_addresses.unwrap_or(proxy::Addresses {
inbound: "0.0.0.0:0".parse()?,
outbound: "0.0.0.0:0".parse()?,
socks5: Some("0.0.0.0:0".parse()?),
});

let ta = TestApp {
// Not actually accessible
admin_address: helpers::with_ip(app.admin_address, ip),
metrics_address: helpers::with_ip(app.metrics_address, ip),
readiness_address: helpers::with_ip(app.readiness_address, ip),
proxy_addresses: proxy::Addresses {
outbound: helpers::with_ip(proxy_addresses.outbound, ip),
inbound: helpers::with_ip(proxy_addresses.inbound, ip),
socks5: proxy_addresses.socks5.map(|i| helpers::with_ip(i, ip)),
},
tcp_dns_proxy_address: Some(helpers::with_ip(
app.tcp_dns_proxy_address.unwrap_or("0.0.0.0:0".parse()?),
ip,
)),
udp_dns_proxy_address: Some(helpers::with_ip(
app.udp_dns_proxy_address.unwrap_or("0.0.0.0:0".parse()?),
ip,
)),
cert_manager,

namespace: Some(cloned_ns),
shutdown,
};
ta.ready().await;
info!("ready");
ready.set_ready();
tx.send(ta)?;
let cloned_ns2 = ns.clone();
// run_ready will spawn a thread and block on it. Run with spawn_blocking so it doesn't block the runtime.
tokio::task::spawn_blocking(move || {
ns.run_ready(move |ready| async move {
if proxy_mode == ProxyMode::Dedicated {
// not needed in "inpod" (shared proxy) mode. In shared mode we run `ztunnel-redirect-inpod.sh`
// inside the pod's netns
helpers::run_command("scripts/ztunnel-redirect.sh")?;
}
let cert_manager = identity::mock::new_secret_manager(Duration::from_secs(10));
let app = crate::app::build_with_cert(Arc::new(cfg), cert_manager.clone()).await?;
let shutdown = app.shutdown.trigger();

// inpod mode doesn't have ore need these, so just put bogus values.
let proxy_addresses = app.proxy_addresses.unwrap_or(proxy::Addresses {
inbound: "0.0.0.0:0".parse()?,
outbound: "0.0.0.0:0".parse()?,
socks5: Some("0.0.0.0:0".parse()?),
});

let ta = TestApp {
// Not actually accessible
admin_address: helpers::with_ip(app.admin_address, ip),
metrics_address: helpers::with_ip(app.metrics_address, ip),
readiness_address: helpers::with_ip(app.readiness_address, ip),
proxy_addresses: proxy::Addresses {
outbound: helpers::with_ip(proxy_addresses.outbound, ip),
inbound: helpers::with_ip(proxy_addresses.inbound, ip),
socks5: proxy_addresses.socks5.map(|i| helpers::with_ip(i, ip)),
},
tcp_dns_proxy_address: Some(helpers::with_ip(
app.tcp_dns_proxy_address.unwrap_or("0.0.0.0:0".parse()?),
ip,
)),
udp_dns_proxy_address: Some(helpers::with_ip(
app.udp_dns_proxy_address.unwrap_or("0.0.0.0:0".parse()?),
ip,
)),
cert_manager,

namespace: Some(cloned_ns),
shutdown,
};
ta.ready().await;
info!("ready");
ready.set_ready();
tx.send(ta)?;

app.wait_termination().await
})?;
app.wait_termination().await
})
})
.await
.unwrap()?;

// Make sure our initial config is ACKed
tx_cfg.wait().await?;
let zt_info = LocalZtunnel {
fd_sender: ztunnel_server,
config_sender: tx_cfg,
namespace: ns,
namespace: cloned_ns2,
};
self.ztunnels.insert(node.to_string(), zt_info);
let ta = rx.recv()?;
Expand All @@ -249,11 +255,7 @@ impl WorkloadManager {
self.workloads = keep;
for d in drop {
if let Some(zt) = self.ztunnels.get_mut(&d.workload.node.to_string()).as_mut() {
let msg = inpod::StartZtunnelMessage {
uid: d.workload.uid.to_string(),
workload_info: None,
fd: -1, // Test server handles -1 as del
};
let msg = inpod::Message::Stop(d.workload.uid.to_string());
zt.fd_sender
.as_mut()
.unwrap()
Expand Down Expand Up @@ -542,11 +544,11 @@ impl<'a> TestWorkloadBuilder<'a> {
.netns()
.run(|_| helpers::run_command("scripts/ztunnel-redirect-inpod.sh"))??;
let fd = network_namespace.netns().file().as_raw_fd();
let msg = inpod::StartZtunnelMessage {
let msg = inpod::Message::Start(inpod::StartZtunnelMessage {
uid: uid.to_string(),
workload_info: Some(wli),
fd,
};
});
zt_info
.fd_sender
.as_mut()
Expand Down

0 comments on commit 1a475d9

Please sign in to comment.