Skip to content

Commit

Permalink
packet_pool: adopt the new PacketPool interface
Browse files Browse the repository at this point in the history
  • Loading branch information
sangjinhan committed Aug 9, 2018
1 parent 727d1c2 commit d6cc92c
Show file tree
Hide file tree
Showing 21 changed files with 183 additions and 462 deletions.
4 changes: 2 additions & 2 deletions core/bessctl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
#include "module.h"
#include "module_graph.h"
#include "opts.h"
#include "packet.h"
#include "packet_pool.h"
#include "port.h"
#include "resume_hook.h"
#include "scheduler.h"
Expand Down Expand Up @@ -1402,7 +1402,7 @@ class BESSControlImpl final : public BESSControl::Service {
(socket_filter == -1) ? (RTE_MAX_NUMA_NODES - 1) : socket_filter;
int socket = (request->socket() == -1) ? 0 : socket_filter;
for (; socket <= socket_filter; socket++) {
struct rte_mempool* mempool = bess::get_pframe_pool_socket(socket);
rte_mempool* mempool = bess::PacketPool::GetDefaultPool(socket)->pool();
MempoolDump* dump = response->add_dumps();
dump->set_socket(socket);
dump->set_initialized(mempool != nullptr);
Expand Down
75 changes: 41 additions & 34 deletions core/dpdk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@
#include <cstring>
#include <string>

#include "utils/time.h"
#include "memory.h"
#include "worker.h"

static int get_numa_count() {
namespace bess {
namespace {

int get_numa_count() {
FILE *fp;

int matched;
Expand All @@ -74,23 +77,23 @@ static int get_numa_count() {
return 1;
}

static void disable_syslog() {
void disable_syslog() {
setlogmask(0x01);
}

static void enable_syslog() {
void enable_syslog() {
setlogmask(0xff);
}

/* for log messages during rte_eal_init() */
static ssize_t dpdk_log_init_writer(void *, const char *data, size_t len) {
// for log messages during rte_eal_init()
ssize_t dpdk_log_init_writer(void *, const char *data, size_t len) {
enable_syslog();
LOG(INFO) << std::string(data, len);
disable_syslog();
return len;
}

static ssize_t dpdk_log_writer(void *, const char *data, size_t len) {
ssize_t dpdk_log_writer(void *, const char *data, size_t len) {
LOG(INFO) << std::string(data, len);
return len;
}
Expand Down Expand Up @@ -121,25 +124,26 @@ class CmdLineOpts {
std::vector<char *> argv_;
};

static void init_eal(const char *prog_name, int mb_per_socket,
int multi_instance, bool no_huge, int default_core) {
void init_eal(int dpdk_mb_per_socket, int default_core) {
int numa_count = get_numa_count();

CmdLineOpts rte_args{
prog_name, "--master-lcore", std::to_string(RTE_MAX_LCORE - 1), "--lcore",
"bessd",
"--master-lcore",
std::to_string(RTE_MAX_LCORE - 1),
"--lcore",
std::to_string(RTE_MAX_LCORE - 1) + "@" + std::to_string(default_core),
// Do not bother with /var/run/.rte_config and .rte_hugepage_info,
// since we don't want to interfere with other DPDK applications.
"--no-shconf",
};

if (no_huge) {
if (dpdk_mb_per_socket <= 0) {
rte_args.Append({"--no-huge"});
rte_args.Append({"-m", std::to_string(mb_per_socket * numa_count)});
} else {
std::string opt_socket_mem = std::to_string(mb_per_socket);
std::string opt_socket_mem = std::to_string(dpdk_mb_per_socket);
for (int i = 1; i < numa_count; i++) {
opt_socket_mem += "," + std::to_string(mb_per_socket);
opt_socket_mem += "," + std::to_string(dpdk_mb_per_socket);
}

rte_args.Append({"--socket-mem", opt_socket_mem});
Expand All @@ -149,15 +153,11 @@ static void init_eal(const char *prog_name, int mb_per_socket,
rte_args.Append({"--huge-unlink"});
}

if (!no_huge && multi_instance) {
rte_args.Append({"--file-prefix", "rte" + std::to_string(getpid())});
}

/* reset getopt() */
// reset getopt()
optind = 0;

/* DPDK creates duplicated outputs (stdout and syslog).
* We temporarily disable syslog, then set our log handler */
// DPDK creates duplicated outputs (stdout and syslog).
// We temporarily disable syslog, then set our log handler
cookie_io_functions_t dpdk_log_init_funcs;
cookie_io_functions_t dpdk_log_funcs;

Expand All @@ -173,8 +173,9 @@ static void init_eal(const char *prog_name, int mb_per_socket,
disable_syslog();
int ret = rte_eal_init(rte_args.Argc(), rte_args.Argv());
if (ret < 0) {
LOG(ERROR) << "rte_eal_init() failed: ret = " << ret;
exit(EXIT_FAILURE);
LOG(FATAL) << "rte_eal_init() failed: ret = " << ret
<< " rte_errno = " << rte_errno << " ("
<< rte_strerror(rte_errno) << ")";
}

enable_syslog();
Expand All @@ -187,7 +188,7 @@ static void init_eal(const char *prog_name, int mb_per_socket,
// Returns the last core ID of all cores, as the default core all threads will
// run on. If the process was run with a limited set of cores (by `taskset`),
// the last one among them will be picked.
static int determine_default_core() {
int determine_default_core() {
cpu_set_t set;

int ret = pthread_getaffinity_np(pthread_self(), sizeof(set), &set);
Expand All @@ -208,16 +209,22 @@ static int determine_default_core() {
return 0;
}

void init_dpdk(const ::std::string &prog_name, int mb_per_socket,
int multi_instance, bool no_huge) {
// Isolate all background threads in a separate core.
// All non-worker threads will be scheduled on default_core,
// including threads spawned by DPDK and gRPC.
// FIXME: This is a temporary fix. If a new worker thread is allocated on the
// same core, background threads should migrate to another core.
int default_core = determine_default_core();
bool is_initialized = false;

} // namespace

bool IsDpdkInitialized() {
return is_initialized;
}

void InitDpdk(int dpdk_mb_per_socket) {
current_worker.SetNonWorker();

init_eal(prog_name.c_str(), mb_per_socket, multi_instance, no_huge,
default_core);
if (!is_initialized) {
is_initialized = true;
LOG(INFO) << "Initializing DPDK";
init_eal(dpdk_mb_per_socket, determine_default_core());
}
}

} // namespace bess
27 changes: 6 additions & 21 deletions core/dpdk.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,14 @@
#ifndef BESS_DPDK_H_
#define BESS_DPDK_H_

/* rte_version.h depends on this but has no #include :( */
#include <cstdio>
#include <string>
namespace bess {

#include <rte_version.h>
bool IsDpdkInitialized();

#define DPDK_VER_NUM(a, b, c) (((a << 16) | (b << 8) | (c)))
// Initialize DPDK, with the specified amount of hugepage memory.
// Safe to call multiple times.
void InitDpdk(int dpdk_mb_per_socket = 0);

/* for DPDK 16.04 or newer */
#ifdef RTE_VER_YEAR
#define DPDK_VER DPDK_VER_NUM(RTE_VER_YEAR, RTE_VER_MONTH, RTE_VER_MINOR)
#endif

/* for DPDK 2.2 or older */
#ifdef RTE_VER_MAJOR
#define DPDK_VER DPDK_VER_NUM(RTE_VER_MAJOR, RTE_VER_MINOR, RTE_VER_PATCH_LEVEL)
#endif

#ifndef DPDK_VER
#error DPDK version is not available
#endif

void init_dpdk(const ::std::string &prog_name, int mb_per_socket,
int multi_instance, bool no_huge);
} // namespace bess

#endif // BESS_DPDK_H_
17 changes: 8 additions & 9 deletions core/drivers/pcap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ int PCAPPort::RecvPackets(queue_t qid, bess::Packet** pkts, int cnt) {
}

int recv_cnt = 0;
bess::Packet* sbuf;

DCHECK_EQ(qid, 0);

Expand All @@ -75,21 +74,21 @@ int PCAPPort::RecvPackets(queue_t qid, bess::Packet** pkts, int cnt) {
break;
}

sbuf = bess::Packet::Alloc();
if (!sbuf) {
bess::Packet* pkt = current_worker.packet_pool()->Alloc();
if (!pkt) {
break;
}

int copy_len = std::min(caplen, static_cast<int>(sbuf->tailroom()));
bess::utils::CopyInlined(sbuf->append(copy_len), packet, copy_len, true);
int copy_len = std::min(caplen, static_cast<int>(pkt->tailroom()));
bess::utils::CopyInlined(pkt->append(copy_len), packet, copy_len, true);

packet += copy_len;
caplen -= copy_len;
bess::Packet* m = sbuf;
bess::Packet* m = pkt;

int nb_segs = 1;
while (caplen > 0) {
m->set_next(bess::Packet::Alloc());
m->set_next(current_worker.packet_pool()->Alloc());
m = m->next();
nb_segs++;

Expand All @@ -99,8 +98,8 @@ int PCAPPort::RecvPackets(queue_t qid, bess::Packet** pkts, int cnt) {
packet += copy_len;
caplen -= copy_len;
}
sbuf->set_nb_segs(nb_segs);
pkts[recv_cnt] = sbuf;
pkt->set_nb_segs(nb_segs);
pkts[recv_cnt] = pkt;
recv_cnt++;
}

Expand Down
6 changes: 3 additions & 3 deletions core/drivers/pmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,9 @@ CommandResponse PMDPort::Init(const bess::pb::PMDPortArg &arg) {
sid = 0;
}

ret =
rte_eth_rx_queue_setup(ret_port_id, i, queue_size[PACKET_DIR_INC], sid,
&eth_rxconf, bess::get_pframe_pool_socket(sid));
ret = rte_eth_rx_queue_setup(ret_port_id, i, queue_size[PACKET_DIR_INC],
sid, &eth_rxconf,
bess::PacketPool::GetDefaultPool(sid)->pool());
if (ret != 0) {
return CommandFailure(-ret, "rte_eth_rx_queue_setup() failed");
}
Expand Down
2 changes: 1 addition & 1 deletion core/drivers/unix_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {

int received = 0;
while (received < cnt) {
bess::Packet *pkt = static_cast<bess::Packet *>(bess::Packet::Alloc());
bess::Packet *pkt = current_worker.packet_pool()->Alloc();

if (!pkt) {
break;
Expand Down
9 changes: 5 additions & 4 deletions core/drivers/vport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ static void refill_tx_bufs(struct llring *r) {

deficit = REFILL_HIGH - curr_cnt;

ret = bess::Packet::Alloc((bess::Packet **)pkts, deficit, 0);
if (ret == 0)
if (!current_worker.packet_pool()->AllocBulk(pkts, deficit, 0)) {
return;
}

for (int i = 0; i < ret; i++)
for (int i = 0; i < deficit; i++) {
objs[i] = pkts[i]->paddr();
}

ret = llring_mp_enqueue_bulk(r, objs, ret);
ret = llring_mp_enqueue_bulk(r, objs, deficit);
DCHECK_EQ(ret, 0);
}

Expand Down
9 changes: 2 additions & 7 deletions core/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@
#include "bessctl.h"
#include "bessd.h"
#include "debug.h"
#include "dpdk.h"
#include "opts.h"
#include "packet.h"
#include "packet_pool.h"
#include "port.h"
#include "utils/format.h"
#include "version.h"
Expand Down Expand Up @@ -77,10 +76,7 @@ int main(int argc, char *argv[]) {
<< FLAGS_modules;
}

// TODO(barath): Make these DPDK calls generic, so as to not be so tied to
// DPDK.
init_dpdk(argv[0], FLAGS_m, FLAGS_a, FLAGS_no_huge);
bess::init_mempool();
bess::PacketPool::CreateDefaultPools();

PortBuilder::InitDrivers();

Expand All @@ -106,7 +102,6 @@ int main(int argc, char *argv[]) {
}

rte_eal_mp_wait_lcore();
bess::close_mempool();

LOG(INFO) << "BESS daemon has been gracefully shut down";

Expand Down
2 changes: 1 addition & 1 deletion core/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
#include "gate.h"
#include "message.h"
#include "metadata.h"
#include "packet.h"
#include "packet_pool.h"

using bess::gate_idx_t;

Expand Down
2 changes: 1 addition & 1 deletion core/modules/flowgen.cc
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ bess::Packet *FlowGen::FillPacket(struct flow *f) {

int size = template_size_;

if (!(pkt = bess::Packet::Alloc())) {
if (!(pkt = current_worker.packet_pool()->Alloc())) {
return nullptr;
}

Expand Down
22 changes: 11 additions & 11 deletions core/modules/source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,22 @@ CommandResponse Source::CommandSetPktSize(
struct task_result Source::RunTask(Context *ctx, bess::PacketBatch *batch,
void *) {
if (children_overload_ > 0) {
return {
.block = true, .packets = 0, .bits = 0,
};
return {.block = true, .packets = 0, .bits = 0};
}

const int pkt_overhead = 24;
const int pkt_size = ACCESS_ONCE(pkt_size_);
const int burst = ACCESS_ONCE(burst_);

uint32_t cnt = bess::Packet::Alloc(batch->pkts(), burst, pkt_size);
batch->set_cnt(cnt);
RunNextModule(ctx, batch); // it's fine to call this function with cnt==0
const uint32_t burst = ACCESS_ONCE(burst_);

if (current_worker.packet_pool()->AllocBulk(batch->pkts(), burst, pkt_size)) {
batch->set_cnt(burst);
RunNextModule(ctx, batch); // it's fine to call this function with cnt==0
return {.block = false,
.packets = burst,
.bits = (pkt_size + pkt_overhead) * burst * 8};
}

return {.block = (cnt == 0),
.packets = cnt,
.bits = (pkt_size + pkt_overhead) * cnt * 8};
return {.block = true, .packets = 0, .bits = 0};
}

ADD_MODULE(Source, "source",
Expand Down
4 changes: 2 additions & 2 deletions core/modules/url_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ inline static bess::Packet *Generate403Packet(const Ethernet::Address &src_eth,
be32_t src_ip, be32_t dst_ip,
be16_t src_port, be16_t dst_port,
be32_t seq, be32_t ack) {
bess::Packet *pkt = bess::Packet::Alloc();
bess::Packet *pkt = current_worker.packet_pool()->Alloc();
char *ptr = static_cast<char *>(pkt->buffer()) + SNBUF_HEADROOM;
pkt->set_data_off(SNBUF_HEADROOM);

Expand Down Expand Up @@ -143,7 +143,7 @@ inline static bess::Packet *GenerateResetPacket(
const Ethernet::Address &src_eth, const Ethernet::Address &dst_eth,
be32_t src_ip, be32_t dst_ip, be16_t src_port, be16_t dst_port, be32_t seq,
be32_t ack) {
bess::Packet *pkt = bess::Packet::Alloc();
bess::Packet *pkt = current_worker.packet_pool()->Alloc();
char *ptr = static_cast<char *>(pkt->buffer()) + SNBUF_HEADROOM;
pkt->set_data_off(SNBUF_HEADROOM);
pkt->set_total_len(sizeof(rst_template));
Expand Down
Loading

0 comments on commit d6cc92c

Please sign in to comment.