Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split object_space_dump into smaller messages #77

Merged
merged 15 commits into from
Jan 24, 2015
Merged
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/vendor/ruby/
/ext/Makefile
/ext/Makefile_2_2_0
/ext/mkmf.log
/ext/rbkit_tracer.bundle
/ext/*.o
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ WIP
===

* Send server and protocol versions along with handshake response [#103](https://github.com/code-mancers/rbkit/pull/103)
* Split object dump into smaller messages [#77](https://github.com/code-mancers/rbkit/pull/77)
7 changes: 5 additions & 2 deletions docs/protocol.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Rbkit Message Protocol v1.0
# Rbkit Message Protocol v2.0

This is the documentation for the protocol that's used when Rbkit server
and client communicate with each other. A client can work with a server
Expand Down Expand Up @@ -167,14 +167,17 @@ When the GC_END_SWEEP event is triggered, no payload is sent.

### Message frame for OBJECT_SPACE_DUMP :

Object space dump is split into multiple messages. Each message is of
the following format :

```yaml
{
event_type: object_space_dump
timestamp: <timestamp in milliseconds>,
payload: [
{
correlation_id: <ID_INDICATING_EVENT_THIS_MESSAGE_IS_PART_OF>,
object_id: <OBJECT_ID>,
snapshot_no: <SNAPSHOT_COUNT>,
class_name: <CLASS_NAME>,
references: [<OBJECT_ID>, <OBJECT_ID>, ... ],
file: <FILE_PATH>,
Expand Down
56 changes: 56 additions & 0 deletions experiments/cpu_profiler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
require 'zmq'
require 'msgpack'
require 'pp'

Thread.abort_on_exception = true


commands = [
'start_memory_profile',
'stop_memory_profile',
'start_cpu_profile',
'stop_cpu_profile',
'objectspace_snapshot',
'trigger_gc'
]

output_file = File.open("/tmp/rbkit.log", "w")
puts "Writing output to file #{output_file.path}"
ctx = ZMQ::Context.new

puts "Enter IPv4 address of Rbkit server. (Blank for localhost) :"
server_ip = gets.strip
server_ip = "127.0.0.1" if server_ip.empty?

Thread.new do
request_socket = ctx.socket(:REQ)
request_socket.connect("tcp://#{server_ip}:5556")
loop do
puts "Available commands :"
commands.each_with_index do |c, i|
puts "#{i+1}. #{c}"
end
command = commands[gets.strip.to_i - 1] rescue ''
unless command.empty?
request_socket.send(command)
puts "sent #{command}"
response = request_socket.recv()
puts "received #{response}"
end
end
end

socket = ctx.socket(:SUB)
socket.subscribe("")
socket.connect("tcp://#{server_ip}:5555")

begin
loop do
message = socket.recv
unpacked_message = MessagePack.unpack(message)
PP.pp(unpacked_message, output_file)
output_file.flush
end
ensure
output_file.close
end
7 changes: 7 additions & 0 deletions ext/rbkit_event.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "rbkit_event.h"

static size_t correlation_id = 0;

VALUE rbkit_event_types_as_hash() {
VALUE events = rb_hash_new();
rb_hash_aset(events, ID2SYM(rb_intern("obj_created")), INT2FIX(obj_created));
Expand Down Expand Up @@ -55,6 +57,11 @@ rbkit_object_space_dump_event *new_rbkit_object_space_dump_event(rbkit_object_du
header->event_type = object_space_dump;

event->dump = dump;
event->packed_objects = 0;
event->object_count = dump->object_count;
event->current_page = dump->first;
event->current_page_index = 0;
event->correlation_id = ++correlation_id;
return event;
}

Expand Down
5 changes: 5 additions & 0 deletions ext/rbkit_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ rbkit_hash_event *new_rbkit_hash_event(rbkit_event_type event_type, VALUE hash);
typedef struct _rbkit_object_space_dump_event {
rbkit_event_header event_header;
rbkit_object_dump *dump;
size_t packed_objects;
size_t object_count;
rbkit_object_dump_page *current_page;
size_t current_page_index;
size_t correlation_id;
} rbkit_object_space_dump_event;

rbkit_object_space_dump_event *new_rbkit_object_space_dump_event(rbkit_object_dump *dump);
Expand Down
164 changes: 93 additions & 71 deletions ext/rbkit_event_packer.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,83 +102,104 @@ static void pack_hash_event(rbkit_hash_event *event, msgpack_packer *packer) {

static void pack_object_space_dump_event(rbkit_object_space_dump_event *event, msgpack_packer *packer) {
rbkit_object_dump *dump = event->dump;
msgpack_pack_map(packer, 3);
msgpack_pack_map(packer, 4);
pack_event_header(packer, event->event_header.event_type);

// Incrementing integer holding the correlation_id
// indicating the event which the message belongs to
msgpack_pack_int(packer, rbkit_message_field_correlation_id);
msgpack_pack_int(packer, event->correlation_id);

msgpack_pack_int(packer, rbkit_message_field_payload);

// Find the batch size
size_t objects_in_batch = MAX_OBJECT_DUMPS_IN_MESSAGE ;
int objects_left = event->object_count - event->packed_objects;
if(objects_left < MAX_OBJECT_DUMPS_IN_MESSAGE)
objects_in_batch = objects_left;

// Set size of array to hold all objects
msgpack_pack_array(packer, dump->object_count);
msgpack_pack_array(packer, objects_in_batch);

// Iterate through all object data
rbkit_object_dump_page * page = dump->first ;
while(page != NULL) {
rbkit_object_data *data;
size_t i = 0;
for(;i < page->count; i++) {
data = &(page->data[i]);
/* Object dump is a map that looks like this :
* {
* object_id: <OBJECT_ID_IN_HEX>,
* class: <CLASS_NAME>,
* references: [<OBJECT_ID_IN_HEX>, <OBJECT_ID_IN_HEX>, ...],
* file: <FILE_PATH>,
* line: <LINE_NO>,
* size: <SIZE>
* }
*/

msgpack_pack_map(packer, 6);

// Key1 : rbkit_message_field_object_id
msgpack_pack_int(packer, rbkit_message_field_object_id);

// Value1 : pointer address of object
msgpack_pack_unsigned_long_long(packer, data->object_id);

// Key2 : rbkit_message_field_class_name
msgpack_pack_int(packer, rbkit_message_field_class_name);

// Value2 : Class name of object
pack_string(packer, data->class_name);

// Key3 : rbkit_message_field_references
msgpack_pack_int(packer, rbkit_message_field_references);

// Value3 : References held by the object
msgpack_pack_array(packer, data->reference_count);
if(data->reference_count != 0) {
size_t count = 0;
for(; count < data->reference_count; count++ )
msgpack_pack_unsigned_long_long(packer, data->references[count]);
free(data->references);
}

// Key4 : rbkit_message_field_file
msgpack_pack_int(packer, rbkit_message_field_file);

// Value4 : File path where object is defined
pack_string(packer, data->file);

// Key5 : rbkit_message_field_line
msgpack_pack_int(packer, rbkit_message_field_line);

// Value5 : Line no where object is defined
if(data->line == 0)
msgpack_pack_nil(packer);
else
msgpack_pack_unsigned_long(packer, data->line);

// Key6 : rbkit_message_field_size
msgpack_pack_int(packer, rbkit_message_field_size);

// Value6 : Size of the object in memory
if(data->size == 0)
msgpack_pack_nil(packer);
else
msgpack_pack_uint32(packer, data->size);
int count = 0;
int i = 0;
rbkit_object_data *data;
rbkit_object_dump_page * page;
while(count < objects_in_batch) {
if(event->current_page_index == RBKIT_OBJECT_DUMP_PAGE_SIZE) {
event->current_page_index = 0;
rbkit_object_dump_page * prev = event->current_page;
event->current_page = event->current_page->next;
free(prev);
}
page = event->current_page;
i = event->current_page_index;
data = &(page->data[i]);
/* Object dump is a map that looks like this :
* {
* object_id: <OBJECT_ID_IN_HEX>,
* class: <CLASS_NAME>,
* references: [<OBJECT_ID_IN_HEX>, <OBJECT_ID_IN_HEX>, ...],
* file: <FILE_PATH>,
* line: <LINE_NO>,
* size: <SIZE>
* }
*/

msgpack_pack_map(packer, 6);

// Key1 : rbkit_message_field_object_id
msgpack_pack_int(packer, rbkit_message_field_object_id);

// Value1 : pointer address of object
msgpack_pack_unsigned_long_long(packer, data->object_id);

// Key2 : rbkit_message_field_class_name
msgpack_pack_int(packer, rbkit_message_field_class_name);

// Value2 : Class name of object
pack_string(packer, data->class_name);

// Key3 : rbkit_message_field_references
msgpack_pack_int(packer, rbkit_message_field_references);

// Value3 : References held by the object
msgpack_pack_array(packer, data->reference_count);
if(data->reference_count != 0) {
size_t count = 0;
for(; count < data->reference_count; count++ )
msgpack_pack_unsigned_long_long(packer, data->references[count]);
free(data->references);
}
rbkit_object_dump_page * prev = page;
page = page->next;
free(prev);

// Key4 : rbkit_message_field_file
msgpack_pack_int(packer, rbkit_message_field_file);

// Value4 : File path where object is defined
pack_string(packer, data->file);

// Key5 : rbkit_message_field_line
msgpack_pack_int(packer, rbkit_message_field_line);

// Value5 : Line no where object is defined
if(data->line == 0)
msgpack_pack_nil(packer);
else
msgpack_pack_unsigned_long(packer, data->line);

// Key6 : rbkit_message_field_size
msgpack_pack_int(packer, rbkit_message_field_size);

// Value6 : Size of the object in memory
if(data->size == 0)
msgpack_pack_nil(packer);
else
msgpack_pack_uint32(packer, data->size);

event->current_page_index++;
event->packed_objects++;
count++;
}
}

Expand Down Expand Up @@ -246,6 +267,7 @@ VALUE rbkit_message_fields_as_hash() {
rb_hash_aset(events, ID2SYM(rb_intern("line")), INT2FIX(rbkit_message_field_line));
rb_hash_aset(events, ID2SYM(rb_intern("size")), INT2FIX(rbkit_message_field_size));
rb_hash_aset(events, ID2SYM(rb_intern("message_counter")), INT2FIX(rbkit_message_field_message_counter));
rb_hash_aset(events, ID2SYM(rb_intern("correlation_id")), INT2FIX(rbkit_message_field_correlation_id));
OBJ_FREEZE(events);
return events;
}
Expand Down
11 changes: 9 additions & 2 deletions ext/rbkit_event_packer.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
#ifndef RBKIT_MESSAGE_PACKER
#define RBKIT_MESSAGE_PACKER
#define RBKIT_PROTOCOL_VERSION "1.0"
#define RBKIT_PROTOCOL_VERSION "2.0"
#include "msgpack.h"
#include "rbkit_event.h"

// Object dump will be split into multiple
// messages. This macro defines the number
// of object data that should be packed
// as the payload of one message.
#define MAX_OBJECT_DUMPS_IN_MESSAGE 1000

typedef enum _rbkit_message_fields {
rbkit_message_field_event_type,
rbkit_message_field_timestamp,
Expand All @@ -14,7 +20,8 @@ typedef enum _rbkit_message_fields {
rbkit_message_field_file,
rbkit_message_field_line,
rbkit_message_field_size,
rbkit_message_field_message_counter
rbkit_message_field_message_counter,
rbkit_message_field_correlation_id
} rbkit_message_fields;

VALUE rbkit_message_fields_as_hash();
Expand Down
8 changes: 7 additions & 1 deletion ext/rbkit_message_aggregator.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void message_list_clear() {

// Copies the msgpack sbuffer to the end of
// a dynamically growing array
void add_message(msgpack_sbuffer *buffer) {
void queue_message(msgpack_sbuffer *buffer) {
while(!has_enough_space_for(buffer->size))
double_the_capacity();
memcpy(message_array + used_memsize, buffer->data, buffer->size);
Expand All @@ -57,3 +57,9 @@ void get_event_collection_message(msgpack_sbuffer *sbuf) {
free(event);
msgpack_packer_free(pk);
}

// Returns the number of messages which are queued in the
// current event_collection batch
size_t queued_message_count() {
return no_of_messages;
}
7 changes: 6 additions & 1 deletion ext/rbkit_message_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@

#include "msgpack.h"

// No of events that will be grouped
// into one event_collection message
#define MESSAGE_BATCH_SIZE 400

void message_list_new();
void add_message(msgpack_sbuffer *);
void queue_message(msgpack_sbuffer *);
void get_event_collection_message(msgpack_sbuffer *);
void message_list_destroy();
void message_list_clear();
size_t queued_message_count();

#endif
6 changes: 0 additions & 6 deletions ext/rbkit_test_helper.c
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
#include <ruby.h>
#include "rbkit_message_aggregator.h"

static VALUE noop_send_messages() {
//NOOP
return Qnil;
}

static VALUE get_queued_messages() {
msgpack_sbuffer * sbuf = msgpack_sbuffer_new();
get_event_collection_message(sbuf);
Expand All @@ -20,6 +15,5 @@ static VALUE get_queued_messages() {

void Init_rbkit_test_helper(void) {
VALUE rbkit_module = rb_define_module("Rbkit");
rb_define_module_function(rbkit_module, "send_messages", noop_send_messages, 0);
rb_define_module_function(rbkit_module, "get_queued_messages", get_queued_messages, 0);
}
Loading