Skip to content

Commit

Permalink
Merge PR snabbco#43 (Merge Snabb v2018.06 “Dill”) into master
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneia committed Jul 25, 2018
2 parents c2dd7ea + bd40a1e commit 203971b
Show file tree
Hide file tree
Showing 15 changed files with 1,325 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2018.04
2018.06
2 changes: 1 addition & 1 deletion lib/ljsyscall/syscall/linux/c.lua
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ else -- 64 bit
function C.fstatfs(fd, buf) return syscall(sys.fstatfs, int(fd), void(buf)) end
function C.preadv(fd, iov, iovcnt, offset) return syscall_long(sys.preadv, int(fd), void(iov), long(iovcnt), ulong(offset)) end
function C.pwritev(fd, iov, iovcnt, offset) return syscall_long(sys.pwritev, int(fd), void(iov), long(iovcnt), ulong(offset)) end
function C.lseek(fd, offset, whence) return syscall_off(sys.lseek, int(fd), ulong(offset), int(whence)) end
function C.lseek(fd, offset, whence) return syscall_off(sys.lseek, int(fd), long(offset), int(whence)) end
function C.sendfile(outfd, infd, offset, count)
return syscall_long(sys.sendfile, int(outfd), int(infd), void(offset), ulong(count))
end
Expand Down
31 changes: 20 additions & 11 deletions src/apps/interlink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ of packets between Snabb processes within the same process group (see
[Multiprocess operation (core.worker)](#multiprocess-operation-core.worker)).

DIAGRAM: Transmitter and Receiver
+----------------------------+ +-------------------------+
| | | |
| | | |
input----* apps.interlink.transmitter | | apps.interlink.reciever *---- output
| | | |
| | | |
+----------------------------+ +-------------------------+
+-------------+ +-------------+
| | | |
input | | | |
----* Transmitter | | Reciever *----
| | | | output
| | | |
+-------------+ +-------------+

To make packets from an output port available to other processes, configure a
transmitter app, and link the appropriate output port to its `input` port.

```lua
local Transmitter = require("apps.interlink.transmitter)
local Transmitter = require("apps.interlink.transmitter")

config.app(c, "interlink", Transmitter)
config.link(c, "myapp.output -> interlink.input")
Expand All @@ -27,7 +27,7 @@ Then, in the process that should receive the packets, configure a receiver app
with the same name, and link its `output` port as suitable.

```lua
local Receiver = require("apps.interlink.receiver)
local Receiver = require("apps.interlink.receiver")

config.app(c, "interlink", Receiver)
config.link(c, "interlink.output -> otherapp.input")
Expand All @@ -36,10 +36,19 @@ config.link(c, "interlink.output -> otherapp.input")
Subsequently, packets transmitted to the transmitter’s `input` port will appear
on the receiver’s `output` port.

Alternatively, a name can be supplied as a configuration argument to be used
instead of the app’s name:

```lua
config.app(c, "mylink", Receiver, "interlink")
config.link(c, "mylink.output -> otherapp.input")
```

## Configuration

None, but the configured app names are globally unique within the process
group.
The configured app names denote globally unique queues within the process
group. Alternativelyy, the receiver and transmitter apps can instead be passed
a string that names the shared queue to which to attach to.

Starting either the transmitter or receiver app attaches them to a shared
packet queue visible to the process group under the name that was given to the
Expand Down
32 changes: 20 additions & 12 deletions src/apps/interlink/receiver.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@ local interlink = require("lib.interlink")

local Receiver = {name="apps.interlink.Receiver"}

function Receiver:new (_, name)
function Receiver:new (queue)
packet.enable_group_freelist()
local self = {}
self.shm_name = "group/interlink/"..name..".interlink"
self.backlink = "interlink/receiver/"..name..".interlink"
self.interlink = interlink.attach_receiver(self.shm_name)
shm.alias(self.backlink, self.shm_name)
return setmetatable(self, {__index=Receiver})
return setmetatable({attached=false, queue=queue}, {__index=Receiver})
end

function Receiver:link ()
local queue = self.queue or self.appname
if not self.attached then
self.shm_name = "group/interlink/"..queue..".interlink"
self.backlink = "interlink/receiver/"..queue..".interlink"
self.interlink = interlink.attach_receiver(self.shm_name)
shm.alias(self.backlink, self.shm_name)
self.attached = true
end
end

function Receiver:pull ()
Expand All @@ -28,18 +34,20 @@ function Receiver:pull ()
end

function Receiver:stop ()
interlink.detach_receiver(self.interlink, self.shm_name)
shm.unlink(self.backlink)
if self.attached then
interlink.detach_receiver(self.interlink, self.shm_name)
shm.unlink(self.backlink)
end
end

-- Detach receivers to prevent leaking interlinks opened by pid.
--
-- This is an internal API function provided for cleanup during
-- process termination.
function Receiver.shutdown (pid)
for _, name in ipairs(shm.children("/"..pid.."/interlink/receiver")) do
local backlink = "/"..pid.."/interlink/receiver/"..name..".interlink"
local shm_name = "/"..pid.."/group/interlink/"..name..".interlink"
for _, queue in ipairs(shm.children("/"..pid.."/interlink/receiver")) do
local backlink = "/"..pid.."/interlink/receiver/"..queue..".interlink"
local shm_name = "/"..pid.."/group/interlink/"..queue..".interlink"
-- Call protected in case /<pid>/group is already unlinked.
local ok, r = pcall(interlink.open, shm_name)
if ok then interlink.detach_receiver(r, shm_name) end
Expand Down
32 changes: 20 additions & 12 deletions src/apps/interlink/transmitter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@ local interlink = require("lib.interlink")

local Transmitter = {name="apps.interlink.Transmitter"}

function Transmitter:new (_, name)
function Transmitter:new (queue)
packet.enable_group_freelist()
local self = {}
self.shm_name = "group/interlink/"..name..".interlink"
self.backlink = "interlink/transmitter/"..name..".interlink"
self.interlink = interlink.attach_transmitter(self.shm_name)
shm.alias(self.backlink, self.shm_name)
return setmetatable(self, {__index=Transmitter})
return setmetatable({attached=false, queue=queue}, {__index=Transmitter})
end

function Transmitter:link ()
local queue = self.queue or self.appname
if not self.attached then
self.shm_name = "group/interlink/"..queue..".interlink"
self.backlink = "interlink/transmitter/"..queue..".interlink"
self.interlink = interlink.attach_transmitter(self.shm_name)
shm.alias(self.backlink, self.shm_name)
self.attached = true
end
end

function Transmitter:push ()
Expand All @@ -28,18 +34,20 @@ function Transmitter:push ()
end

function Transmitter:stop ()
interlink.detach_transmitter(self.interlink, self.shm_name)
shm.unlink(self.backlink)
if self.attached then
interlink.detach_transmitter(self.interlink, self.shm_name)
shm.unlink(self.backlink)
end
end

-- Detach transmitters to prevent leaking interlinks opened by pid.
--
-- This is an internal API function provided for cleanup during
-- process termination.
function Transmitter.shutdown (pid)
for _, name in ipairs(shm.children("/"..pid.."/interlink/transmitter")) do
local backlink = "/"..pid.."/interlink/transmitter/"..name..".interlink"
local shm_name = "/"..pid.."/group/interlink/"..name..".interlink"
for _, queue in ipairs(shm.children("/"..pid.."/interlink/transmitter")) do
local backlink = "/"..pid.."/interlink/transmitter/"..queue..".interlink"
local shm_name = "/"..pid.."/group/interlink/"..queue..".interlink"
-- Call protected in case /<pid>/group is already unlinked.
local ok, r = pcall(interlink.open, shm_name)
if ok then interlink.detach_transmitter(r, shm_name) end
Expand Down
46 changes: 44 additions & 2 deletions src/apps/pcap/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# PcapReader and PcapWriter Apps (apps.pcap.pcap)
# Pcap Savefile Apps

## PcapReader and PcapWriter Apps (apps.pcap.pcap)

The `PcapReader` and `PcapWriter` apps can be used to inject and log raw
packet data into and out of the app network using the
Expand All @@ -14,10 +16,50 @@ port to a PCAP file.
| | | |
+------------+ +------------+

## Configuration
### Configuration

Both `PcapReader` and `PcapWriter` expect a filename string as their
configuration arguments to read from and write to respectively. `PcapWriter`
will alternatively accept an array as its configuration argument, with the
first element being the filename and the second element being a *mode* argument
to `io.open`.

## Tap (apps.pcap.tap)

The `Tap` app is a simple in-band packet tap that writes packets that it
sees to a pcap savefile. It can optionally only write packets that pass
a pcap filter, and optionally subsample so it can write only every /n/th
packet.

DIAGRAM: pcaptap
+-------------------+
input | | output
---->* apps.pcap.tap.Tap *---->
| |
+-------------------+

### Configuration

The `Tap` app accepts a table as its configuration argument. The
following keys are defined:

— Key **filename**

*Required*. The name of the file to which to write the packets.

— Key **mode**

*Optional*. Either `"truncate"` or `"append"`, indicating whether the
savefile will be truncated (the default) or appended to.

— Key **filter**

*Optional*. A pflang filter expression to select packets for tapping.
Only packets that pass this filter will be sampled for the packet tap.

— Key **sample**

*Optional*. A sampling period. Defaults to 1, indicating that every
packet seen by the tap and passing the optional filter string will be
written. Setting this value to 2 will capture every second packet, and
so on.
87 changes: 87 additions & 0 deletions src/apps/pcap/tap.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local ffi = require("ffi")

local app = require("core.app")
local lib = require("core.lib")
local link = require("core.link")
local pcap = require("lib.pcap.pcap")
local pf = require("pf")

Tap = {}

local tap_config_params = {
-- Name of file to which to write packets.
filename = { required=true },
-- "truncate" to truncate the file, or "append" to add to the file.
mode = { default = "truncate" },
-- Only packets that match this pflang filter will be captured.
filter = { },
-- Only write every Nth packet that matches the filter.
sample = { default=1 },
}

function Tap:new(conf)
local o = lib.parse(conf, tap_config_params)
local mode = assert(({truncate='w+b', append='a+b'})[o.mode])
o.file = assert(io.open(o.filename, mode))
if o.file:seek() == 0 then pcap.write_file_header(o.file) end
if o.filter then o.filter = pf.compile_filter(o.filter) end
o.n = o.sample - 1
return setmetatable(o, {__index = Tap})
end

function Tap:push ()
local n = self.n
while not link.empty(self.input.input) do
local p = link.receive(self.input.input)
if not self.filter or self.filter(p.data, p.length) then
n = n + 1
if n == self.sample then
n = 0
pcap.write_record(self.file, p.data, p.length)
end
end
link.transmit(self.output.output, p)
end
self.n = n
end

function selftest ()
print('selftest: apps.pcap.tap')

local config = require("core.config")
local Sink = require("apps.basic.basic_apps").Sink
local PcapReader = require("apps.pcap.pcap").PcapReader

local function run(filter, sample)
local tmp = os.tmpname()
local c = config.new()
-- Re-use example from packet filter test.
config.app(c, "source", PcapReader, "apps/packet_filter/samples/v6.pcap")
config.app(c, "tap", Tap, {filename=tmp, filter=filter, sample=sample})
config.app(c, "sink", Sink )

config.link(c, "source.output -> tap.input")
config.link(c, "tap.output -> sink.input")
app.configure(c)
while not app.app_table.source.done do app.breathe() end

local n = 0
for packet, record in pcap.records(tmp) do n = n + 1 end
os.remove(tmp)

app.configure(config.new())

return n
end

assert(run() == 161)
assert(run("icmp6") == 49)
assert(run(nil, 2) == 81)
assert(run("icmp6", 2) == 25)

print('selftest: ok')
end
Loading

0 comments on commit 203971b

Please sign in to comment.