Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use prepared load statements #44

Merged
merged 3 commits into from
Jan 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/mysql/01_event_streams_table.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CREATE TABLE `event_streams` (
`no` INT(11) NOT NULL AUTO_INCREMENT,
`no` BIGINT(20) NOT NULL AUTO_INCREMENT,
`real_stream_name` VARCHAR(150) NOT NULL,
`stream_name` CHAR(41) NOT NULL,
`metadata` JSON,
Expand Down
2 changes: 1 addition & 1 deletion scripts/mysql/02_projections_table.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CREATE TABLE `projections` (
`no` INT(11) NOT NULL AUTO_INCREMENT,
`no` BIGINT(20) NOT NULL AUTO_INCREMENT,
`name` VARCHAR(150) NOT NULL,
`position` JSON,
`state` JSON,
Expand Down
2 changes: 1 addition & 1 deletion scripts/postgres/01_event_streams_table.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CREATE TABLE event_streams (
no SERIAL,
no BIGSERIAL,
real_stream_name VARCHAR(150) NOT NULL,
stream_name CHAR(41) NOT NULL,
metadata JSONB,
Expand Down
2 changes: 1 addition & 1 deletion scripts/postgres/02_projections_table.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CREATE TABLE projections (
no SERIAL,
no BIGSERIAL,
name VARCHAR(150) NOT NULL,
position JSONB,
state JSONB,
Expand Down
89 changes: 47 additions & 42 deletions src/MySqlEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -265,40 +265,44 @@ public function load(

$tableName = $this->persistenceStrategy->generateTableName($streamName);

$sql = [
'from' => "SELECT * FROM $tableName",
'orderBy' => 'ORDER BY no ASC',
];
$where = [];
$values = [];

foreach ($metadataMatcher->data() as $match) {
foreach ($metadataMatcher->data() as $key => $match) {
$field = $match['field'];
$operator = $match['operator']->getValue();
$value = $match['value'];
$parameter = ':metadata_'.$key;

if (is_bool($value)) {
$value = var_export($value, true);
} elseif (is_string($value)) {
$value = $this->connection->quote($value);
$where[] = "metadata->\"$.$field\" $operator ".var_export($value, true);
continue;
}

$sql['where'][] = "metadata->\"$.$field\" $operator $value";
$where[] = "metadata->\"$.$field\" $operator $parameter";
$values[$parameter] = $value;
}

$limit = $count < $this->loadBatchSize
? $count
: $this->loadBatchSize;
$where[] = '`no` >= :fromNumber';

$query = $sql['from'] . " WHERE no >= $fromNumber";
$whereCondition = implode(' AND ', $where);
$limit = min($count, $this->loadBatchSize);

if (isset($sql['where'])) {
$query .= ' AND ';
$query .= implode(' AND ', $sql['where']);
}

$query .= ' ' . $sql['orderBy'];
$query .= " LIMIT $limit;";
$query = <<<EOT
SELECT * FROM $tableName
WHERE $whereCondition
ORDER BY `no` ASC
LIMIT :limit;
EOT;

$statement = $this->connection->prepare($query);
$statement->bindValue(':fromNumber', $fromNumber, PDO::PARAM_INT);
$statement->bindValue(':limit', $limit, PDO::PARAM_INT);

foreach ($values as $parameter => $value) {
$statement->bindValue($parameter, $value, is_int($value) ? PDO::PARAM_INT : PDO::PARAM_STR);
}

$statement->setFetchMode(PDO::FETCH_OBJ);
$statement->execute();

Expand All @@ -312,7 +316,6 @@ public function load(
$this->connection,
$statement,
$this->messageFactory,
$sql,
$this->loadBatchSize,
$fromNumber,
$count,
Expand All @@ -337,40 +340,43 @@ public function loadReverse(

$tableName = $this->persistenceStrategy->generateTableName($streamName);

$sql = [
'from' => "SELECT * FROM $tableName",
'orderBy' => 'ORDER BY no DESC',
];
$where = [];
$values = [];

foreach ($metadataMatcher->data() as $match) {
foreach ($metadataMatcher->data() as $key => $match) {
$field = $match['field'];
$operator = $match['operator']->getValue();
$value = $match['value'];
$parameter = ':metadata_'.$key;

if (is_bool($value)) {
$value = var_export($value, true);
} elseif (is_string($value)) {
$value = $this->connection->quote($value);
$where[] = "metadata->\"$.$field\" $operator ".var_export($value, true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be metadata->> to unquote.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Than it would also match 'true' === true. Right?

continue;
}

$sql['where'][] = "metadata->\"$.$field\" $operator $value";
$where[] = "metadata->\"$.$field\" $operator $parameter";
$values[$parameter] = $value;
}

$limit = $count < $this->loadBatchSize
? $count
: $this->loadBatchSize;
$where[] = '`no` <= :fromNumber';

$query = $sql['from'] . " WHERE no <= $fromNumber";
$whereCondition = implode(' AND ', $where);
$limit = min($count, $this->loadBatchSize);

if (isset($sql['where'])) {
$query .= ' AND ';
$query .= implode(' AND ', $sql['where']);
}

$query .= ' ' . $sql['orderBy'];
$query .= " LIMIT $limit;";
$query = <<<EOT
SELECT * FROM $tableName
WHERE $whereCondition
ORDER BY `no` DESC
LIMIT :limit;
EOT;

$statement = $this->connection->prepare($query);
$statement->bindValue(':fromNumber', $fromNumber, PDO::PARAM_INT);
$statement->bindValue(':limit', $limit, PDO::PARAM_INT);

foreach ($values as $parameter => $value) {
$statement->bindValue($parameter, $value, is_int($value) ? PDO::PARAM_INT : PDO::PARAM_STR);
}

$statement->setFetchMode(PDO::FETCH_OBJ);
$statement->execute();
Expand All @@ -385,7 +391,6 @@ public function loadReverse(
$this->connection,
$statement,
$this->messageFactory,
$sql,
$this->loadBatchSize,
$fromNumber,
$count,
Expand Down
35 changes: 10 additions & 25 deletions src/PdoStreamIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ final class PdoStreamIterator implements Iterator
*/
private $statement;

/**
* @var array
*/
private $sql;

/**
* @var MessageFactory
*/
Expand All @@ -57,6 +52,9 @@ final class PdoStreamIterator implements Iterator
*/
private $batchPosition = 0;

/**
* @var int|null
*/
private $batchSize;

/**
Expand All @@ -78,7 +76,6 @@ public function __construct(
PDO $connection,
PDOStatement $statement,
MessageFactory $messageFactory,
array $sql,
?int $batchSize,
int $fromNumber,
int $count,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is bool forward still needed then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok

Expand All @@ -87,7 +84,6 @@ public function __construct(
$this->connection = $connection;
$this->statement = $statement;
$this->messageFactory = $messageFactory;
$this->sql = $sql;
$this->batchSize = $batchSize;
$this->fromNumber = $fromNumber;
$this->count = $count;
Expand Down Expand Up @@ -145,9 +141,9 @@ public function next(): void
} else {
$limit = $this->fromNumber - $this->batchSize * $this->batchPosition;
}
$this->statement = $this->buildStatement($this->sql, $limit);

$this->statement = $this->buildStatement($limit);
$this->statement->execute();
$this->statement->setFetchMode(PDO::FETCH_OBJ);

$this->currentItem = $this->statement->fetch();

Expand Down Expand Up @@ -184,10 +180,9 @@ public function rewind(): void
//Only perform rewind if current item is not the first element
if ($this->currentKey !== 0) {
$this->batchPosition = 0;
$this->statement = $this->buildStatement($this->sql, $this->fromNumber);

$this->statement = $this->buildStatement($this->fromNumber);
$this->statement->execute();
$this->statement->setFetchMode(PDO::FETCH_OBJ);

$this->currentItem = null;
$this->currentKey = -1;
Expand All @@ -196,25 +191,15 @@ public function rewind(): void
}
}

private function buildStatement(array $sql, int $fromNumber): PDOStatement
private function buildStatement(int $fromNumber): PDOStatement
{
$limit = $this->count < ($this->batchSize * ($this->batchPosition + 1))
? $this->count - ($this->batchSize * $this->batchPosition)
: $this->batchSize;

if ($this->forward) {
$query = $sql['from'] . " WHERE no >= $fromNumber";
} else {
$query = $sql['from'] . " WHERE no <= $fromNumber";
}

if (isset($sql['where'])) {
$query .= ' AND ';
$query .= implode(' AND ', $sql['where']);
}
$query .= ' ' . $sql['orderBy'];
$query .= " LIMIT $limit;";
$this->statement->bindValue(':fromNumber', $fromNumber, PDO::PARAM_INT);
$this->statement->bindValue(':limit', $limit, PDO::PARAM_INT);

return $this->connection->prepare($query);
return $this->statement;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see you removed forward

4 changes: 2 additions & 2 deletions src/PersistenceStrategy/MySqlAggregateStreamStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ public function createSchema(string $tableName): array
{
$statement = <<<EOT
CREATE TABLE `$tableName` (
`no` INT(11) NOT NULL AUTO_INCREMENT,
`no` BIGINT(20) NOT NULL AUTO_INCREMENT,
`event_id` CHAR(36) COLLATE utf8_bin NOT NULL,
`event_name` VARCHAR(100) COLLATE utf8_bin NOT NULL,
`payload` JSON NOT NULL,
`metadata` JSON NOT NULL,
`created_at` DATETIME(6) NOT NULL,
`version` INT(11) GENERATED ALWAYS AS (JSON_EXTRACT(metadata, '$._aggregate_version')) STORED NOT NULL UNIQUE KEY,
`version` INT(11) UNSIGNED GENERATED ALWAYS AS (JSON_EXTRACT(metadata, '$._aggregate_version')) STORED NOT NULL UNIQUE KEY,
PRIMARY KEY (`no`),
UNIQUE KEY `ix_event_id` (`event_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
Expand Down
2 changes: 1 addition & 1 deletion src/PersistenceStrategy/MySqlSimpleStreamStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public function createSchema(string $tableName): array
{
$statement = <<<EOT
CREATE TABLE `$tableName` (
`no` INT(11) NOT NULL AUTO_INCREMENT,
`no` BIGINT(20) NOT NULL AUTO_INCREMENT,
`event_id` CHAR(36) COLLATE utf8_bin NOT NULL,
`event_name` VARCHAR(100) COLLATE utf8_bin NOT NULL,
`payload` JSON NOT NULL,
Expand Down
8 changes: 4 additions & 4 deletions src/PersistenceStrategy/MySqlSingleStreamStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ public function createSchema(string $tableName): array
{
$statement = <<<EOT
CREATE TABLE `$tableName` (
`no` INT(11) NOT NULL AUTO_INCREMENT,
`no` BIGINT(20) NOT NULL AUTO_INCREMENT,
`event_id` CHAR(36) COLLATE utf8_bin NOT NULL,
`event_name` VARCHAR(100) COLLATE utf8_bin NOT NULL,
`payload` JSON NOT NULL,
`metadata` JSON NOT NULL,
`created_at` DATETIME(6) NOT NULL,
`version` INT(11) GENERATED ALWAYS AS (JSON_EXTRACT(metadata, '$._aggregate_version')) STORED NOT NULL,
`aggregate_id` char(36) CHARACTER SET utf8 COLLATE utf8_bin GENERATED ALWAYS AS (JSON_UNQUOTE(JSON_EXTRACT(metadata, '$._aggregate_id'))) STORED NOT NULL,
`aggregate_type` varchar(150) GENERATED ALWAYS AS (JSON_UNQUOTE(JSON_EXTRACT(metadata, '$._aggregate_type'))) STORED NOT NULL,
`version` INT(11) UNSIGNED GENERATED ALWAYS AS (JSON_EXTRACT(metadata, '$._aggregate_version')) STORED NOT NULL,
`aggregate_id` CHAR(36) CHARACTER SET utf8 COLLATE utf8_bin GENERATED ALWAYS AS (JSON_UNQUOTE(JSON_EXTRACT(metadata, '$._aggregate_id'))) STORED NOT NULL,
`aggregate_type` VARCHAR(150) GENERATED ALWAYS AS (JSON_UNQUOTE(JSON_EXTRACT(metadata, '$._aggregate_type'))) STORED NOT NULL,
PRIMARY KEY (`no`),
UNIQUE KEY `ix_event_id` (`event_id`),
UNIQUE KEY `ix_unique_event` (`version`, `aggregate_id`, `aggregate_type`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public function createSchema(string $tableName): array
{
$statement = <<<EOT
CREATE TABLE $tableName (
no SERIAL,
no BIGSERIAL,
event_id CHAR(36) NOT NULL,
event_name VARCHAR(100) NOT NULL,
payload JSON NOT NULL,
Expand Down
2 changes: 1 addition & 1 deletion src/PersistenceStrategy/PostgresSimpleStreamStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public function createSchema(string $tableName): array
{
$statement = <<<EOT
CREATE TABLE $tableName (
no SERIAL,
no BIGSERIAL,
event_id CHAR(36) NOT NULL,
event_name VARCHAR(100) NOT NULL,
payload JSON NOT NULL,
Expand Down
2 changes: 1 addition & 1 deletion src/PersistenceStrategy/PostgresSingleStreamStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public function createSchema(string $tableName): array
{
$statement = <<<EOT
CREATE TABLE $tableName (
no SERIAL,
no BIGSERIAL,
event_id CHAR(36) NOT NULL,
event_name VARCHAR(100) NOT NULL,
payload JSON NOT NULL,
Expand Down
Loading