Skip to content

Commit

Permalink
fix: make the table/function catalog insert operation atomic (georgia…
Browse files Browse the repository at this point in the history
  • Loading branch information
gaurav274 authored and a0x8o committed Nov 22, 2023
1 parent a561022 commit af9f485
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 0 deletions.
11 changes: 11 additions & 0 deletions evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,12 +583,17 @@ def insert_function_catalog_entry(
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
name,
impl_file_path,
type,
Expand All @@ -609,13 +614,19 @@ def insert_function_catalog_entry(
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
)
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
=======
)
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
return function_entry

def get_function_catalog_entry_by_name(self, name: str) -> FunctionCatalogEntry:
Expand Down
40 changes: 40 additions & 0 deletions evadb/catalog/services/function_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,33 @@
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
from typing import List

=======
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
=======
from typing import List

>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
<<<<<<< HEAD
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import select

Expand All @@ -42,12 +51,17 @@
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
from evadb.catalog.models.utils import (
FunctionIOCatalogEntry,
FunctionMetadataCatalogEntry,
Expand All @@ -61,11 +75,15 @@
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
=======
from evadb.catalog.services.base_service import BaseService
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
<<<<<<< HEAD
=======
=======
from evadb.catalog.services.base_service import BaseService
Expand All @@ -76,6 +94,8 @@
from evadb.catalog.services.base_service import BaseService
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
from evadb.utils.logging_manager import logger


Expand All @@ -86,10 +106,13 @@ def __init__(self, db_session: Session):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
self._function_io_service = FunctionIOCatalogService(db_session)
self._function_metadata_service = FunctionMetadataCatalogService(db_session)

Expand All @@ -108,6 +131,9 @@ def insert_entry(
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
=======
self._function_io_service = FunctionIOCatalogService(db_session)
self._function_metadata_service = FunctionMetadataCatalogService(db_session)
Expand All @@ -121,10 +147,13 @@ def insert_entry(
function_io_list: List[FunctionIOCatalogEntry],
function_metadata_list: List[FunctionMetadataCatalogEntry],
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
<<<<<<< HEAD
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
) -> FunctionCatalogEntry:
"""Insert a new function entry
Expand All @@ -143,12 +172,17 @@ def insert_entry(
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))

for function_io in function_io_list:
function_io.function_id = function_obj._row_id
Expand Down Expand Up @@ -177,11 +211,15 @@ def insert_entry(
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
=======
return function_obj.as_dataclass()
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
<<<<<<< HEAD
=======
=======
return function_obj.as_dataclass()
Expand All @@ -192,6 +230,8 @@ def insert_entry(
return function_obj.as_dataclass()
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))

def get_entry_by_name(self, name: str) -> FunctionCatalogEntry:
"""return the function entry that matches the name provided.
Expand Down
9 changes: 9 additions & 0 deletions evadb/catalog/services/function_io_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,13 @@ def get_output_entries_by_function_id(
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
def create_entries(self, io_list: List[FunctionIOCatalogEntry]):
io_objs = []
=======
Expand All @@ -90,14 +93,20 @@ def insert_entries(self, io_list: List[FunctionIOCatalogEntry]):
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
=======
def create_entries(self, io_list: List[FunctionIOCatalogEntry]):
io_objs = []
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
<<<<<<< HEAD
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
for io in io_list:
io_obj = FunctionIOCatalog(
name=io.name,
Expand Down
17 changes: 17 additions & 0 deletions evadb/catalog/services/function_metadata_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,34 @@ def __init__(self, db_session: Session):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
def create_entries(self, entries: List[FunctionMetadataCatalogEntry]):
metadata_objs = []
=======
def insert_entries(self, entries: List[FunctionMetadataCatalogEntry]):
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
=======
def create_entries(self, entries: List[FunctionMetadataCatalogEntry]):
metadata_objs = []
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
<<<<<<< HEAD
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
try:
for entry in entries:
metadata_obj = FunctionMetadataCatalog(
Expand All @@ -65,23 +74,31 @@ def create_entries(self, entries: List[FunctionMetadataCatalogEntry]):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
=======
logger.exception(
f"Failed to insert entry {entry} into function metadata catalog with exception {str(e)}"
)
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
raise CatalogError(e)

def get_entries_by_function_id(
Expand Down
11 changes: 11 additions & 0 deletions test/integration_tests/long/test_create_table_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,17 @@ def test_create_table_with_incorrect_info(self):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
=======
>>>>>>> 2170a7a9 (Bump v0.3.4+ dev)
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
def test_create_table_with_restricted_keywords(self):
create_table = "CREATE TABLE hello (_row_id INTEGER, price TEXT);"
with self.assertRaises(AssertionError):
Expand All @@ -144,10 +149,14 @@ def test_create_table_with_restricted_keywords(self):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
=======
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
<<<<<<< HEAD
=======
=======
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
Expand All @@ -156,6 +165,8 @@ def test_create_table_with_restricted_keywords(self):
=======
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))

if __name__ == "__main__":
unittest.main()
9 changes: 9 additions & 0 deletions test/unit_tests/catalog/test_catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,13 @@ def test_insert_function(
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
function_mock.return_value.insert_entry.assert_called_with(
"function",
"sample.py",
Expand All @@ -171,6 +174,9 @@ def test_insert_function(
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
=======
function_mock.return_value.insert_entry.assert_called_with(
"function",
Expand All @@ -180,10 +186,13 @@ def test_insert_function(
function_io_list,
function_metadata_list,
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
<<<<<<< HEAD
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> bc98b4af (fix: make the table/function catalog insert operation atomic (#1293))
)
checksum_mock.assert_called_with("sample.py")
self.assertEqual(actual, function_mock.return_value.insert_entry.return_value)
Expand Down

0 comments on commit af9f485

Please sign in to comment.