Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the SWIM membership and gossip protocol #180

Merged
merged 1 commit into from
Jan 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 73 additions & 75 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ router = "*"
time = "*"
uuid = "*"
utp = "*"
rmp = "*"
rmp-serialize = "*"
rpassword = "*"
rand = "*"
threadpool = "*"

[dependencies.wonder]
path = "./vendor/wonder"
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ RUN curl -s https://static.rust-lang.org/rustup.sh | sh -s -- -y && rustc -V
RUN curl -sSL https://get.docker.io | sh && docker -v
RUN ln -snf /usr/bin/nodejs /usr/bin/node && npm install -g docco && echo "docco `docco -V`"

RUN adduser --system bldr
RUN addgroup --system bldr
RUN adduser --system bldr || true
RUN addgroup --system bldr || true

COPY ssh_wrapper.sh /usr/local/bin/ssh_wrapper.sh

Expand Down
4 changes: 2 additions & 2 deletions plans/bldr-build
Original file line number Diff line number Diff line change
Expand Up @@ -1712,8 +1712,8 @@ RUN ln -sf $(pkg_path_for chef/bldr)/bin/bldr /bin/bldr && \
ln -sf $(pkg_path_for chef/runit)/bin/sv /bin/sv && \
ln -sf $(pkg_path_for chef/runit)/bin/svlogd /bin/svlogd && \
ln -sf $(pkg_path_for chef/runit)/bin/utmpset /bin/utmpset && \
addgroup bldr && \
adduser --system --disabled-password bldr
addgroup bldr || true && \
adduser --system --disabled-password bldr || true
EOT
fi
cat <<EOT >> ./Dockerfile
Expand Down
1 change: 1 addition & 0 deletions plans/bldr/mkimage.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ copy_package chef/bzip2
copy_package chef/xz
copy_package chef/libarchive
copy_package chef/runit
copy_package chef/rngd
copy_package chef/bldr

for x in $($BUSYBOX_ROOT/bin/busybox --list); do
Expand Down
12 changes: 6 additions & 6 deletions src/bldr/census.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ static LOGKEY: &'static str = "CN";
/// A CensusEntry. Manages all the data about a given member of the census.
#[derive(Debug, Clone, RustcDecodable, RustcEncodable, Eq)]
pub struct CensusEntry {
id: Uuid,
hostname: String,
ip: String,
pub id: Uuid,
pub hostname: String,
pub ip: String,
suitability: u64,
port: Option<String>,
exposes: Option<Vec<String>>,
Expand Down Expand Up @@ -248,7 +248,7 @@ impl GenServer for CensusEntryActor {
return HandleResult::Stop(StopReason::Fatal(format!("Census Entry Actor caught \
unexpected error: {:?}",
e)),
None)
None);
}
}
}
Expand Down Expand Up @@ -665,7 +665,7 @@ impl GenServer for CensusActor {
return HandleResult::Stop(StopReason::Fatal(format!("Census Actor caught \
unexpected error: {:?}",
e)),
None)
None);
}
}
}
Expand All @@ -692,7 +692,7 @@ impl GenServer for CensusActor {
return HandleResult::Stop(StopReason::Fatal(format!("Census Actor caught \
unexpected error: {:?}",
e)),
Some(CensusMessage::Ok))
Some(CensusMessage::Ok));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/bldr/command/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub fn upload(config: &Config) -> BldrResult<()> {
try!(repo::client::put_key(url, cached));
}
Err(_) => {
return Err(bldr_error!(ErrorKind::KeyNotFound(config.key().to_string())))
return Err(bldr_error!(ErrorKind::KeyNotFound(config.key().to_string())));
}
}
} else {
Expand Down
26 changes: 26 additions & 0 deletions src/bldr/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::net;

use gossip::server::GOSSIP_DEFAULT_PORT;
use topology::Topology;
use repo;

Expand Down Expand Up @@ -69,6 +70,8 @@ pub struct Config {
servicekey: Option<String>,
infile: Option<String>,
outfile: Option<String>,
gossip_peer: Vec<String>,
gossip_permanent: bool,
}

impl Config {
Expand Down Expand Up @@ -292,6 +295,29 @@ impl Config {
self
}

pub fn gossip_permanent(&self) -> bool {
self.gossip_permanent
}

pub fn set_gossip_permanent(&mut self, p: bool) -> &mut Config {
self.gossip_permanent = p;
self
}

pub fn gossip_peer(&self) -> &[String] {
&self.gossip_peer
}

pub fn set_gossip_peer(&mut self, mut gp: Vec<String>) -> &mut Config {
for p in gp.iter_mut() {
if p.find(':').is_none() {
p.push_str(&format!(":{}", GOSSIP_DEFAULT_PORT));
}
}
self.gossip_peer = gp;
self
}

pub fn package_id(&self) -> String {
if self.version.is_some() && self.release.is_some() {
format!("{}/{}/{}/{}",
Expand Down
46 changes: 31 additions & 15 deletions src/bldr/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,19 @@ use std::num;
use std::string;
use std::ffi;
use std::sync::mpsc;
use std::str;

use gpgme;
use libarchive;
use uuid;
use wonder::actor;
use ansi_term::Colour::Red;
use rustc_serialize::json;

use hyper;
use toml;
use mustache;
use regex;
use msgpack;
use package;
use output::StructuredOutput;

Expand Down Expand Up @@ -115,6 +116,7 @@ pub enum ErrorKind {
MustacheMergeOnlyMaps,
SupervisorSignalFailed,
StringFromUtf8Error(string::FromUtf8Error),
StrFromUtf8Error(str::Utf8Error),
SupervisorDied,
NulError(ffi::NulError),
IPFailed,
Expand All @@ -132,9 +134,10 @@ pub enum ErrorKind {
CensusNotFound(String),
UuidParseError(uuid::ParseError),
InvalidPackageIdent(String),
MsgpackDecode(msgpack::decode::Error),
MsgpackEncode(msgpack::encode::Error),
InvalidKeyParameter(String)
InvalidKeyParameter(String),
JsonEncode(json::EncoderError),
JsonDecode(json::DecoderError),
InitialPeers,
}

/// Our result type alias, for easy coding.
Expand Down Expand Up @@ -223,6 +226,7 @@ impl fmt::Display for BldrError {
format!("Failed to send a signal to the process supervisor")
}
ErrorKind::StringFromUtf8Error(ref e) => format!("{}", e),
ErrorKind::StrFromUtf8Error(ref e) => format!("{}", e),
ErrorKind::SupervisorDied => format!("The supervisor died"),
ErrorKind::NulError(ref e) => format!("{}", e),
ErrorKind::IPFailed => format!("Failed to discover this hosts outbound IP address"),
Expand Down Expand Up @@ -253,9 +257,11 @@ impl fmt::Display for BldrError {
derivation/name (example: chef/redis)",
e)
}
ErrorKind::MsgpackDecode(ref e) => format!("Msgpack decoding error: {:?}", e),
ErrorKind::MsgpackEncode(ref e) => format!("Msgpack encoding error: {:?}", e),
ErrorKind::InvalidKeyParameter(ref e) => format!("Invalid key parameter: {:?}", e),
ErrorKind::InvalidKeyParameter(ref e) =>
format!("Invalid parameter for key generation: {:?}", e),
ErrorKind::JsonEncode(ref e) => format!("JSON encoding error: {}", e),
ErrorKind::JsonDecode(ref e) => format!("JSON decoding error: {}", e),
ErrorKind::InitialPeers => format!("Failed to contact initial peers"),
};
let cstring = Red.bold().paint(content).to_string();
let mut so = StructuredOutput::new("bldr",
Expand Down Expand Up @@ -309,6 +315,9 @@ impl Error for BldrError {
ErrorKind::StringFromUtf8Error(_) => {
"Failed to convert a string from a Vec<u8> as UTF-8"
}
ErrorKind::StrFromUtf8Error(_) => {
"Failed to convert a str from a &[u8] as UTF-8"
}
ErrorKind::SupervisorDied => "The supervisor died",
ErrorKind::NulError(_) => {
"An attempt was made to build a CString with a null byte inside it"
Expand Down Expand Up @@ -336,9 +345,10 @@ impl Error for BldrError {
ErrorKind::InvalidPackageIdent(_) => {
"Package identifiers must be in derivation/name format (example: chef/redis)"
}
ErrorKind::MsgpackDecode(_) => "Msgpack decoding error",
ErrorKind::MsgpackEncode(_) => "Msgpack encoding error",
ErrorKind::InvalidKeyParameter(_) => "Key parameter error",
ErrorKind::JsonEncode(_) => "JSON encoding error",
ErrorKind::JsonDecode(_) => "JSON decoding error: {:?}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should {:?} be including here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. Nope.

ErrorKind::InitialPeers => "Failed to contact initial peers",
}
}
}
Expand Down Expand Up @@ -400,6 +410,12 @@ impl From<string::FromUtf8Error> for BldrError {
}
}

impl From<str::Utf8Error> for BldrError {
fn from(err: str::Utf8Error) -> BldrError {
bldr_error!(ErrorKind::StrFromUtf8Error(err))
}
}

impl From<mpsc::TryRecvError> for BldrError {
fn from(err: mpsc::TryRecvError) -> BldrError {
bldr_error!(ErrorKind::TryRecvError(err))
Expand All @@ -424,14 +440,14 @@ impl From<actor::ActorError> for BldrError {
}
}

impl From<msgpack::decode::Error> for BldrError {
fn from(err: msgpack::decode::Error) -> Self {
bldr_error!(ErrorKind::MsgpackDecode(err))
impl From<json::EncoderError> for BldrError {
fn from(err: json::EncoderError) -> Self {
bldr_error!(ErrorKind::JsonEncode(err))
}
}

impl From<msgpack::encode::Error> for BldrError {
fn from(err: msgpack::encode::Error) -> Self {
bldr_error!(ErrorKind::MsgpackEncode(err))
impl From<json::DecoderError> for BldrError {
fn from(err: json::DecoderError) -> Self {
bldr_error!(ErrorKind::JsonDecode(err))
}
}
82 changes: 50 additions & 32 deletions src/bldr/gossip/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@
//! The Gossip Client.
//!
//! This module takes a `UtpSocket`, and lets you send and receive messages with it. Messages are
//! encoded with msgpack.
//! encoded with json.
//!

use msgpack::{Encoder, Decoder};
use rustc_serialize::{Encodable, Decodable};
use rustc_serialize::json;
use utp::UtpSocket;

use std::thread;
use std::net::ToSocketAddrs;
use std::str;

use error::BldrResult;
use gossip::message::{BUFFER_SIZE, Message};
use gossip::rumor::{Protocol, Peer, RumorList};

static LOGKEY: &'static str = "GC";
pub const BUFFER_SIZE: usize = 10000;

/// A Gossip Client.
pub struct Client {
socket: UtpSocket,
pub socket: UtpSocket,
}

impl Client {
Expand All @@ -47,24 +47,28 @@ impl Client {
/// # Errors
///
/// * If we cannot send a ping
/// * If we cannot receive a pong
pub fn ping(&mut self) -> BldrResult<()> {
try!(self.send_message(Message::Ping));
let msg = try!(self.recv_message());
match msg {
Message::Pong => debug!("Gossip is alive - Ping successful"),
_ => unreachable!(),
}
pub fn ping(&mut self, my_peer: Peer, rumors_for_remote: RumorList) -> BldrResult<()> {
try!(self.send_message(Protocol::Ping(my_peer, rumors_for_remote)));
Ok(())
}

/// Send a pingreq.
///
/// # Errors
///
/// * If we cannot send a pingreq
pub fn pingreq(&mut self, through_peer: Peer, rumors_for_remote: RumorList) -> BldrResult<()> {
try!(self.send_message(Protocol::PingReq(through_peer, rumors_for_remote)));
Ok(())
}

/// Send a pong.
/// Send a Ack.
///
/// # Errors
///
/// * If we cannot send a pong
pub fn pong(&mut self) -> BldrResult<()> {
try!(self.send_message(Message::Pong));
/// * If we cannot send a Ack
pub fn ack(&mut self, my_peer: Peer, rumors_for_remote: RumorList) -> BldrResult<()> {
try!(self.send_message(Protocol::Ack(my_peer, rumors_for_remote)));
Ok(())
}

Expand All @@ -73,14 +77,28 @@ impl Client {
/// # Errors
///
/// * We cannot receive the data from the socket
/// * We cannot decode the data into a `gossip::message::Message`
pub fn recv_message(&mut self) -> BldrResult<Message> {
let mut buf = Vec::with_capacity(BUFFER_SIZE);
let (amt, src) = try!(self.socket.recv_from(&mut buf));

let mut decoder = Decoder::new(&buf[0..amt]);
let msg: Message = try!(Decodable::decode(&mut decoder));
debug!("Received message ({:?}): {:?}", src, msg);
/// * We cannot decode the data into a `gossip::message::Protocol`
pub fn recv_message(&mut self) -> BldrResult<Protocol> {
let mut buf = [0u8; BUFFER_SIZE];
let mut json_str = String::new();
let mut keep_reading_buffer = true;

while keep_reading_buffer {
let (amt, _src) = try!(self.socket.recv_from(&mut buf));
match amt {
0 => keep_reading_buffer = false,
amt => {
let partial_str = try!(str::from_utf8(&buf[..amt]));
json_str.push_str(partial_str);
}
}
}

debug!("Received protocol ({:?}): {}",
self.socket.peer_addr(),
json_str);

let msg: Protocol = try!(json::decode(&json_str));
Ok(msg)
}

Expand All @@ -90,11 +108,11 @@ impl Client {
///
/// * We cannot encode the `Message`
/// * We fail to send the encoded buffer to the remote
pub fn send_message(&mut self, msg: Message) -> BldrResult<()> {
let mut buf = Vec::with_capacity(BUFFER_SIZE);
try!(msg.encode(&mut Encoder::new(&mut &mut buf)));
try!(self.socket.send_to(&buf[..]));
debug!("Sent message: {:?}", msg);
pub fn send_message(&mut self, msg: Protocol) -> BldrResult<()> {
let encoded = try!(json::encode(&msg));
debug!("Encoded message {:#?}", encoded);
try!(self.socket.send_to(encoded.as_bytes()));
debug!("Sent protocol: {:?}", msg);
Ok(())
}
}
Loading