diff --git a/scripts/mysql/01_event_streams_table.sql b/scripts/mysql/01_event_streams_table.sql index 31a20e9f..3bdd8028 100644 --- a/scripts/mysql/01_event_streams_table.sql +++ b/scripts/mysql/01_event_streams_table.sql @@ -1,5 +1,5 @@ CREATE TABLE `event_streams` ( - `no` INT(11) NOT NULL AUTO_INCREMENT, + `no` BIGINT(20) NOT NULL AUTO_INCREMENT, `real_stream_name` VARCHAR(150) NOT NULL, `stream_name` CHAR(41) NOT NULL, `metadata` JSON, diff --git a/scripts/mysql/02_projections_table.sql b/scripts/mysql/02_projections_table.sql index 2541aec5..e99a4539 100644 --- a/scripts/mysql/02_projections_table.sql +++ b/scripts/mysql/02_projections_table.sql @@ -1,5 +1,5 @@ CREATE TABLE `projections` ( - `no` INT(11) NOT NULL AUTO_INCREMENT, + `no` BIGINT(20) NOT NULL AUTO_INCREMENT, `name` VARCHAR(150) NOT NULL, `position` JSON, `state` JSON, diff --git a/scripts/postgres/01_event_streams_table.sql b/scripts/postgres/01_event_streams_table.sql index 672d1f3c..953cc18d 100644 --- a/scripts/postgres/01_event_streams_table.sql +++ b/scripts/postgres/01_event_streams_table.sql @@ -1,5 +1,5 @@ CREATE TABLE event_streams ( - no SERIAL, + no BIGSERIAL, real_stream_name VARCHAR(150) NOT NULL, stream_name CHAR(41) NOT NULL, metadata JSONB, diff --git a/scripts/postgres/02_projections_table.sql b/scripts/postgres/02_projections_table.sql index 6a9091a7..a0ade347 100644 --- a/scripts/postgres/02_projections_table.sql +++ b/scripts/postgres/02_projections_table.sql @@ -1,5 +1,5 @@ CREATE TABLE projections ( - no SERIAL, + no BIGSERIAL, name VARCHAR(150) NOT NULL, position JSONB, state JSONB, diff --git a/src/MySqlEventStore.php b/src/MySqlEventStore.php index 43d625fd..533717cc 100644 --- a/src/MySqlEventStore.php +++ b/src/MySqlEventStore.php @@ -265,40 +265,44 @@ public function load( $tableName = $this->persistenceStrategy->generateTableName($streamName); - $sql = [ - 'from' => "SELECT * FROM $tableName", - 'orderBy' => 'ORDER BY no ASC', - ]; + $where = []; + $values = []; - foreach ($metadataMatcher->data() as $match) { + foreach ($metadataMatcher->data() as $key => $match) { $field = $match['field']; $operator = $match['operator']->getValue(); $value = $match['value']; + $parameter = ':metadata_'.$key; if (is_bool($value)) { - $value = var_export($value, true); - } elseif (is_string($value)) { - $value = $this->connection->quote($value); + $where[] = "metadata->\"$.$field\" $operator ".var_export($value, true); + continue; } - $sql['where'][] = "metadata->\"$.$field\" $operator $value"; + $where[] = "metadata->\"$.$field\" $operator $parameter"; + $values[$parameter] = $value; } - $limit = $count < $this->loadBatchSize - ? $count - : $this->loadBatchSize; + $where[] = '`no` >= :fromNumber'; - $query = $sql['from'] . " WHERE no >= $fromNumber"; + $whereCondition = implode(' AND ', $where); + $limit = min($count, $this->loadBatchSize); - if (isset($sql['where'])) { - $query .= ' AND '; - $query .= implode(' AND ', $sql['where']); - } - - $query .= ' ' . $sql['orderBy']; - $query .= " LIMIT $limit;"; + $query = <<connection->prepare($query); + $statement->bindValue(':fromNumber', $fromNumber, PDO::PARAM_INT); + $statement->bindValue(':limit', $limit, PDO::PARAM_INT); + + foreach ($values as $parameter => $value) { + $statement->bindValue($parameter, $value, is_int($value) ? PDO::PARAM_INT : PDO::PARAM_STR); + } + $statement->setFetchMode(PDO::FETCH_OBJ); $statement->execute(); @@ -312,7 +316,6 @@ public function load( $this->connection, $statement, $this->messageFactory, - $sql, $this->loadBatchSize, $fromNumber, $count, @@ -337,40 +340,43 @@ public function loadReverse( $tableName = $this->persistenceStrategy->generateTableName($streamName); - $sql = [ - 'from' => "SELECT * FROM $tableName", - 'orderBy' => 'ORDER BY no DESC', - ]; + $where = []; + $values = []; - foreach ($metadataMatcher->data() as $match) { + foreach ($metadataMatcher->data() as $key => $match) { $field = $match['field']; $operator = $match['operator']->getValue(); $value = $match['value']; + $parameter = ':metadata_'.$key; if (is_bool($value)) { - $value = var_export($value, true); - } elseif (is_string($value)) { - $value = $this->connection->quote($value); + $where[] = "metadata->\"$.$field\" $operator ".var_export($value, true); + continue; } - $sql['where'][] = "metadata->\"$.$field\" $operator $value"; + $where[] = "metadata->\"$.$field\" $operator $parameter"; + $values[$parameter] = $value; } - $limit = $count < $this->loadBatchSize - ? $count - : $this->loadBatchSize; + $where[] = '`no` <= :fromNumber'; - $query = $sql['from'] . " WHERE no <= $fromNumber"; + $whereCondition = implode(' AND ', $where); + $limit = min($count, $this->loadBatchSize); - if (isset($sql['where'])) { - $query .= ' AND '; - $query .= implode(' AND ', $sql['where']); - } - - $query .= ' ' . $sql['orderBy']; - $query .= " LIMIT $limit;"; + $query = <<connection->prepare($query); + $statement->bindValue(':fromNumber', $fromNumber, PDO::PARAM_INT); + $statement->bindValue(':limit', $limit, PDO::PARAM_INT); + + foreach ($values as $parameter => $value) { + $statement->bindValue($parameter, $value, is_int($value) ? PDO::PARAM_INT : PDO::PARAM_STR); + } $statement->setFetchMode(PDO::FETCH_OBJ); $statement->execute(); @@ -385,7 +391,6 @@ public function loadReverse( $this->connection, $statement, $this->messageFactory, - $sql, $this->loadBatchSize, $fromNumber, $count, diff --git a/src/PdoStreamIterator.php b/src/PdoStreamIterator.php index 3caa8831..5d13d659 100644 --- a/src/PdoStreamIterator.php +++ b/src/PdoStreamIterator.php @@ -32,11 +32,6 @@ final class PdoStreamIterator implements Iterator */ private $statement; - /** - * @var array - */ - private $sql; - /** * @var MessageFactory */ @@ -57,6 +52,9 @@ final class PdoStreamIterator implements Iterator */ private $batchPosition = 0; + /** + * @var int|null + */ private $batchSize; /** @@ -78,7 +76,6 @@ public function __construct( PDO $connection, PDOStatement $statement, MessageFactory $messageFactory, - array $sql, ?int $batchSize, int $fromNumber, int $count, @@ -87,7 +84,6 @@ public function __construct( $this->connection = $connection; $this->statement = $statement; $this->messageFactory = $messageFactory; - $this->sql = $sql; $this->batchSize = $batchSize; $this->fromNumber = $fromNumber; $this->count = $count; @@ -145,9 +141,9 @@ public function next(): void } else { $limit = $this->fromNumber - $this->batchSize * $this->batchPosition; } - $this->statement = $this->buildStatement($this->sql, $limit); + + $this->statement = $this->buildStatement($limit); $this->statement->execute(); - $this->statement->setFetchMode(PDO::FETCH_OBJ); $this->currentItem = $this->statement->fetch(); @@ -184,10 +180,9 @@ public function rewind(): void //Only perform rewind if current item is not the first element if ($this->currentKey !== 0) { $this->batchPosition = 0; - $this->statement = $this->buildStatement($this->sql, $this->fromNumber); + $this->statement = $this->buildStatement($this->fromNumber); $this->statement->execute(); - $this->statement->setFetchMode(PDO::FETCH_OBJ); $this->currentItem = null; $this->currentKey = -1; @@ -196,25 +191,15 @@ public function rewind(): void } } - private function buildStatement(array $sql, int $fromNumber): PDOStatement + private function buildStatement(int $fromNumber): PDOStatement { $limit = $this->count < ($this->batchSize * ($this->batchPosition + 1)) ? $this->count - ($this->batchSize * $this->batchPosition) : $this->batchSize; - if ($this->forward) { - $query = $sql['from'] . " WHERE no >= $fromNumber"; - } else { - $query = $sql['from'] . " WHERE no <= $fromNumber"; - } - - if (isset($sql['where'])) { - $query .= ' AND '; - $query .= implode(' AND ', $sql['where']); - } - $query .= ' ' . $sql['orderBy']; - $query .= " LIMIT $limit;"; + $this->statement->bindValue(':fromNumber', $fromNumber, PDO::PARAM_INT); + $this->statement->bindValue(':limit', $limit, PDO::PARAM_INT); - return $this->connection->prepare($query); + return $this->statement; } } diff --git a/src/PersistenceStrategy/MySqlAggregateStreamStrategy.php b/src/PersistenceStrategy/MySqlAggregateStreamStrategy.php index cfb108fd..c1a0d8b0 100644 --- a/src/PersistenceStrategy/MySqlAggregateStreamStrategy.php +++ b/src/PersistenceStrategy/MySqlAggregateStreamStrategy.php @@ -27,13 +27,13 @@ public function createSchema(string $tableName): array { $statement = <<persistenceStrategy->generateTableName($streamName); - $sql = [ - 'from' => "SELECT * FROM $tableName", - 'orderBy' => 'ORDER BY no ASC', - ]; + $where = []; + $values = []; - foreach ($metadataMatcher->data() as $match) { + foreach ($metadataMatcher->data() as $key => $match) { $field = $match['field']; $operator = $match['operator']->getValue(); $value = $match['value']; + $parameter = ':metadata_'.$key; if (is_bool($value)) { - $value = var_export($value, true); - $sql['where'][] = "metadata ->> '$field' $operator '$value'"; - } elseif (is_string($value)) { - $value = $this->connection->quote($value); - $sql['where'][] = "metadata ->> '$field' $operator $value"; - } else { - $sql['where'][] = "metadata ->> '$field' $operator '$value'"; + $where[] = "metadata->'$field' $operator '".var_export($value, true)."'"; + continue; } - } - $limit = $count < $this->loadBatchSize - ? $count - : $this->loadBatchSize; + $where[] = "metadata->>'$field' $operator $parameter"; + $values[$parameter] = $value; + } - $query = $sql['from'] . " WHERE no >= $fromNumber"; + $where[] = 'no >= :fromNumber'; - if (isset($sql['where'])) { - $query .= ' AND '; - $query .= implode(' AND ', $sql['where']); - } + $whereCondition = implode(' AND ', $where); + $limit = min($count, $this->loadBatchSize); - $query .= ' ' . $sql['orderBy']; - $query .= " LIMIT $limit;"; + $query = <<connection->prepare($query); + $statement->bindValue(':fromNumber', $fromNumber, PDO::PARAM_INT); + $statement->bindValue(':limit', $limit, PDO::PARAM_INT); + + foreach ($values as $parameter => $value) { + $statement->bindValue($parameter, $value); + } + $statement->setFetchMode(PDO::FETCH_OBJ); $statement->execute(); @@ -283,7 +285,6 @@ public function load( $this->connection, $statement, $this->messageFactory, - $sql, $this->loadBatchSize, $fromNumber, $count, @@ -308,42 +309,43 @@ public function loadReverse( $tableName = $this->persistenceStrategy->generateTableName($streamName); - $sql = [ - 'from' => "SELECT * FROM $tableName", - 'orderBy' => 'ORDER BY no DESC', - ]; + $where = []; + $values = []; - foreach ($metadataMatcher->data() as $match) { + foreach ($metadataMatcher->data() as $key => $match) { $field = $match['field']; $operator = $match['operator']->getValue(); $value = $match['value']; + $parameter = ':metadata_'.$key; if (is_bool($value)) { - $value = var_export($value, true); - $sql['where'][] = "metadata ->> '$field' $operator '$value'"; - } elseif (is_string($value)) { - $value = $this->connection->quote($value); - $sql['where'][] = "metadata ->> '$field' $operator $value"; - } else { - $sql['where'][] = "metadata ->> '$field' $operator '$value'"; + $where[] = "metadata->'$field' $operator '".var_export($value, true)."'"; + continue; } - } - $limit = $count < $this->loadBatchSize - ? $count - : $this->loadBatchSize; + $where[] = "metadata->>'$field' $operator $parameter"; + $values[$parameter] = $value; + } - $query = $sql['from'] . " WHERE no <= $fromNumber"; + $where[] = 'no <= :fromNumber'; - if (isset($sql['where'])) { - $query .= ' AND '; - $query .= implode(' AND ', $sql['where']); - } + $whereCondition = implode(' AND ', $where); + $limit = min($count, $this->loadBatchSize); - $query .= ' ' . $sql['orderBy']; - $query .= " LIMIT $limit;"; + $query = <<connection->prepare($query); + $statement->bindValue(':fromNumber', $fromNumber, PDO::PARAM_INT); + $statement->bindValue(':limit', $limit, PDO::PARAM_INT); + + foreach ($values as $parameter => $value) { + $statement->bindValue($parameter, $value); + } $statement->setFetchMode(PDO::FETCH_OBJ); $statement->execute(); @@ -358,7 +360,6 @@ public function loadReverse( $this->connection, $statement, $this->messageFactory, - $sql, $this->loadBatchSize, $fromNumber, $count,