Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mask monitoring + Data injection #25

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ CPP += -Ibitshuffle
INCFILES=ch_frb_io.hpp \
ch_frb_io_internals.hpp \
assembled_chunk_msgpack.hpp \
msgpack_binary_vector.hpp \
bitshuffle/bitshuffle.h bitshuffle/bitshuffle_core.h \
bitshuffle/bitshuffle_internals.h bitshuffle/iochain.h \
chlog.hpp
Expand Down
1 change: 1 addition & 0 deletions assembled_chunk_ringbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ assembled_chunk_ringbuf::assembled_chunk_ringbuf(const intensity_network_stream:
max_fpga_flushed(0),
max_fpga_retrieved(0),
first_fpgacount(0),
first_packet_received(false),
ini_params(ini_params_),
beam_id(beam_id_),
stream_id(stream_id_),
Expand Down
5 changes: 3 additions & 2 deletions ch_frb_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ class intensity_network_stream : noncopyable {

// Returns the first fpgacount of the first chunk sent downstream by
// the given beam id.
// Raises runtime_error if the first packet has not been received yet.
uint64_t get_first_fpga_count(int beam);

// Returns the last FPGA count processed by each of the assembler,
Expand Down Expand Up @@ -549,8 +550,8 @@ class intensity_network_stream : noncopyable {
std::atomic<uint64_t> assembler_thread_waiting_usec;
std::atomic<uint64_t> assembler_thread_working_usec;

// Initialized by assembler thread when first packet is received, constant thereafter.
uint64_t frame0_nano = 0; // nanosecond time() value for fgpacount zero.
// Initialized to zero by constructor, set to nonzero value by assembler thread when first packet is received.
std::atomic<uint64_t> frame0_nano; // nanosecond time() value for fgpacount zero.

char _pad1b[constants::cache_line_size];

Expand Down
8 changes: 4 additions & 4 deletions ch_frb_io_internals.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,10 @@ class assembled_chunk_ringbuf : noncopyable,
std::atomic<uint64_t> max_fpga_retrieved;
// The fpgacount of the first chunk produced by this stream
std::atomic<uint64_t> first_fpgacount;


// Set to 'true' in the first call to put_unassembled_packet().
std::atomic<bool> first_packet_received;

assembled_chunk_ringbuf(const intensity_network_stream::initializer &ini_params, int beam_id, int stream_id);

~assembled_chunk_ringbuf();
Expand Down Expand Up @@ -347,9 +350,6 @@ class assembled_chunk_ringbuf : noncopyable,

output_device_pool output_devices;

// Set to 'true' in the first call to put_unassembled_packet().
bool first_packet_received = false;

// Helper function called in assembler thread, to add a new assembled_chunk to the ring buffer.
// Resets 'chunk' to a null pointer.
// Warning: only safe to call from assembler thread.
Expand Down
16 changes: 12 additions & 4 deletions intensity_network_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ intensity_network_stream::get_statistics() {
m["nupfreq"] = ini_params.nupfreq;
m["nt_per_packet"] = ini_params.nt_per_packet;
m["fpga_counts_per_sample"] = ini_params.fpga_counts_per_sample;
m["frame0_nano"] = frame0_nano;
m["fpga_count"] = 0; // XXX FIXME XXX
m["network_thread_waiting_usec"] = network_thread_waiting_usec;
m["network_thread_working_usec"] = network_thread_working_usec;
Expand Down Expand Up @@ -652,9 +653,12 @@ uint64_t intensity_network_stream::get_first_fpga_count(int beam) {
// Which of my assemblers (if any) is handling the requested beam?
int nbeams = this->ini_params.beam_ids.size();
for (int i=0; i<nbeams; i++)
if (this->ini_params.beam_ids[i] == beam)
if (this->ini_params.beam_ids[i] == beam) {
if (!this->assemblers[i]->first_packet_received)
throw runtime_error("ch_frb_io: get_first_fpga_count called, but first packet has not been received yet.");
return this->assemblers[i]->first_fpgacount;
return 0;
}
throw runtime_error("ch_frb_io internal error: beam_id not found in intensity_network_stream::get_first_fpga_count()");
}

void intensity_network_stream::get_max_fpga_count_seen(vector<uint64_t> &flushed,
Expand Down Expand Up @@ -716,6 +720,8 @@ void intensity_network_stream::network_thread_main()
// We use try..catch to ensure that _network_thread_exit() always gets called, even if an exception is thrown.
// We also print the exception so that it doesn't get "swallowed".

chime_log_set_thread_name("Network-" + std::to_string(ini_params.stream_id));

try {
_network_thread_body(); // calls pin_thread_to_cores()
} catch (exception &e) {
Expand Down Expand Up @@ -1004,6 +1010,8 @@ void intensity_network_stream::assembler_thread_main() {
// We use try..catch to ensure that _assembler_thread_exit() always gets called, even if an exception is thrown.
// We also print the exception so that it doesn't get "swallowed".

chime_log_set_thread_name("Assembler-" + std::to_string(ini_params.stream_id));

try {
_assembler_thread_body(); // calls pin_thread_to_cores()
} catch (exception &e) {
Expand Down Expand Up @@ -1246,13 +1254,13 @@ void intensity_network_stream::_fetch_frame0() {
curl_easy_cleanup(curl_handle);

string frame0_txt = holder.thestring;
chlog("Received frame0 text: " << frame0_txt);
//chlog("Received frame0 text: " << frame0_txt);
Json::Reader frame0_reader;
Json::Value frame0_json;
if (!frame0_reader.parse(frame0_txt, frame0_json))
throw runtime_error("ch_frb_io: failed to parse 'frame0' string: '" + frame0_txt + "'");

chlog("Parsed: " << frame0_json);
//chlog("Parsed: " << frame0_json);
if (!frame0_json.isObject())
throw runtime_error("ch_frb_io: 'frame0' was not a JSON 'Object' as expected");

Expand Down
73 changes: 73 additions & 0 deletions msgpack_binary_vector.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#ifndef _MSGPACK_BINARY_VECTOR_HPP
#define _MSGPACK_BINARY_VECTOR_HPP

#include <vector>
#include <iostream>

#include <msgpack.hpp>

namespace ch_frb_io {

template<typename T>
class msgpack_binary_vector : public std::vector<T>
{};

}

namespace msgpack {
MSGPACK_API_VERSION_NAMESPACE(MSGPACK_DEFAULT_API_NS) {
namespace adaptor {

template<typename T>
struct convert<ch_frb_io::msgpack_binary_vector<T> > {
msgpack::object const& operator()(msgpack::object const& o,
ch_frb_io::msgpack_binary_vector<T>& v) const {
//std::cout << "msgpack_binary_vector: type " << o.type << std::endl;
if (o.type != msgpack::type::ARRAY)
throw std::runtime_error("msgpack_binary_vector: expected type ARRAY");
// Make sure array is big enough to check version
//std::cout << "msgpack_binary_vector: array size " << o.via.array.size << std::endl;
if (o.via.array.size != 3)
throw std::runtime_error("msgpack_binary_vector: expected array size 3");
msgpack::object* arr = o.via.array.ptr;
uint8_t version = arr[0].as<uint8_t>();
//std::cout << "version " << version << std::endl;
if (version != 1)
throw std::runtime_error("msgpack_binary_vector: expected version=1");
size_t n = arr[1].as<size_t>();
//std::cout << "msgpack_binary_vector: vector size " << n << std::endl; //", type " << arr[2].type << std::endl;
v.resize(n);
if (arr[2].type != msgpack::type::BIN)
throw msgpack::type_error();
//std::cout << "binary size " << arr[2].via.bin.size << " vs " << n << " x " <<
//sizeof(T) << " = " << (n * sizeof(T)) << std::endl;
if (arr[2].via.bin.size != n * sizeof(T))
throw msgpack::type_error();
memcpy(reinterpret_cast<void*>(v.data()), arr[2].via.bin.ptr, n * sizeof(T));
//std::cout << "msgpack_binary_vector: returned vector size " << v.size() << std::endl;
return o;
}
};

template<typename T>
struct pack<ch_frb_io::msgpack_binary_vector<T> > {
template <typename Stream>
packer<Stream>& operator()(msgpack::packer<Stream>& o, ch_frb_io::msgpack_binary_vector<T> const& v) const {
uint8_t version = 1;
o.pack_array(3);
o.pack(version);
o.pack(v.size());
o.pack_bin(v.size() * sizeof(T));
o.pack_bin_body(reinterpret_cast<const char*>(v.data()));
return o;
}
};

} // namespace adaptor
} // MSGPACK_API_VERSION_NAMESPACE(MSGPACK_DEFAULT_API_NS)
} // namespace msgpack

#endif