Skip to content

Commit

Permalink
Backport ponylang/ponyc#1578 and ponylang/ponyc#1626 to net actors
Browse files Browse the repository at this point in the history
Backport ponylang/ponyc#1578 and ponylang/ponyc#1626 to net actors
because the help resolve edge cases around the networking code with
direct impacts to the stability of the network tests.
  • Loading branch information
dipinhora authored and JONBRWN committed Oct 12, 2017
1 parent 6d45d97 commit a73d638
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 6 deletions.
4 changes: 4 additions & 0 deletions lib/wallaroo/core/boundary/boundary.pony
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,11 @@ actor OutgoingBoundary is Consumer
_event = event
_connected = true
_writeable = true
_readable = true

_notify.connected(this)
_on_connected()
_pending_reads()

ifdef not windows then
if _pending_writes() then
Expand Down Expand Up @@ -485,6 +487,7 @@ actor OutgoingBoundary is Consumer

_connected = true
_writeable = true
_readable = true

// set replaying to true since we might need to replay to
// downstream before resuming
Expand All @@ -495,6 +498,7 @@ actor OutgoingBoundary is Consumer
_shutdown_peer = false

_notify.connected(this)
_pending_reads()

try
let connect_msg = ChannelMsgEncoder.data_connect(_worker_name,
Expand Down
5 changes: 5 additions & 0 deletions lib/wallaroo/core/data_channel/data_channel.pony
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ actor DataChannel
_max_size = max_size

_notify.accepted(this)

_readable = true
_queue_read()
_pending_reads()

be identify_data_receiver(dr: DataReceiver, sender_step_id: StepId) =>
"""
Expand Down Expand Up @@ -349,9 +352,11 @@ actor DataChannel
_event = event
_connected = true
_writeable = true
_readable = true

_notify.connected(this)
_queue_read()
_pending_reads()

// Don't call _complete_writes, as Windows will see this as a
// closed connection.
Expand Down
6 changes: 3 additions & 3 deletions lib/wallaroo/core/data_channel/data_channel_listener.pony
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ actor DataChannelListener
_closed = true

if not _event.is_null() then
@pony_os_socket_close[None](_fd)
_fd = -1

// When not on windows, the unsubscribe is done immediately.
ifdef not windows then
@pony_asio_event_unsubscribe(_event)
end

@pony_os_socket_close[None](_fd)
_fd = -1

_notify.closed(this)
end
4 changes: 4 additions & 0 deletions lib/wallaroo/core/metrics/reconnecting_metrics_sink.pony
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,11 @@ actor ReconnectingMetricsSink
_event = event
_connected = true
_writeable = true
_readable = true

_notify.connected(this)
_queue_read()
_pending_reads()

ifdef not windows then
if _pending_writes() then
Expand All @@ -373,12 +375,14 @@ actor ReconnectingMetricsSink

_connected = true
_writeable = true
_readable = true

_closed = false
_shutdown = false
_shutdown_peer = false

_notify.connected(this)
_pending_reads()

ifdef not windows then
if _pending_writes() then
Expand Down
4 changes: 4 additions & 0 deletions lib/wallaroo/core/sink/tcp_sink/tcp_sink.pony
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ actor TCPSink is Consumer
_event = event
_connected = true
_writeable = true
_readable = true

match _initializer
| let initializer: LocalTopologyInitializer =>
Expand All @@ -327,6 +328,7 @@ actor TCPSink is Consumer
end

_notify.connected(this)
_pending_reads()

for msg in _initial_msgs.values() do
_writev(msg, None)
Expand Down Expand Up @@ -357,12 +359,14 @@ actor TCPSink is Consumer

_connected = true
_writeable = true
_readable = true

_closed = false
_shutdown = false
_shutdown_peer = false

_notify.connected(this)
_pending_reads()

ifdef not windows then
if _pending_writes() then
Expand Down
6 changes: 6 additions & 0 deletions lib/wallaroo/core/source/tcp_source/tcp_source.pony
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ actor TCPSource is Producer
//listening until we are done recovering
_notify.accepted(this)

_readable = true
_pending_reads()

for consumer in routes.values() do
_routes(consumer) =
_route_builder(this, consumer, _metrics_reporter)
Expand Down Expand Up @@ -274,8 +277,11 @@ actor TCPSource is Producer
_fd = fd
_event = event
_connected = true
_readable = true

_notify.connected(this)

_pending_reads()
else
// The connection failed, unsubscribe the event and close.
@pony_asio_event_unsubscribe(event)
Expand Down
6 changes: 3 additions & 3 deletions lib/wallaroo/core/source/tcp_source/tcp_source_listener.pony
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,11 @@ actor TCPSourceListener is SourceListener
_closed = true

if not _event.is_null() then
@pony_os_socket_close[None](_fd)
_fd = -1

// When not on windows, the unsubscribe is done immediately.
ifdef not windows then
@pony_asio_event_unsubscribe(_event)
end

@pony_os_socket_close[None](_fd)
_fd = -1
end

0 comments on commit a73d638

Please sign in to comment.