-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_engine.py
59 lines (43 loc) · 1.77 KB
/
test_engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# Copyright (c) 2013-2023 The Kotori developers and contributors.
# This module is part of LorryStream and is released under the MIT License.
# See LICENSE file for more information.
import asyncio
import json
import logging
import pytest
import sqlalchemy as sa
from lorrystream.core import ChannelFactory, Engine
# Module-level setting named after the `capmqtt` fixture used by the test below.
# NOTE(review): presumably picked up by that fixture to decode captured MQTT
# payloads as UTF-8 text -- confirm against the fixture's documentation.
capmqtt_decode_utf8 = True
# Logger for this test module; used to trace engine start and stop.
logger = logging.getLogger(__name__)
class engine_single_shot:
    """
    Async context manager running an `Engine` for the duration of a block.

    On construction, a new `Engine` is created and every given channel is
    registered with it. Entering the context starts the engine; leaving it
    stops the engine again. Exceptions raised inside the block are never
    suppressed.
    """

    def __init__(self, *channels):
        """Create the engine and register each item of `channels` with it."""
        self.engine = Engine()
        for channel in channels:
            self.engine.register(channel)

    async def __aenter__(self, *channels):
        """
        Start the engine, pause for 0.25 seconds, and return this instance.

        `channels` is accepted but not used; it is retained only to keep the
        signature unchanged.
        """
        logger.info("Starting engine")
        self.engine.start()
        # Short pause before handing control to the `async with` body,
        # presumably to let the engine finish starting up.
        await asyncio.sleep(0.25)
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        """
        Wait 1.5 seconds, stop the engine, then wait another 0.25 seconds.

        Returns False, so an exception raised inside the block propagates.
        """
        try:
            # Grace period before shutdown, presumably so that messages
            # published inside the block can still be processed.
            await asyncio.sleep(1.50)
        finally:
            # Stop the engine even when the wait above is interrupted
            # (e.g. by task cancellation), so it is not left running.
            logger.info("Stopping engine")
            self.engine.stop()
        await asyncio.sleep(0.25)
        return False
@pytest.mark.asyncio_cooperative
async def test_dataframe_to_sql(mosquitto, cratedb, capmqtt):
    """
    Publish two MQTT readings and verify that two rows land in the database.

    A channel from the `testdrive/#` topic tree to the `testdrive` table is
    run inside `engine_single_shot`, the same reading is published twice,
    and the row count of the table is checked afterwards.
    """
    cratedb.reset()
    database_url = cratedb.get_connection_url()

    # `%23` is the URL-encoded form of `#`.
    source_url = "mqtt://localhost/testdrive/%23"
    sink_url = f"{database_url}/?table=testdrive"
    channel = ChannelFactory(source_url, sink_url).channel()

    # Run machinery and publish the same reading twice.
    async with engine_single_shot(channel):
        payload = json.dumps({"device": "foo", "temperature": 42.42, "humidity": 84.84})
        for _ in range(2):
            capmqtt.publish("testdrive/readings", payload)

    # Validate data in storage system.
    sa_engine = sa.create_engine(database_url)
    with sa_engine.connect() as conn:
        # Refresh before counting, presumably so the fresh rows are visible.
        conn.exec_driver_sql("REFRESH TABLE testdrive;")
        count_query = sa.text("SELECT COUNT(*) FROM testdrive;")
        with conn.execute(count_query) as result:
            assert result.scalar_one() == 2