Skip to content

Commit

Permalink
fix: make the table/function catalog insert operation atomic (#1293)
Browse files Browse the repository at this point in the history
Fixes: #1282
  • Loading branch information
gaurav274 authored Oct 17, 2023
1 parent e21092c commit 7d51925
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 33 deletions.
7 changes: 7 additions & 0 deletions evadb/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from evadb.binder.statement_binder_context import StatementBinderContext
from evadb.catalog.catalog_type import ColumnType, TableType
from evadb.catalog.catalog_utils import get_metadata_properties, is_document_table
from evadb.catalog.sql_config import RESTRICTED_COL_NAMES
from evadb.configuration.constants import EvaDB_INSTALLATION_DIR
from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
from evadb.expression.function_expression import FunctionExpression
Expand Down Expand Up @@ -201,6 +202,12 @@ def _bind_delete_statement(self, node: DeleteTableStatement):

@bind.register(CreateTableStatement)
def _bind_create_statement(self, node: CreateTableStatement):
# we don't allow certain keywords in the column_names
for col in node.column_list:
assert (
col.name.lower() not in RESTRICTED_COL_NAMES
), f"EvaDB does not allow to create a table with column name {col.name}"

if node.query is not None:
self.bind(node.query)

Expand Down
13 changes: 6 additions & 7 deletions evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,13 @@ def insert_function_catalog_entry(

checksum = get_file_checksum(impl_file_path)
function_entry = self._function_service.insert_entry(
name, impl_file_path, type, checksum
name,
impl_file_path,
type,
checksum,
function_io_list,
function_metadata_list,
)
for function_io in function_io_list:
function_io.function_id = function_entry.row_id
self._function_io_service.insert_entries(function_io_list)
for function_metadata in function_metadata_list:
function_metadata.function_id = function_entry.row_id
self._function_metadata_service.insert_entries(function_metadata_list)
return function_entry

def get_function_catalog_entry_by_name(self, name: str) -> FunctionCatalogEntry:
Expand Down
7 changes: 2 additions & 5 deletions evadb/catalog/services/column_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def get_entry_by_id(
return entry if return_alchemy else entry.as_dataclass()
return entry

def insert_entries(self, column_list: List[ColumnCatalogEntry]):
def create_entries(self, column_list: List[ColumnCatalogEntry]):
catalog_column_objs = [
self.model(
name=col.name,
Expand All @@ -75,10 +75,7 @@ def insert_entries(self, column_list: List[ColumnCatalogEntry]):
)
for col in column_list
]
saved_column_objs = []
for column in catalog_column_objs:
saved_column_objs.append(column.save(self.session))
return [obj.as_dataclass() for obj in saved_column_objs]
return catalog_column_objs

def filter_entries_by_table(
self, table: TableCatalogEntry
Expand Down
47 changes: 45 additions & 2 deletions evadb/catalog/services/function_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import select

from evadb.catalog.models.function_catalog import FunctionCatalog, FunctionCatalogEntry
from evadb.catalog.models.utils import (
FunctionIOCatalogEntry,
FunctionMetadataCatalogEntry,
)
from evadb.catalog.services.base_service import BaseService
from evadb.catalog.services.function_io_catalog_service import FunctionIOCatalogService
from evadb.catalog.services.function_metadata_catalog_service import (
FunctionMetadataCatalogService,
)
from evadb.utils.errors import CatalogError
from evadb.utils.logging_manager import logger


class FunctionCatalogService(BaseService):
def __init__(self, db_session: Session):
super().__init__(FunctionCatalog, db_session)
self._function_io_service = FunctionIOCatalogService(db_session)
self._function_metadata_service = FunctionMetadataCatalogService(db_session)

def insert_entry(
self, name: str, impl_path: str, type: str, checksum: str
self,
name: str,
impl_path: str,
type: str,
checksum: str,
function_io_list: List[FunctionIOCatalogEntry],
function_metadata_list: List[FunctionMetadataCatalogEntry],
) -> FunctionCatalogEntry:
"""Insert a new function entry
Expand All @@ -40,7 +59,31 @@ def insert_entry(
"""
function_obj = self.model(name, impl_path, type, checksum)
function_obj = function_obj.save(self.session)
return function_obj.as_dataclass()

for function_io in function_io_list:
function_io.function_id = function_obj._row_id
io_objs = self._function_io_service.create_entries(function_io_list)
for function_metadata in function_metadata_list:
function_metadata.function_id = function_obj._row_id
metadata_objs = self._function_metadata_service.create_entries(
function_metadata_list
)

# atomic operation for adding table and its corresponding columns.
try:
self.session.add_all(io_objs)
self.session.add_all(metadata_objs)
self.session.commit()
except Exception as e:
self.session.rollback()
self.session.delete(function_obj)
self.session.commit()
logger.exception(
f"Failed to insert entry into function catalog with exception {str(e)}"
)
raise CatalogError(e)
else:
return function_obj.as_dataclass()

def get_entry_by_name(self, name: str) -> FunctionCatalogEntry:
"""return the function entry that matches the name provided.
Expand Down
12 changes: 4 additions & 8 deletions evadb/catalog/services/function_io_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,8 @@ def get_output_entries_by_function_id(
logger.error(error)
raise RuntimeError(error)

def insert_entries(self, io_list: List[FunctionIOCatalogEntry]):
"""Commit entries to the function_io table
Arguments:
io_list (List[FunctionIOCatalogEntry]): List of io info io be added
"""

def create_entries(self, io_list: List[FunctionIOCatalogEntry]):
io_objs = []
for io in io_list:
io_obj = FunctionIOCatalog(
name=io.name,
Expand All @@ -86,4 +81,5 @@ def insert_entries(self, io_list: List[FunctionIOCatalogEntry]):
is_input=io.is_input,
function_id=io.function_id,
)
io_obj.save(self.session)
io_objs.append(io_obj)
return io_objs
9 changes: 4 additions & 5 deletions evadb/catalog/services/function_metadata_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,16 @@ class FunctionMetadataCatalogService(BaseService):
def __init__(self, db_session: Session):
super().__init__(FunctionMetadataCatalog, db_session)

def insert_entries(self, entries: List[FunctionMetadataCatalogEntry]):
def create_entries(self, entries: List[FunctionMetadataCatalogEntry]):
metadata_objs = []
try:
for entry in entries:
metadata_obj = FunctionMetadataCatalog(
key=entry.key, value=entry.value, function_id=entry.function_id
)
metadata_obj.save(self.session)
metadata_objs.append(metadata_obj)
return metadata_objs
except Exception as e:
logger.exception(
f"Failed to insert entry {entry} into function metadata catalog with exception {str(e)}"
)
raise CatalogError(e)

def get_entries_by_function_id(
Expand Down
14 changes: 13 additions & 1 deletion evadb/catalog/services/table_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,20 @@ def insert_entry(
# populate the table_id for all the columns
for column in column_list:
column.table_id = table_catalog_obj._row_id
column_list = self._column_service.insert_entries(column_list)
column_list = self._column_service.create_entries(column_list)

# atomic operation for adding table and its corresponding columns.
try:
self.session.add_all(column_list)
self.session.commit()
except Exception as e:
self.session.rollback()
self.session.delete(table_catalog_obj)
self.session.commit()
logger.exception(
f"Failed to insert entry into table catalog with exception {str(e)}"
)
raise CatalogError(e)
except Exception as e:
logger.exception(
f"Failed to insert entry into table catalog with exception {str(e)}"
Expand Down
3 changes: 3 additions & 0 deletions evadb/catalog/sql_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
"function_cost_catalog",
"function_metadata_catalog",
]
# Add all keywords that are restricted by EvaDB

RESTRICTED_COL_NAMES = [IDENTIFIER_COLUMN]


class SingletonMeta(type):
Expand Down
9 changes: 9 additions & 0 deletions test/integration_tests/long/test_create_table_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ def test_create_table_with_incorrect_info(self):
execute_query_fetch_all(self.evadb, create_table)
execute_query_fetch_all(self.evadb, "DROP TABLE SlackCSV;")

def test_create_table_with_restricted_keywords(self):
create_table = "CREATE TABLE hello (_row_id INTEGER, price TEXT);"
with self.assertRaises(AssertionError):
execute_query_fetch_all(self.evadb, create_table)

create_table = "CREATE TABLE hello2 (_ROW_id INTEGER, price TEXT);"
with self.assertRaises(AssertionError):
execute_query_fetch_all(self.evadb, create_table)


if __name__ == "__main__":
unittest.main()
11 changes: 6 additions & 5 deletions test/unit_tests/catalog/test_catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,13 @@ def test_insert_function(
function_io_list,
function_metadata_list,
)
functionio_mock.return_value.insert_entries.assert_called_with(function_io_list)
functionmetadata_mock.return_value.insert_entries.assert_called_with(
function_metadata_list
)
function_mock.return_value.insert_entry.assert_called_with(
"function", "sample.py", "classification", checksum_mock.return_value
"function",
"sample.py",
"classification",
checksum_mock.return_value,
function_io_list,
function_metadata_list,
)
checksum_mock.assert_called_with("sample.py")
self.assertEqual(actual, function_mock.return_value.insert_entry.return_value)
Expand Down

0 comments on commit 7d51925

Please sign in to comment.