Skip to content

Commit

Permalink
updates event store load methods by using prepared statements
Browse files Browse the repository at this point in the history
  • Loading branch information
oqq committed Jan 18, 2017
1 parent f375205 commit 82a55d8
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 76 deletions.
35 changes: 10 additions & 25 deletions src/PdoStreamIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ final class PdoStreamIterator implements Iterator
*/
private $statement;

/**
* @var array
*/
private $sql;

/**
* @var MessageFactory
*/
Expand All @@ -57,6 +52,9 @@ final class PdoStreamIterator implements Iterator
*/
private $batchPosition = 0;

/**
* @var int|null
*/
private $batchSize;

/**
Expand All @@ -78,7 +76,6 @@ public function __construct(
PDO $connection,
PDOStatement $statement,
MessageFactory $messageFactory,
array $sql,
?int $batchSize,
int $fromNumber,
int $count,
Expand All @@ -87,7 +84,6 @@ public function __construct(
$this->connection = $connection;
$this->statement = $statement;
$this->messageFactory = $messageFactory;
$this->sql = $sql;
$this->batchSize = $batchSize;
$this->fromNumber = $fromNumber;
$this->count = $count;
Expand Down Expand Up @@ -145,9 +141,9 @@ public function next(): void
} else {
$limit = $this->fromNumber - $this->batchSize * $this->batchPosition;
}
$this->statement = $this->buildStatement($this->sql, $limit);

$this->statement = $this->buildStatement($limit);
$this->statement->execute();
$this->statement->setFetchMode(PDO::FETCH_OBJ);

$this->currentItem = $this->statement->fetch();

Expand Down Expand Up @@ -184,10 +180,9 @@ public function rewind(): void
//Only perform rewind if current item is not the first element
if ($this->currentKey !== 0) {
$this->batchPosition = 0;
$this->statement = $this->buildStatement($this->sql, $this->fromNumber);

$this->statement = $this->buildStatement($this->fromNumber);
$this->statement->execute();
$this->statement->setFetchMode(PDO::FETCH_OBJ);

$this->currentItem = null;
$this->currentKey = -1;
Expand All @@ -196,25 +191,15 @@ public function rewind(): void
}
}

private function buildStatement(array $sql, int $fromNumber): PDOStatement
private function buildStatement(int $fromNumber): PDOStatement
{
$limit = $this->count < ($this->batchSize * ($this->batchPosition + 1))
? $this->count - ($this->batchSize * $this->batchPosition)
: $this->batchSize;

if ($this->forward) {
$query = $sql['from'] . " WHERE no >= $fromNumber";
} else {
$query = $sql['from'] . " WHERE no <= $fromNumber";
}

if (isset($sql['where'])) {
$query .= ' AND ';
$query .= implode(' AND ', $sql['where']);
}
$query .= ' ' . $sql['orderBy'];
$query .= " LIMIT $limit;";
$this->statement->bindValue(':fromNumber', $fromNumber, PDO::PARAM_INT);
$this->statement->bindValue(':limit', $limit, PDO::PARAM_INT);

return $this->connection->prepare($query);
return $this->statement;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public function createSchema(string $tableName): array
{
$statement = <<<EOT
CREATE TABLE $tableName (
no SERIAL,
no BIGSERIAL,
event_id CHAR(36) NOT NULL,
event_name VARCHAR(100) NOT NULL,
payload JSON NOT NULL,
Expand Down
2 changes: 1 addition & 1 deletion src/PersistenceStrategy/PostgresSimpleStreamStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public function createSchema(string $tableName): array
{
$statement = <<<EOT
CREATE TABLE $tableName (
no SERIAL,
no BIGSERIAL,
event_id CHAR(36) NOT NULL,
event_name VARCHAR(100) NOT NULL,
payload JSON NOT NULL,
Expand Down
2 changes: 1 addition & 1 deletion src/PersistenceStrategy/PostgresSingleStreamStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public function createSchema(string $tableName): array
{
$statement = <<<EOT
CREATE TABLE $tableName (
no SERIAL,
no BIGSERIAL,
event_id CHAR(36) NOT NULL,
event_name VARCHAR(100) NOT NULL,
payload JSON NOT NULL,
Expand Down
97 changes: 49 additions & 48 deletions src/PostgresEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -234,42 +234,44 @@ public function load(

$tableName = $this->persistenceStrategy->generateTableName($streamName);

$sql = [
'from' => "SELECT * FROM $tableName",
'orderBy' => 'ORDER BY no ASC',
];
$where = [];
$values = [];

foreach ($metadataMatcher->data() as $match) {
foreach ($metadataMatcher->data() as $key => $match) {
$field = $match['field'];
$operator = $match['operator']->getValue();
$value = $match['value'];
$parameter = ':metadata_'.$key;

if (is_bool($value)) {
$value = var_export($value, true);
$sql['where'][] = "metadata ->> '$field' $operator '$value'";
} elseif (is_string($value)) {
$value = $this->connection->quote($value);
$sql['where'][] = "metadata ->> '$field' $operator $value";
} else {
$sql['where'][] = "metadata ->> '$field' $operator '$value'";
$where[] = "metadata->'$field' $operator '".var_export($value, true)."'";
continue;
}
}

$limit = $count < $this->loadBatchSize
? $count
: $this->loadBatchSize;
$where[] = "metadata->>'$field' $operator $parameter";
$values[$parameter] = $value;
}

$query = $sql['from'] . " WHERE no >= $fromNumber";
$where[] = "no >= :fromNumber";

if (isset($sql['where'])) {
$query .= ' AND ';
$query .= implode(' AND ', $sql['where']);
}
$whereCondition = implode(' AND ', $where);
$limit = min($count, $this->loadBatchSize);

$query .= ' ' . $sql['orderBy'];
$query .= " LIMIT $limit;";
$query = <<<EOT
SELECT * FROM $tableName
WHERE $whereCondition
ORDER BY no ASC
LIMIT :limit;
EOT;

$statement = $this->connection->prepare($query);
$statement->bindValue(':fromNumber', $fromNumber, PDO::PARAM_INT);
$statement->bindValue(':limit', $limit, PDO::PARAM_INT);

foreach ($values as $parameter => $value) {
$statement->bindValue($parameter, $value);
}

$statement->setFetchMode(PDO::FETCH_OBJ);
$statement->execute();

Expand All @@ -283,7 +285,6 @@ public function load(
$this->connection,
$statement,
$this->messageFactory,
$sql,
$this->loadBatchSize,
$fromNumber,
$count,
Expand All @@ -308,42 +309,43 @@ public function loadReverse(

$tableName = $this->persistenceStrategy->generateTableName($streamName);

$sql = [
'from' => "SELECT * FROM $tableName",
'orderBy' => 'ORDER BY no DESC',
];
$where = [];
$values = [];

foreach ($metadataMatcher->data() as $match) {
foreach ($metadataMatcher->data() as $key => $match) {
$field = $match['field'];
$operator = $match['operator']->getValue();
$value = $match['value'];
$parameter = ':metadata_'.$key;

if (is_bool($value)) {
$value = var_export($value, true);
$sql['where'][] = "metadata ->> '$field' $operator '$value'";
} elseif (is_string($value)) {
$value = $this->connection->quote($value);
$sql['where'][] = "metadata ->> '$field' $operator $value";
} else {
$sql['where'][] = "metadata ->> '$field' $operator '$value'";
$where[] = "metadata->'$field' $operator '".var_export($value, true)."'";
continue;
}
}

$limit = $count < $this->loadBatchSize
? $count
: $this->loadBatchSize;
$where[] = "metadata->>'$field' $operator $parameter";
$values[$parameter] = $value;
}

$query = $sql['from'] . " WHERE no <= $fromNumber";
$where[] = "no <= :fromNumber";

if (isset($sql['where'])) {
$query .= ' AND ';
$query .= implode(' AND ', $sql['where']);
}
$whereCondition = implode(' AND ', $where);
$limit = min($count, $this->loadBatchSize);

$query .= ' ' . $sql['orderBy'];
$query .= " LIMIT $limit;";
$query = <<<EOT
SELECT * FROM $tableName
WHERE $whereCondition
ORDER BY no DESC
LIMIT :limit;
EOT;

$statement = $this->connection->prepare($query);
$statement->bindValue(':fromNumber', $fromNumber, PDO::PARAM_INT);
$statement->bindValue(':limit', $limit, PDO::PARAM_INT);

foreach ($values as $parameter => $value) {
$statement->bindValue($parameter, $value);
}

$statement->setFetchMode(PDO::FETCH_OBJ);
$statement->execute();
Expand All @@ -358,7 +360,6 @@ public function loadReverse(
$this->connection,
$statement,
$this->messageFactory,
$sql,
$this->loadBatchSize,
$fromNumber,
$count,
Expand Down

0 comments on commit 82a55d8

Please sign in to comment.