Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Max's async Snabb changes + public packet_t #997

Merged
merged 10 commits into from
Aug 26, 2016
10 changes: 10 additions & 0 deletions src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,16 @@ Predicate used to test if a link is full. Returns true if *link* is full
and false otherwise.


— Function **link.nreadable** *link*

Returns the number of packets on *link*.


— Function **link.nwriteable** *link*

Returns the remaining number of packets that fit onto *link*.


— Function **link.receive** *link*

Returns the next available packet (and advances the read cursor) on
Expand Down
19 changes: 7 additions & 12 deletions src/apps/basic/basic_apps.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ end

function Source:pull ()
for _, o in ipairs(self.output) do
for i = 1, link.nwritable(o) do
for i = 1, engine.pull_npackets do
transmit(o, packet.clone(self.packet))
end
end
Expand All @@ -41,7 +41,7 @@ end

function Join:push ()
for _, inport in ipairs(self.input) do
for n = 1,math.min(link.nreadable(inport), link.nwritable(self.output.out)) do
while not link.empty(inport) do
transmit(self.output.out, receive(inport))
end
end
Expand All @@ -60,7 +60,7 @@ end
function Split:push ()
for _, i in ipairs(self.input) do
for _, o in ipairs(self.output) do
for _ = 1, math.min(link.nreadable(i), link.nwritable(o)) do
for _ = 1, link.nreadable(i) do
transmit(o, receive(i))
end
end
Expand Down Expand Up @@ -93,16 +93,11 @@ function Tee:new ()
end

function Tee:push ()
noutputs = #self.output
local noutputs = #self.output
if noutputs > 0 then
local maxoutput = link.max
for _, o in ipairs(self.output) do
maxoutput = math.min(maxoutput, link.nwritable(o))
end
for _, i in ipairs(self.input) do
for _ = 1, math.min(link.nreadable(i), maxoutput) do
for _ = 1, link.nreadable(i) do
local p = receive(i)
maxoutput = maxoutput - 1
do local output = self.output
for k = 1, #output do
transmit(output[k], k == #output and p or packet.clone(p))
Expand All @@ -122,15 +117,15 @@ function Repeater:new ()
{__index=Repeater})
end

function Repeater:push ()
function Repeater:pull ()
local i, o = self.input.input, self.output.output
for _ = 1, link.nreadable(i) do
local p = receive(i)
table.insert(self.packets, p)
end
local npackets = #self.packets
if npackets > 0 then
for i = 1, link.nwritable(o) do
for i = 1, engine.pull_npackets do
assert(self.packets[self.index])
transmit(o, packet.clone(self.packets[self.index]))
self.index = (self.index % npackets) + 1
Expand Down
9 changes: 2 additions & 7 deletions src/apps/intel/intel1g.lua
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ function Intel1g:new(conf)
local txq = conf.txqueue or 0
local rxq = conf.rxqueue or 0
local ndesc = conf.ndescriptors or 512
local rxburst = conf.rxburst or 128
local rxburst = conf.rxburst or engine.pull_npackets

-- 8.1.3 Register Summary, p.359
local r = {}
Expand Down Expand Up @@ -589,12 +589,7 @@ function Intel1g:new(conf)
while limit > 0 and can_receive() do
limit = limit - 1
if lo then -- a link connects NIC to a sink
if not link.full(lo) then -- from SolarFlareNic:pull()
link.transmit(lo, receive())
else
counters.pullTxLinkFull= counters.pullTxLinkFull +1
packet.free(receive())
end
link.transmit(lo, receive())
else
counters.pullNoTxLink= counters.pullNoTxLink +1
packet.free(receive())
Expand Down
6 changes: 3 additions & 3 deletions src/apps/intel/intel_app.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ local pci = require("lib.hardware.pci")
local register = require("lib.hardware.register")
local macaddress = require("lib.macaddress")
local intel10g = require("apps.intel.intel10g")
local receive, transmit, full, empty = link.receive, link.transmit, link.full, link.empty
local receive, transmit, empty = link.receive, link.transmit, link.empty
Intel82599 = {}
Intel82599.__index = Intel82599

Expand Down Expand Up @@ -133,8 +133,8 @@ function Intel82599:pull ()
local l = self.output.tx
if l == nil then return end
self.dev:sync_receive()
for i=1,128 do
if full(l) or not self.dev:can_receive() then break end
for i = 1, engine.pull_npackets do
if not self.dev:can_receive() then break end
transmit(l, self.dev:receive())
end
self:add_receive_buffers()
Expand Down
4 changes: 2 additions & 2 deletions src/apps/ipv6/nd_light.lua
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ function nd_light:push ()
local l_in = self.input.south
local l_out = self.output.north
local l_reply = self.output.south
while not link.empty(l_in) and not link.full(l_out) do
while not link.empty(l_in) do
local p = cache.p
p[0] = link.receive(l_in)
local status = from_south(self, p)
Expand All @@ -327,7 +327,7 @@ function nd_light:push ()

l_in = self.input.north
l_out = self.output.south
while not link.empty(l_in) and not link.full(l_out) do
while not link.empty(l_in) do
if not self._eth_header then
-- Drop packets until ND for the next-hop
-- has completed.
Expand Down
4 changes: 2 additions & 2 deletions src/apps/ipv6/ns_responder.lua
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ function ns_responder:push()
local l_in = self.input.north
local l_out = self.output.south
if l_in and l_out then
while not link.empty(l_in) and not link.full(l_out) do
while not link.empty(l_in) do
-- Pass everything on north -> south
link.transmit(l_out, link.receive(l_in))
end
end
l_in = self.input.south
l_out = self.output.north
local l_reply = self.output.south
while not link.empty(l_in) and not link.full(l_out) do
while not link.empty(l_in) do
local p = link.receive(l_in)
local status = process(self, p)
if status == nil then
Expand Down
4 changes: 2 additions & 2 deletions src/apps/keyed_ipv6_tunnel/tunnel.lua
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ function SimpleKeyedTunnel:push()
local l_out = self.output.encapsulated
assert(l_in and l_out)

while not link.empty(l_in) and not link.full(l_out) do
while not link.empty(l_in) do
local p = link.receive(l_in)
packet.prepend(p, self.header, HEADER_SIZE)
local plength = ffi.cast(plength_ctype, p.data + LENGTH_OFFSET)
Expand All @@ -199,7 +199,7 @@ function SimpleKeyedTunnel:push()
l_in = self.input.encapsulated
l_out = self.output.decapsulated
assert(l_in and l_out)
while not link.empty(l_in) and not link.full(l_out) do
while not link.empty(l_in) do
local p = link.receive(l_in)
-- match next header, cookie, src/dst addresses
local drop = true
Expand Down
4 changes: 2 additions & 2 deletions src/apps/lwaftr/generator.lua
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ end
function from_inet:pull()
local o = assert(self.output.output)

while not link.full(o) do
for i=1,engine.pull_npackets do
if self.max_packets then
if self.tx_packets == self.max_packets then break end
self.tx_packets = self.tx_packets + 1
Expand Down Expand Up @@ -312,7 +312,7 @@ end
function from_b4:pull()
local o = assert(self.output.output)

while not link.full(o) do
for i=1,engine.pull_npackets do
if self.max_packets then
if self.tx_packets == self.max_packets then break end
self.tx_packets = self.tx_packets + 1
Expand Down
6 changes: 3 additions & 3 deletions src/apps/lwaftr/ipv4_apps.lua
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ function Reassembler:push ()
local l2_size = self.l2_size
local ethertype_offset = self.ethertype_offset

for _=1,math.min(link.nreadable(input), link.nwritable(output)) do
for _=1,link.nreadable(input) do
local pkt = receive(input)
if is_ipv4(pkt, ethertype_offset) and is_fragment(pkt, l2_size) then
local frags = self:cache_fragment(pkt)
Expand Down Expand Up @@ -165,7 +165,7 @@ end
function ICMPEcho:push()
local l_in, l_out, l_reply = self.input.south, self.output.north, self.output.south

for _ = 1, math.min(link.nreadable(l_in), link.nwritable(l_out)) do
for _ = 1, link.nreadable(l_in) do
local out, pkt = l_out, receive(l_in)

if icmp.is_icmpv4_message(pkt, icmpv4_echo_request, 0) then
Expand Down Expand Up @@ -199,7 +199,7 @@ function ICMPEcho:push()
end

l_in, l_out = self.input.north, self.output.south
for _ = 1, math.min(link.nreadable(l_in), link.nwritable(l_out)) do
for _ = 1, link.nreadable(l_in) do
transmit(l_out, receive(l_in))
end
end
4 changes: 2 additions & 2 deletions src/apps/lwaftr/ipv6_apps.lua
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ end
function ICMPEcho:push()
local l_in, l_out, l_reply = self.input.south, self.output.north, self.output.south

for _ = 1, math.min(link.nreadable(l_in), link.nwritable(l_out)) do
for _ = 1, link.nreadable(l_in) do
local out, pkt = l_out, receive(l_in)

if icmp.is_icmpv6_message(pkt, icmpv6_echo_request, 0) then
Expand Down Expand Up @@ -279,7 +279,7 @@ function ICMPEcho:push()
end

l_in, l_out = self.input.north, self.output.south
for _ = 1, math.min(link.nreadable(l_in), link.nwritable(l_out)) do
for _ = 1, link.nreadable(l_in) do
transmit(l_out, receive(l_in))
end
end
4 changes: 2 additions & 2 deletions src/apps/lwaftr/loadgen.lua
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function RateLimitedRepeater:set_rate (byte_rate)
self.rate = math.max(byte_rate, 0)
end

function RateLimitedRepeater:push ()
function RateLimitedRepeater:pull ()
local i, o = self.input.input, self.output.output
for _ = 1, link.nreadable(i) do
local p = receive(i)
Expand All @@ -50,7 +50,7 @@ function RateLimitedRepeater:push ()

local npackets = #self.packets
if npackets > 0 and self.rate > 0 then
for _ = 1, link.nwritable(o) do
for _ = 1, engine.pull_npackets do
local p = self.packets[self.index]
if p.length > self.bucket_content then break end
self.bucket_content = self.bucket_content - p.length
Expand Down
4 changes: 3 additions & 1 deletion src/apps/pcap/pcap.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ end

function PcapReader:pull ()
assert(self.output.output)
while not self.done and not link.full(self.output.output) do
local limit = engine.pull_npackets
while limit > 0 and not self.done do
limit = limit - 1
local data, record, extra = self.iterator()
if data then
local p = packet.from_string(data)
Expand Down
2 changes: 1 addition & 1 deletion src/apps/rate_limiter/rate_limiter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ function RateLimiter:push ()
end


while not link.empty(i) and not link.full(o) do
while not link.empty(i) do
local p = link.receive(i)
local length = p.length

Expand Down
4 changes: 3 additions & 1 deletion src/apps/socket/raw.lua
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ end
function RawSocket:pull ()
local l = self.output.tx
if l == nil then return end
while not link.full(l) and self:can_receive() do
local limit = engine.pull_npackets
while limit > 0 and self:can_receive() do
limit = limit - 1
link.transmit(l, self:receive())
end
end
Expand Down
6 changes: 4 additions & 2 deletions src/apps/socket/unix.lua
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ function UnixSocket:new (arg)
function self:pull()
local l = self.output.tx
if l == nil then return end
while not link.full(l) and can_receive() do
local limit = engine.pull_npackets
while limit > 0 and can_receive() do
limit = limit - 1
local p = receive()
if p then
link.transmit(l, p) --link owns p now so we mustn't free it
Expand Down Expand Up @@ -197,7 +199,7 @@ function selftest ()
pull = function(self)
local l = self.output.tx
if l == nil then return end
while not link.full(l) do
for i=1,engine.pull_npackets do
local p = packet.allocate()
ffi.copy(p.data, text)
p.length = #text
Expand Down
14 changes: 7 additions & 7 deletions src/apps/solarflare/solarflare.lua
Original file line number Diff line number Diff line change
Expand Up @@ -242,19 +242,19 @@ function SolarFlareNic:pull()
self.stats.pull = (self.stats.pull or 0) + 1
repeat
local n_ev = self.poll_structure.n_ev
local pull_npackets = engine.pull_npackets
if n_ev > 0 then
for i = 0, n_ev - 1 do
local event_type = self.poll_structure.events[i].generic.type
if event_type == C.EF_EVENT_TYPE_RX then
if event_type == C.EF_EVENT_TYPE_RX and pull_npackets > 0 then
pull_npackets = pull_npackets - 1
local rxpacket = self.rxpackets[self.poll_structure.events[i].rx.rq_id]
rxpacket.length = self.poll_structure.events[i].rx.len
self.stats.rx = (self.stats.rx or 0) + 1
if not link.full(self.output.tx) then
link.transmit(self.output.tx, rxpacket)
else
self.stats.link_full = (self.stats.link_full or 0) + 1
packet.free(rxpacket)
end
link.transmit(self.output.tx, rxpacket)
self.enqueue_receive(self, self.poll_structure.events[i].rx.rq_id)
elseif event_type == C.EF_EVENT_TYPE_RX and pull_npackets == 0 then
self.stats.rxdrop = (self.stats.rxdrop or 0) + 1
self.enqueue_receive(self, self.poll_structure.events[i].rx.rq_id)
elseif event_type == C.EF_EVENT_TYPE_TX then
local n_tx_done = self.poll_structure.unbundled_tx_request_ids[i].n_tx_done
Expand Down
2 changes: 1 addition & 1 deletion src/apps/tap/tap.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ end
function Tap:pull ()
local l = self.output.output
if l == nil then return end
while not link.full(l) do
for i=1,engine.pull_npackets do
local p = packet.allocate()
local len, err = S.read(self.sock, p.data, C.PACKET_PAYLOAD_SIZE)
-- errno == EAGAIN indicates that the read would of blocked as there is no
Expand Down
6 changes: 4 additions & 2 deletions src/apps/test/lwaftr.lua
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ function Lwaftrgen:new(arg)
return setmetatable(o, {__index=Lwaftrgen})
end

function Lwaftrgen:push ()
function Lwaftrgen:pull ()

local output = self.output.output
local input = self.input.input
Expand Down Expand Up @@ -330,8 +330,10 @@ function Lwaftrgen:push ()
self.bucket_content = self.bucket_content + self.rate * 1e6 * (cur_now - last_time)
self.last_time = cur_now

while link.nwritable(output) > self.total_packet_count and
local limit = engine.pull_npackets
while limit > self.total_packet_count and
self.total_packet_count <= self.bucket_content do
limit = limit - 1
self.bucket_content = self.bucket_content - self.total_packet_count

ipv4_hdr.dst_ip = self.b4_ipv4
Expand Down
2 changes: 1 addition & 1 deletion src/apps/test/synth.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ end

function Synth:pull ()
for _, o in ipairs(self.output) do
for i = 1, link.nwritable(o) do
for i = 1, engine.pull_npackets do
for _, p in ipairs(self.packets) do
transmit(o, packet.clone(p))
end
Expand Down
4 changes: 2 additions & 2 deletions src/apps/virtio_net/virtio_net.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ local main = require("core.main")
VirtioNet = {}
VirtioNet.__index = VirtioNet

local receive, transmit, full, nreadable, nwritable = link.receive, link.transmit, link.full, link.nreadable, link.nwritable
local receive, transmit, nreadable = link.receive, link.transmit, link.nreadable

function VirtioNet:new(args)
return setmetatable({
Expand Down Expand Up @@ -51,7 +51,7 @@ function VirtioNet:pull()
local dev = self.device
local l = self.output.tx
if not l then return end
local to_receive = math.min(nwritable(l), dev:can_receive())
local to_receive = math.min(engine.pull_npackets, dev:can_receive())

for i=0, to_receive - 1 do
transmit(l, dev:receive())
Expand Down
Loading