Skip to content

Commit

Permalink
add some MySQL plugin logging and clarify the plugin's` section of th…
Browse files Browse the repository at this point in the history
…e readme
  • Loading branch information
rgitzel committed Sep 17, 2018
1 parent 3eaae2f commit e67855e
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 47 deletions.
101 changes: 58 additions & 43 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1396,7 +1396,56 @@ string isn't available in the _data_, the message is _not_ published.
### `mysql`
The MySQL plugin is one of the most complicated to set up. It requires the following configuration:
The MySQL plugin will attempt to add a row for every message received on a given topic, automatically filling in
columns.
For instance, given a table created with `CREATE TABLE names (id INTEGER, name VARCHAR(25));` then
the message '{ "name" : "Jane Jolie", "id" : 90, "number" : 17 }' on topic 'my/2' will be added to the table like this:
```mysql
+------+------------+
| id | name |
+------+------------+
| 90 | Jane Jolie |
+------+------------+
```
The values for the 'id' and 'name' columns are assumed to be filled by the values of the JSON nodes with the same name.
If you added columns 'topic', 'payload' and '_dtiso' to the databse, then that same message will add this row:
```mysql
+------+------------+-----------------------------------------------------+-----------------------------+-------+
| id | name | payload | _dtiso | topic |
+------+------------+-----------------------------------------------------+-----------------------------+-------+
| 90 | Jane Jolie | { "name" : "Jane Jolie", "id" : 90, "number" : 17 } | 2018-09-17T20:20:31.889002Z | my/2 |
+------+------------+-----------------------------------------------------+-----------------------------+-------+
```
Here, the plugin pulled values for the new columns from standard mqttwarn meta-data.
When a message is received, the plugin will attempt to populate the following column names:
- root-level JSON nodes in the message
- e.g. 'name' and 'id' above
- ['transformation data' fields](https://github.com/jpmens/mqttwarn#outbound-messages) names
- e.g. 'topic', 'payload' and '_dtiso' as above
- note that these all must be VARCHAR columns; timestamp columns are [not yet supported](https://github.com/jpmens/mqttwarn/issues/334#issuecomment-422141808)
- the 'fallback' column, as noted below
- #TODO: how is this different from 'payload'?
To be clear, there is no other way to configure the plugin to use different column names. If you
need such a capability (e.g. you want to a column called "receivedAt" to be filled with the timestamp)
then you can use an `alldata` function to transform the incoming message into a JSON document with the
desired node names.
#### Setup
The MySQL plugin is one of the most complicated to set up.
First it requires the [MySQLDb](http://mysql-python.sourceforge.net/) library to be installed.
- _Debian/Ubuntu_: `sudo apt-get install -y python-mysqldb`
- TODO: do we have any others?
It then requires the following configuration section:
```ini
[config:mysql]
Expand All @@ -1406,56 +1455,23 @@ user = 'jane'
pass = 'secret'
dbname = 'test'
targets = {
# tablename #fallbackcolumn ('NOP' to disable)
'm2' : [ 'names', 'full' ]
# tablename #fallbackcolumn ('NOP' to disable)
'm2' : [ 'names', 'full' ]
}
```
Suppose we create the following table for the target specified above:
```
CREATE TABLE names (id INTEGER, name VARCHAR(25));
```
and publish this JSON payload:
```
mosquitto_pub -t my/2 -m '{ "name" : "Jane Jolie", "id" : 90, "number" : 17 }'
```
This will result in the two columns `id` and `name` being populated:
```mysql
+------+------------+
| id | name |
+------+------------+
| 90 | Jane Jolie |
+------+------------+
Finally a topic section:
```ini
[names]
topic = my/#
targets = mysql:m2
```
The target contains a so-called _fallback column_ into which _mqttwarn_ adds
the "rest of" the payload for all columns not targeted with JSON data unless that
is explicitly configured as `NOP` in the service in which case extra data is discarded.
I'll now add our fallback column to the schema:
```mysql
ALTER TABLE names ADD full TEXT;
```
Publishing the same payload again, will insert this row into the table:
```mysql
+------+------------+-----------------------------------------------------+
| id | name | full |
+------+------------+-----------------------------------------------------+
| 90 | Jane Jolie | NULL |
| 90 | Jane Jolie | { "name" : "Jane Jolie", "id" : 90, "number" : 17 } |
+------+------------+-----------------------------------------------------+
```
As you can imagine, if we add a `number` column to the table, it too will be
correctly populated with the value `17`.
The payload of messages which do not contain valid JSON will be coped verbatim
to the _fallback_ column:
Expand All @@ -1467,8 +1483,7 @@ to the _fallback_ column:
+------+------+-------------+--------+
```
You can add columns with the names of the built-in transformation types (e.g. `_dthhmmss`, see below)
to have those values stored automatically.
### `mysql_dynamic`
Expand Down
9 changes: 5 additions & 4 deletions services/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import sys

# https://mail.python.org/pipermail/tutor/2010-December/080701.html
def add_row(cursor, tablename, rowdict):
def add_row(srv, cursor, tablename, rowdict):
# XXX tablename not sanitized
# XXX test for allowed keys is case-sensitive

Expand All @@ -26,9 +26,9 @@ def add_row(cursor, tablename, rowdict):
columns = ", ".join(keys)
values_template = ", ".join(["%s"] * len(keys))

sql = "insert into %s (%s) values (%s)" % (
tablename, columns, values_template)
sql = "insert into %s (%s) values (%s)" % (tablename, columns, values_template)
values = tuple(rowdict[key] for key in keys)
srv.logging.debug("adding row with sql '%s' and values: %s", sql, str(values))
cursor.execute(sql, values)

return unknown_keys
Expand All @@ -42,6 +42,7 @@ def plugin(srv, item):
user = item.config.get('user')
passwd = item.config.get('pass')
dbname = item.config.get('dbname')
srv.logging.debug("Connecting to MySql host '%s' and database '%s' as user '%s'", host, dbname, user)

try:
table_name = item.addrs[0].format(**item.data).encode('utf-8')
Expand Down Expand Up @@ -79,7 +80,7 @@ def plugin(srv, item):
col_data[key] = item.data[key]

try:
unknown_keys = add_row(cursor, table_name, col_data)
unknown_keys = add_row(srv, cursor, table_name, col_data)
if unknown_keys is not None:
srv.logging.debug("Skipping unused keys %s" % ",".join(unknown_keys))
conn.commit()
Expand Down

0 comments on commit e67855e

Please sign in to comment.