Skip to content

Commit

Permalink
Merge pull request #77 from code-mancers/split-dump
Browse files Browse the repository at this point in the history
Split object_space_dump into smaller messages
  • Loading branch information
Hemant Kumar committed Jan 24, 2015
2 parents 4a923a7 + 674b7c2 commit 525154f
Show file tree
Hide file tree
Showing 16 changed files with 401 additions and 122 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/vendor/ruby/
/ext/Makefile
/ext/Makefile_2_2_0
/ext/mkmf.log
/ext/rbkit_tracer.bundle
/ext/*.o
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ WIP
===

* Send server and protocol versions along with handshake response [#103](https://github.com/code-mancers/rbkit/pull/103)
* Split object dump into smaller messages [#77](https://github.com/code-mancers/rbkit/pull/77)
7 changes: 5 additions & 2 deletions docs/protocol.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Rbkit Message Protocol v1.0
# Rbkit Message Protocol v2.0

This is the documentation for the protocol that's used when Rbkit server
and client communicate with each other. A client can work with a server
Expand Down Expand Up @@ -167,14 +167,17 @@ When the GC_END_SWEEP event is triggered, no payload is sent.

### Message frame for OBJECT_SPACE_DUMP :

Object space dump is split into multiple messages. Each message is of
the following format :

```yaml
{
event_type: object_space_dump
timestamp: <timestamp in milliseconds>,
payload: [
{
correlation_id: <ID_INDICATING_EVENT_THIS_MESSAGE_IS_PART_OF>,
object_id: <OBJECT_ID>,
snapshot_no: <SNAPSHOT_COUNT>,
class_name: <CLASS_NAME>,
references: [<OBJECT_ID>, <OBJECT_ID>, ... ],
file: <FILE_PATH>,
Expand Down
56 changes: 56 additions & 0 deletions experiments/cpu_profiler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
require 'zmq'
require 'msgpack'
require 'pp'

Thread.abort_on_exception = true


commands = [
'start_memory_profile',
'stop_memory_profile',
'start_cpu_profile',
'stop_cpu_profile',
'objectspace_snapshot',
'trigger_gc'
]

output_file = File.open("/tmp/rbkit.log", "w")
puts "Writing output to file #{output_file.path}"
ctx = ZMQ::Context.new

puts "Enter IPv4 address of Rbkit server. (Blank for localhost) :"
server_ip = gets.strip
server_ip = "127.0.0.1" if server_ip.empty?

Thread.new do
request_socket = ctx.socket(:REQ)
request_socket.connect("tcp://#{server_ip}:5556")
loop do
puts "Available commands :"
commands.each_with_index do |c, i|
puts "#{i+1}. #{c}"
end
command = commands[gets.strip.to_i - 1] rescue ''
unless command.empty?
request_socket.send(command)
puts "sent #{command}"
response = request_socket.recv()
puts "received #{response}"
end
end
end

socket = ctx.socket(:SUB)
socket.subscribe("")
socket.connect("tcp://#{server_ip}:5555")

begin
loop do
message = socket.recv
unpacked_message = MessagePack.unpack(message)
PP.pp(unpacked_message, output_file)
output_file.flush
end
ensure
output_file.close
end
7 changes: 7 additions & 0 deletions ext/rbkit_event.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "rbkit_event.h"

static size_t correlation_id = 0;

VALUE rbkit_event_types_as_hash() {
VALUE events = rb_hash_new();
rb_hash_aset(events, ID2SYM(rb_intern("obj_created")), INT2FIX(obj_created));
Expand Down Expand Up @@ -55,6 +57,11 @@ rbkit_object_space_dump_event *new_rbkit_object_space_dump_event(rbkit_object_du
header->event_type = object_space_dump;

event->dump = dump;
event->packed_objects = 0;
event->object_count = dump->object_count;
event->current_page = dump->first;
event->current_page_index = 0;
event->correlation_id = ++correlation_id;
return event;
}

Expand Down
5 changes: 5 additions & 0 deletions ext/rbkit_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ rbkit_hash_event *new_rbkit_hash_event(rbkit_event_type event_type, VALUE hash);
typedef struct _rbkit_object_space_dump_event {
rbkit_event_header event_header;
rbkit_object_dump *dump;
size_t packed_objects;
size_t object_count;
rbkit_object_dump_page *current_page;
size_t current_page_index;
size_t correlation_id;
} rbkit_object_space_dump_event;

rbkit_object_space_dump_event *new_rbkit_object_space_dump_event(rbkit_object_dump *dump);
Expand Down
164 changes: 93 additions & 71 deletions ext/rbkit_event_packer.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,83 +102,104 @@ static void pack_hash_event(rbkit_hash_event *event, msgpack_packer *packer) {

static void pack_object_space_dump_event(rbkit_object_space_dump_event *event, msgpack_packer *packer) {
rbkit_object_dump *dump = event->dump;
msgpack_pack_map(packer, 3);
msgpack_pack_map(packer, 4);
pack_event_header(packer, event->event_header.event_type);

// Incrementing integer holding the correlation_id
// indicating the event which the message belongs to
msgpack_pack_int(packer, rbkit_message_field_correlation_id);
msgpack_pack_int(packer, event->correlation_id);

msgpack_pack_int(packer, rbkit_message_field_payload);

// Find the batch size
size_t objects_in_batch = MAX_OBJECT_DUMPS_IN_MESSAGE ;
int objects_left = event->object_count - event->packed_objects;
if(objects_left < MAX_OBJECT_DUMPS_IN_MESSAGE)
objects_in_batch = objects_left;

// Set size of array to hold all objects
msgpack_pack_array(packer, dump->object_count);
msgpack_pack_array(packer, objects_in_batch);

// Iterate through all object data
rbkit_object_dump_page * page = dump->first ;
while(page != NULL) {
rbkit_object_data *data;
size_t i = 0;
for(;i < page->count; i++) {
data = &(page->data[i]);
/* Object dump is a map that looks like this :
* {
* object_id: <OBJECT_ID_IN_HEX>,
* class: <CLASS_NAME>,
* references: [<OBJECT_ID_IN_HEX>, <OBJECT_ID_IN_HEX>, ...],
* file: <FILE_PATH>,
* line: <LINE_NO>,
* size: <SIZE>
* }
*/

msgpack_pack_map(packer, 6);

// Key1 : rbkit_message_field_object_id
msgpack_pack_int(packer, rbkit_message_field_object_id);

// Value1 : pointer address of object
msgpack_pack_unsigned_long_long(packer, data->object_id);

// Key2 : rbkit_message_field_class_name
msgpack_pack_int(packer, rbkit_message_field_class_name);

// Value2 : Class name of object
pack_string(packer, data->class_name);

// Key3 : rbkit_message_field_references
msgpack_pack_int(packer, rbkit_message_field_references);

// Value3 : References held by the object
msgpack_pack_array(packer, data->reference_count);
if(data->reference_count != 0) {
size_t count = 0;
for(; count < data->reference_count; count++ )
msgpack_pack_unsigned_long_long(packer, data->references[count]);
free(data->references);
}

// Key4 : rbkit_message_field_file
msgpack_pack_int(packer, rbkit_message_field_file);

// Value4 : File path where object is defined
pack_string(packer, data->file);

// Key5 : rbkit_message_field_line
msgpack_pack_int(packer, rbkit_message_field_line);

// Value5 : Line no where object is defined
if(data->line == 0)
msgpack_pack_nil(packer);
else
msgpack_pack_unsigned_long(packer, data->line);

// Key6 : rbkit_message_field_size
msgpack_pack_int(packer, rbkit_message_field_size);

// Value6 : Size of the object in memory
if(data->size == 0)
msgpack_pack_nil(packer);
else
msgpack_pack_uint32(packer, data->size);
int count = 0;
int i = 0;
rbkit_object_data *data;
rbkit_object_dump_page * page;
while(count < objects_in_batch) {
if(event->current_page_index == RBKIT_OBJECT_DUMP_PAGE_SIZE) {
event->current_page_index = 0;
rbkit_object_dump_page * prev = event->current_page;
event->current_page = event->current_page->next;
free(prev);
}
page = event->current_page;
i = event->current_page_index;
data = &(page->data[i]);
/* Object dump is a map that looks like this :
* {
* object_id: <OBJECT_ID_IN_HEX>,
* class: <CLASS_NAME>,
* references: [<OBJECT_ID_IN_HEX>, <OBJECT_ID_IN_HEX>, ...],
* file: <FILE_PATH>,
* line: <LINE_NO>,
* size: <SIZE>
* }
*/

msgpack_pack_map(packer, 6);

// Key1 : rbkit_message_field_object_id
msgpack_pack_int(packer, rbkit_message_field_object_id);

// Value1 : pointer address of object
msgpack_pack_unsigned_long_long(packer, data->object_id);

// Key2 : rbkit_message_field_class_name
msgpack_pack_int(packer, rbkit_message_field_class_name);

// Value2 : Class name of object
pack_string(packer, data->class_name);

// Key3 : rbkit_message_field_references
msgpack_pack_int(packer, rbkit_message_field_references);

// Value3 : References held by the object
msgpack_pack_array(packer, data->reference_count);
if(data->reference_count != 0) {
size_t count = 0;
for(; count < data->reference_count; count++ )
msgpack_pack_unsigned_long_long(packer, data->references[count]);
free(data->references);
}
rbkit_object_dump_page * prev = page;
page = page->next;
free(prev);

// Key4 : rbkit_message_field_file
msgpack_pack_int(packer, rbkit_message_field_file);

// Value4 : File path where object is defined
pack_string(packer, data->file);

// Key5 : rbkit_message_field_line
msgpack_pack_int(packer, rbkit_message_field_line);

// Value5 : Line no where object is defined
if(data->line == 0)
msgpack_pack_nil(packer);
else
msgpack_pack_unsigned_long(packer, data->line);

// Key6 : rbkit_message_field_size
msgpack_pack_int(packer, rbkit_message_field_size);

// Value6 : Size of the object in memory
if(data->size == 0)
msgpack_pack_nil(packer);
else
msgpack_pack_uint32(packer, data->size);

event->current_page_index++;
event->packed_objects++;
count++;
}
}

Expand Down Expand Up @@ -246,6 +267,7 @@ VALUE rbkit_message_fields_as_hash() {
rb_hash_aset(events, ID2SYM(rb_intern("line")), INT2FIX(rbkit_message_field_line));
rb_hash_aset(events, ID2SYM(rb_intern("size")), INT2FIX(rbkit_message_field_size));
rb_hash_aset(events, ID2SYM(rb_intern("message_counter")), INT2FIX(rbkit_message_field_message_counter));
rb_hash_aset(events, ID2SYM(rb_intern("correlation_id")), INT2FIX(rbkit_message_field_correlation_id));
OBJ_FREEZE(events);
return events;
}
Expand Down
11 changes: 9 additions & 2 deletions ext/rbkit_event_packer.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
#ifndef RBKIT_MESSAGE_PACKER
#define RBKIT_MESSAGE_PACKER
#define RBKIT_PROTOCOL_VERSION "1.0"
#define RBKIT_PROTOCOL_VERSION "2.0"
#include "msgpack.h"
#include "rbkit_event.h"

// Object dump will be split into multiple
// messages. This macro defines the number
// of object data that should be packed
// as the payload of one message.
#define MAX_OBJECT_DUMPS_IN_MESSAGE 1000

typedef enum _rbkit_message_fields {
rbkit_message_field_event_type,
rbkit_message_field_timestamp,
Expand All @@ -14,7 +20,8 @@ typedef enum _rbkit_message_fields {
rbkit_message_field_file,
rbkit_message_field_line,
rbkit_message_field_size,
rbkit_message_field_message_counter
rbkit_message_field_message_counter,
rbkit_message_field_correlation_id
} rbkit_message_fields;

VALUE rbkit_message_fields_as_hash();
Expand Down
8 changes: 7 additions & 1 deletion ext/rbkit_message_aggregator.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void message_list_clear() {

// Copies the msgpack sbuffer to the end of
// a dynamically growing array
void add_message(msgpack_sbuffer *buffer) {
void queue_message(msgpack_sbuffer *buffer) {
while(!has_enough_space_for(buffer->size))
double_the_capacity();
memcpy(message_array + used_memsize, buffer->data, buffer->size);
Expand All @@ -57,3 +57,9 @@ void get_event_collection_message(msgpack_sbuffer *sbuf) {
free(event);
msgpack_packer_free(pk);
}

// Returns the number of messages which are queued in the
// current event_collection batch
size_t queued_message_count() {
return no_of_messages;
}
7 changes: 6 additions & 1 deletion ext/rbkit_message_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@

#include "msgpack.h"

// No of events that will be grouped
// into one event_collection message
#define MESSAGE_BATCH_SIZE 400

void message_list_new();
void add_message(msgpack_sbuffer *);
void queue_message(msgpack_sbuffer *);
void get_event_collection_message(msgpack_sbuffer *);
void message_list_destroy();
void message_list_clear();
size_t queued_message_count();

#endif
6 changes: 0 additions & 6 deletions ext/rbkit_test_helper.c
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
#include <ruby.h>
#include "rbkit_message_aggregator.h"

static VALUE noop_send_messages() {
//NOOP
return Qnil;
}

static VALUE get_queued_messages() {
msgpack_sbuffer * sbuf = msgpack_sbuffer_new();
get_event_collection_message(sbuf);
Expand All @@ -20,6 +15,5 @@ static VALUE get_queued_messages() {

void Init_rbkit_test_helper(void) {
VALUE rbkit_module = rb_define_module("Rbkit");
rb_define_module_function(rbkit_module, "send_messages", noop_send_messages, 0);
rb_define_module_function(rbkit_module, "get_queued_messages", get_queued_messages, 0);
}
Loading

0 comments on commit 525154f

Please sign in to comment.