Skip to content

Commit

Permalink
Merge pull request #108 from OFFIS-DAI/feature-auto-port
Browse files Browse the repository at this point in the history
Feature auto port
  • Loading branch information
rcschrg authored Oct 20, 2024
2 parents b5dd4ab + b62fac2 commit ab26f41
Show file tree
Hide file tree
Showing 32 changed files with 275 additions and 160 deletions.
46 changes: 44 additions & 2 deletions .github/workflows/test-mango.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,54 @@ permissions:
contents: read

jobs:
build:
build-mac:
runs-on: macOS-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13"]
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
cache-dependency-path: '**/setup.py'
- name: Install dependencies
run: |
pip install virtualenv
virtualenv venv
source venv/bin/activate
pip3 install -U sphinx
pip3 install -r docs/requirements.txt
pip3 install -r requirements.txt
pip3 install -e .
brew install mosquitto
brew services start mosquitto
pip3 install pytest coverage ruff
- name: Lint with ruff
run: |
# stop the build if there are Python syntax errors or undefined names
source venv/bin/activate
ruff check .
ruff format --check .
- name: Doctests
run: |
source venv/bin/activate
make -C docs doctest
- name: Test+Coverage
run: |
source venv/bin/activate
coverage run -m pytest
coverage report
build-linux:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12"]
python-version: ["3.10", "3.11", "3.12", "3.13"]
steps:
- uses: actions/checkout@v4
- name: Set up Python
Expand Down
9 changes: 5 additions & 4 deletions docs/source/agents-container.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ a simple but fast network protocol.
codec: Codec = None,
clock: Clock = None,
copy_internal_messages: bool = False,
auto_port=False,
**kwargs: dict[str, Any],
) -> Container:
Expand All @@ -41,14 +42,14 @@ A simple container, that uses plain tcp for message exchange can be created as f
from mango import create_tcp_container

def get_simple_container():
container = create_tcp_container(addr=('localhost', 5555))
container = create_tcp_container(addr=('127.0.0.1', 5555))
return container

print(get_simple_container().addr)

.. testoutput::

('localhost', 5555)
('127.0.0.1', 5555)

The container type depends totally on the factory method you invoke. Every supported type has its own class backing
the functionality.
Expand All @@ -66,7 +67,7 @@ asynchronous context manager, which we provide by invoking :meth:`mango.activate
from mango import create_tcp_container, activate

async def start_container():
container = create_tcp_container(addr=('localhost', 5555))
container = create_tcp_container(addr=('127.0.0.1', 5555))

async with activate(container) as c:
print("The container is activated now!")
Expand Down Expand Up @@ -105,7 +106,7 @@ Note that, custom agents that inherit from the ``Agent`` class have to call ``su
pass

async def create_and_register_agent():
container = create_tcp_container(addr=('localhost', 5555))
container = create_tcp_container(addr=('127.0.0.1', 5555))

agent = container.register(MyAgent(), suggested_aid="CustomAgent")
return agent
Expand Down
4 changes: 2 additions & 2 deletions docs/source/codecs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ All that is left to do now is to pass our codec to the container. This is done d
# codecs can be passed directly to the container
# if no codec is passed a new instance of JSON() is created
sending_container = create_tcp_container(addr=("localhost", 5556), codec=codec)
receiving_container = create_tcp_container(addr=("localhost", 5555), codec=codec)
sending_container = create_tcp_container(addr=("127.0.0.1", 5556), codec=codec)
receiving_container = create_tcp_container(addr=("127.0.0.1", 5555), codec=codec)
receiving_agent = receiving_container.register(SimpleReceivingAgent())

async with activate(sending_container, receiving_container):
Expand Down
14 changes: 7 additions & 7 deletions docs/source/getting_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ found in :doc:`Agents and container <agents-container>`

# Containers have to be created using a factory method
# Other container types are available through create_mqtt_container and create_ec_container
container = create_tcp_container(addr=('localhost', 5555))
container = create_tcp_container(addr=('127.0.0.1', 5555))
print(container.addr)

.. testoutput::

('localhost', 5555)
('127.0.0.1', 5555)


This is how a tcp container is created. While container creation, it is possible to set the codec, the address information (depending on the type)
Expand Down Expand Up @@ -103,12 +103,12 @@ The following script will create a RepeatingAgent, register it, and let it run w
async with activate(first_container) as container:
await asyncio.sleep(duration)

asyncio.run(run_container_and_agent(addr=('localhost', 5555), duration=0.05))
asyncio.run(run_container_and_agent(addr=('127.0.0.1', 5555), duration=0.05))

.. testoutput::

Creating a RepeatingAgent. At this point self.addr=None
The agent has been registered to a container: AgentAddress(protocol_addr=('localhost', 5555), aid='agent0')!
The agent has been registered to a container: AgentAddress(protocol_addr=('127.0.0.1', 5555), aid='agent0')!
All containers have been activated!

In this example no messages are sent, nor does the Agent do anything, but the call order of the hook-in functions is clearly visible.
Expand Down Expand Up @@ -142,7 +142,7 @@ to another agent:
async with activate(first_container) as container:
await first_hello_agent.greet(second_hello_agent.addr)

asyncio.run(run_container_and_agent(addr=('localhost', 5555), duration=0.05))
asyncio.run(run_container_and_agent(addr=('127.0.0.1', 5555), duration=0.05))

.. testoutput::

Expand Down Expand Up @@ -198,13 +198,13 @@ a RepeatingAgent and let them run.
await asyncio.sleep(.1)

asyncio.run(run_container_and_two_agents(
first_addr=('localhost', 5555), second_addr=('localhost', 5556))
first_addr=('127.0.0.1', 5555), second_addr=('127.0.0.1', 5556))
)

.. testoutput::

Creating a RepeatingAgent. At this point self.addr=None
The agent has been registered to a container: AgentAddress(protocol_addr=('localhost', 5555), aid='agent0')!
The agent has been registered to a container: AgentAddress(protocol_addr=('127.0.0.1', 5555), aid='agent0')!
All containers have been activated!
Received a message with the following content: Hello world!!

Expand Down
4 changes: 2 additions & 2 deletions docs/source/role-api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ determine the message dispatch order (lower number = earlier execution, default=

async def show_handle_sub():
my_composed_agent = agent_composed_of(MyRole())
async with run_with_tcp(1, my_composed_agent) as cl:
await cl[0].send_message(Ping(), my_composed_agent.addr)
async with run_with_tcp(1, my_composed_agent) as container:
await container.send_message(Ping(), my_composed_agent.addr)

asyncio.run(show_handle_sub())

Expand Down
4 changes: 2 additions & 2 deletions docs/source/scheduling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ In the following a simple example is shown.
from mango import DistributedClockAgent, DistributedClockManager, create_tcp_container, activate, ExternalClock

async def main():
container_man = create_tcp_container(("localhost", 1555), clock=ExternalClock())
container_ag = create_tcp_container(("localhost", 1556), clock=ExternalClock())
container_man = create_tcp_container(("127.0.0.1", 1555), clock=ExternalClock())
container_ag = create_tcp_container(("127.0.0.1", 1556), clock=ExternalClock())

clock_agent = container_ag.register(DistributedClockAgent())
clock_manager = container_man.register(DistributedClockManager(
Expand Down
20 changes: 10 additions & 10 deletions docs/source/tutorial.rst
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ For this tutorial we will cover the tcp container.

from mango import create_tcp_container

PV_CONTAINER_ADDRESS = ("localhost", 5555)
PV_CONTAINER_ADDRESS = ("127.0.0.1", 5555)

pv_container = create_tcp_container(addr=PV_CONTAINER_ADDRESS)

print(pv_container.addr)

.. testoutput::

('localhost', 5555)
('127.0.0.1', 5555)

Now we can create our agents. Agents always live inside a container and therefore need to be registered to the container.

Expand All @@ -110,7 +110,7 @@ Now we can create our agents. Agents always live inside a container and therefor

Hello I am a PV agent!
Hello I am a PV agent!
AgentAddress(protocol_addr=('localhost', 5555), aid='agent1')
AgentAddress(protocol_addr=('127.0.0.1', 5555), aid='agent1')

For now, our agents and containers are purely passive entities. First, we need to activate the container to start
the tcp server and its internal asynchronous behavior. In mango this can be done with :meth:`mango.activate` and the `async with` syntax.
Expand Down Expand Up @@ -143,7 +143,7 @@ is wrapped in the :class:`mango.AgentAddress` class and can be retrieved with :m

Hello I am a PV agent!
Hello I am a PV agent!
Received message with content: Hello, this is a simple message. and meta {'sender_id': 'agent2', 'sender_addr': ('localhost', 5555), 'receiver_id': 'agent3', 'network_protocol': 'tcp', 'priority': 0}.
Received message with content: Hello, this is a simple message. and meta {'sender_id': 'agent2', 'sender_addr': ('127.0.0.1', 5555), 'receiver_id': 'agent3', 'network_protocol': 'tcp', 'priority': 0}.


*********************************
Expand Down Expand Up @@ -369,8 +369,8 @@ Lastly, we call all relevant instantiations and the run function within our main

from mango import create_tcp_container, activate, Performatives

PV_CONTAINER_ADDRESS = ("localhost", 5555)
CONTROLLER_CONTAINER_ADDRESS = ("localhost", 5556)
PV_CONTAINER_ADDRESS = ("127.0.0.1", 5555)
CONTROLLER_CONTAINER_ADDRESS = ("127.0.0.1", 5556)
PV_FEED_IN = {
'PV Agent 0': 2.0,
'PV Agent 1': 1.0,
Expand Down Expand Up @@ -464,8 +464,8 @@ Next, we need to create a codec, make our message objects known to it, and pass

from mango import JSON

PV_CONTAINER_ADDRESS = ("localhost", 5555)
CONTROLLER_CONTAINER_ADDRESS = ("localhost", 5556)
PV_CONTAINER_ADDRESS = ("127.0.0.1", 5555)
CONTROLLER_CONTAINER_ADDRESS = ("127.0.0.1", 5556)

my_codec = JSON()
my_codec.add_serializer(*AskFeedInMsg.__serializer__())
Expand Down Expand Up @@ -675,8 +675,8 @@ also run tasks at specific times. For a full overview we refer to the documentat

from mango import sender_addr, Role, RoleAgent, JSON, create_tcp_container, json_serializable, agent_composed_of

PV_CONTAINER_ADDRESS = ("localhost", 5555)
CONTROLLER_CONTAINER_ADDRESS = ("localhost", 5556)
PV_CONTAINER_ADDRESS = ("127.0.0.1", 5555)
CONTROLLER_CONTAINER_ADDRESS = ("127.0.0.1", 5556)
PV_FEED_IN = {
"PV Agent 0": 2.0,
"PV Agent 1": 1.0,
Expand Down
1 change: 0 additions & 1 deletion mango/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from .messages.codecs import (
json_serializable,
JSON,
FastJSON,
PROTOBUF,
SerializationError,
)
3 changes: 2 additions & 1 deletion mango/container/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def create_tcp(
codec: Codec = None,
clock: Clock = None,
copy_internal_messages: bool = False,
auto_port=False,
**kwargs: dict[str, Any],
) -> Container:
"""
Expand All @@ -81,7 +82,7 @@ def create_tcp(

# initialize TCPContainer
return TCPContainer(
addr=addr,
addr=(addr[0], 0) if auto_port else addr,
codec=codec,
clock=clock,
copy_internal_messages=copy_internal_messages,
Expand Down
42 changes: 27 additions & 15 deletions mango/container/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import os
from dataclasses import dataclass
from multiprocessing import Event, Process
from multiprocessing import get_context
from multiprocessing.synchronize import Event as MultiprocessingEvent

import dill # noqa F401 # do not remove! Necessary for the auto loaded pickle reg extensions
Expand Down Expand Up @@ -102,16 +102,21 @@ def create_agent_process_environment(
:type process_initialized_event: Event
"""
asyncio.set_event_loop(asyncio.new_event_loop())
container_data.codec = dill.loads(container_data.codec)
agent_creator = dill.loads(agent_creator)
mirror_container_creator = dill.loads(mirror_container_creator)

async def start_agent_loop():
container = mirror_container_creator(
container_data,
asyncio.get_event_loop(),
message_pipe,
message_pipe.dup(),
main_queue,
event_pipe,
event_pipe.dup(),
terminate_event,
)
message_pipe.close()
event_pipe.close()
if asyncio.iscoroutinefunction(agent_creator):
await agent_creator(container)
else:
Expand Down Expand Up @@ -354,11 +359,12 @@ def __init__(
self._active = False
self._container = container
self._mp_enabled = False
self._ctx = get_context("spawn")

def _init_mp(self):
# For agent multiprocessing support
self._agent_processes = []
self._terminate_sub_processes = Event()
self._terminate_sub_processes = self._ctx.Event()
self._pid_to_message_pipe = {}
self._pid_to_pipe = {}
self._pid_to_aids = {}
Expand Down Expand Up @@ -439,21 +445,21 @@ def create_agent_process(self, agent_creator, container, mirror_container_creato
self._init_mp()
self._active = True

from_pipe_message, to_pipe_message = aioduplex()
from_pipe, to_pipe = aioduplex()
process_initialized = Event()
from_pipe_message, to_pipe_message = aioduplex(self._ctx)
from_pipe, to_pipe = aioduplex(self._ctx)
process_initialized = self._ctx.Event()
with to_pipe.detach() as to_pipe, to_pipe_message.detach() as to_pipe_message:
agent_process = Process(
agent_process = self._ctx.Process(
target=create_agent_process_environment,
args=(
ContainerData(
addr=container.addr,
codec=container.codec,
codec=dill.dumps(container.codec),
clock=container.clock,
kwargs=container._kwargs,
),
agent_creator,
mirror_container_creator,
dill.dumps(agent_creator),
dill.dumps(mirror_container_creator),
to_pipe_message,
self._main_queue,
to_pipe,
Expand All @@ -465,19 +471,25 @@ def create_agent_process(self, agent_creator, container, mirror_container_creato
agent_process.daemon = True
agent_process.start()

self._pid_to_message_pipe[agent_process.pid] = from_pipe_message
self._pid_to_pipe[agent_process.pid] = from_pipe
from_pipe_message_dup = from_pipe_message.dup()
from_pipe_dup = from_pipe.dup()
from_pipe_message.close()
from_pipe.close()
self._pid_to_message_pipe[agent_process.pid] = from_pipe_message_dup
self._pid_to_pipe[agent_process.pid] = from_pipe_dup
self._handle_process_events_tasks.append(
asyncio.create_task(self._handle_process_events(from_pipe))
asyncio.create_task(self._handle_process_events(from_pipe_dup))
)
self._handle_sp_messages_tasks.append(
asyncio.create_task(self._handle_process_message(from_pipe_message))
asyncio.create_task(self._handle_process_message(from_pipe_message_dup))
)

async def wait_for_process_initialized():
while not process_initialized.is_set():
await asyncio.sleep(WAIT_STEP)

await asyncio.sleep(0)

return AgentProcessHandle(
asyncio.create_task(wait_for_process_initialized()), agent_process.pid
)
Expand Down
2 changes: 2 additions & 0 deletions mango/container/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ async def start(self):
self.addr[0],
self.addr[1],
)
# if 0 is specified
self.addr = (self.addr[0], self.server.sockets[0]._sock.getsockname()[1])
await super().start()

async def send_message(
Expand Down
Loading

0 comments on commit ab26f41

Please sign in to comment.