Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
fix(simplify websocket_unsubscribe)
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Nov 20, 2018
1 parent e7293c4 commit 55a0d2e
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 52 deletions.
22 changes: 6 additions & 16 deletions parity-clib-examples/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ const int SUBSCRIPTION_ID_LEN = 18;
// global variable to keep track of the received rpc responses
static int g_rpc_counter = 0;

// global string for callbacks
static std::string g_str;

// list of rpc queries
static std::vector<std::string> rpc_queries {
"{\"method\":\"parity_versionInfo\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}",
Expand All @@ -49,8 +46,8 @@ void ws_response(void* _unused, const char* response, size_t len) {
printf("ws_response: %s\r\n", response);
std::regex is_subscription ("\\{\"jsonrpc\":\"2.0\",\"result\":\"0[xX][a-fA-F0-9]{16}\",\"id\":1\\}");
// assume only one subscription is used
if (std::regex_match(response, is_subscription) == true && g_str.empty()) {
g_str = response;
if (std::regex_match(response, is_subscription) == true) {
g_rpc_counter -= 1;
}
}

Expand Down Expand Up @@ -128,27 +125,20 @@ int parity_subscribe_to_websocket(void* parity) {

size_t timeout = 1000;
int num_queries = 1;
g_str.clear();
g_rpc_counter = 1;

std::string subscribe = "{\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"],\"id\":1,\"jsonrpc\":\"2.0\"}";
std::string unsubscribe = "{\"method\":\"eth_unsubscribe\",\"params\":[\"0x1234567891234567\"],\"id\":1,\"jsonrpc\":\"2.0\"}";

const void *const handle = parity_subscribe_ws(parity, subscribe.c_str(), subscribe.length(), ws_response);

if (!handle) {
return 1;
}

while(g_str.empty());
while(g_rpc_counter != 0);
std::this_thread::sleep_for(std::chrono::seconds(60));

// Replace subscription_id with the id we got in the callback
// (this is not a good practice use your favorite JSON parser)
unsubscribe.replace(39, SUBSCRIPTION_ID_LEN, g_str, 27, SUBSCRIPTION_ID_LEN);
if (parity_unsubscribe_ws(parity, handle, unsubscribe.c_str(), unsubscribe.length(), timeout, ws_response) != 0) {
return 1;
}

parity_unsubscribe_ws(handle);
return 0;
}

Expand Down Expand Up @@ -185,7 +175,7 @@ void* parity_light_run() {
.on_client_restart_cb_custom = nullptr
};

std::vector<const char*> args = {"--no-ipc" , "--jsonrpc-apis=all", "--chain", "kovan"};
std::vector<const char*> args = {"--no-ipc" , "--light", "--jsonrpc-apis=all", "--chain", "kovan"};
std::vector<size_t> str_lens;

for (auto arg: args) {
Expand Down
15 changes: 3 additions & 12 deletions parity-clib/parity.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,12 @@ int parity_rpc(const void *const parity, const char* rpc_query, size_t rpc_len,
const void *const parity_subscribe_ws(const void *const parity, const char* ws_query, size_t len,
void (*subscribe)(void* custom, const char* response, size_t len));

/// Unsubscribes from a specific websocket event. Caution this function consumes the session object and must only be
/// Unsubscribes from a websocket subscription. Caution this function consumes the session object and must only be
/// used exactly once per session.
///
/// - parity : Reference to the running parity client
/// - session : Underlying pointer to an atomic reference counter
/// - ws_query : JSON encoded string representing the websocket event to unsubscribe from
/// - len : Length of the query
/// - timeout : Maximum time in milliseconds to wait for a response
/// - response : Callback to invoke when the current session has been terminated
///
/// - On success : The function returns 0
/// - On error : The function returns non-zero
//
int parity_unsubscribe_ws(const void *const parity, const void *const session, const char* ws_query,
size_t len, size_t timeout, void (*unsubscribe)(void* custom, const char* response, size_t len));
///
int parity_unsubscribe_ws(const void *const session);

/// Sets a callback to call when a panic happens in the Rust code.
///
Expand Down
41 changes: 17 additions & 24 deletions parity-clib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ fn parity_rpc_dispatcher(
None => to_cstring(error::EMPTY.as_bytes()),
};
cb(ptr::null_mut(), cstr, len);
()
});

let _handle = thread::Builder::new()
Expand Down Expand Up @@ -223,14 +222,15 @@ pub unsafe extern fn parity_subscribe_ws(
if let Some((client, query, callback)) = parity_rpc_query_checker(client, query, len, callback) {
let (tx, mut rx) = mpsc::channel(1);
let session = Arc::new(PubSubSession::new(tx));
let ffi_session = session.clone();
let query = client.rpc_query(query, Some(session.clone()));
let weak_session = Arc::downgrade(&session);

let _handle = thread::Builder::new()
.name("ws-subscriber".into())
.spawn(move || {

// wait for subscription ID
// Wait for subscription ID
// Note this may block forever and be can't destroyed using the session object)
// FIXME: add timeout
match query.wait() {
Ok(Some(response)) => {
let (cstr, len) = to_cstring(response.as_bytes());
Expand All @@ -244,18 +244,24 @@ pub unsafe extern fn parity_subscribe_ws(
}
};

while Arc::strong_count(&session) > 1 {
loop {
for response in rx.by_ref().wait() {
if let Ok(r) = response {
let (cstring, len) = to_cstring(r.as_bytes());
callback(ptr::null_mut(), cstring, len);
}
}

let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session));
// No subscription left, then terminate
if rc <= 1 {
break;
}
}
}
})
.expect("rpc-subscriber thread shouldn't fail; qed");

Arc::into_raw(ffi_session) as *const c_void
Arc::into_raw(session) as *const c_void
} else {
ptr::null()
}
Expand All @@ -264,23 +270,10 @@ pub unsafe extern fn parity_subscribe_ws(
}

#[no_mangle]
pub unsafe extern fn parity_unsubscribe_ws(
client: *const c_void,
session: *const c_void,
query: *const c_char,
len: usize,
timeout_ms: usize,
callback: Callback,
) -> c_int {
panic::catch_unwind(|| {
if let Some((client, query, callback)) = parity_rpc_query_checker(client, query, len, callback) {
let session = Some(Arc::from_raw(session as *const PubSubSession));
parity_rpc_dispatcher(client, query, session, callback, timeout_ms, "ws-unsubscribe");
0
} else {
1
}
}).unwrap_or(1)
pub unsafe extern fn parity_unsubscribe_ws(session: *const c_void) {
let _ = panic::catch_unwind(|| {
let _session = Arc::from_raw(session as *const PubSubSession);
});
}

#[no_mangle]
Expand Down

0 comments on commit 55a0d2e

Please sign in to comment.