Skip to content

Commit

Permalink
fix: make the table/function catalog insert operation atomic (georgia…
Browse files Browse the repository at this point in the history
  • Loading branch information
gaurav274 authored and a0x8o committed Oct 30, 2023
1 parent 8a8a90a commit bc98b4a
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 0 deletions.
7 changes: 7 additions & 0 deletions evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,16 @@ def insert_function_catalog_entry(
checksum = get_file_checksum(impl_file_path)
function_entry = self._function_service.insert_entry(
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
name,
impl_file_path,
type,
checksum,
function_io_list,
function_metadata_list,
<<<<<<< HEAD
)
=======
name, impl_file_path, type, checksum
Expand All @@ -380,6 +384,9 @@ def insert_function_catalog_entry(
function_metadata.function_id = function_entry.row_id
self._function_metadata_service.insert_entries(function_metadata_list)
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
)
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
return function_entry

def get_function_catalog_entry_by_name(self, name: str) -> FunctionCatalogEntry:
Expand Down
31 changes: 31 additions & 0 deletions evadb/catalog/services/function_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
<<<<<<< HEAD
<<<<<<< HEAD
from typing import List

=======
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
from typing import List

>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import select

from evadb.catalog.models.function_catalog import FunctionCatalog, FunctionCatalogEntry
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
from evadb.catalog.models.utils import (
FunctionIOCatalogEntry,
FunctionMetadataCatalogEntry,
Expand All @@ -32,15 +40,19 @@
FunctionMetadataCatalogService,
)
from evadb.utils.errors import CatalogError
<<<<<<< HEAD
=======
from evadb.catalog.services.base_service import BaseService
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
from evadb.utils.logging_manager import logger


class FunctionCatalogService(BaseService):
def __init__(self, db_session: Session):
super().__init__(FunctionCatalog, db_session)
<<<<<<< HEAD
<<<<<<< HEAD
self._function_io_service = FunctionIOCatalogService(db_session)
self._function_metadata_service = FunctionMetadataCatalogService(db_session)
Expand All @@ -58,6 +70,19 @@ def insert_entry(
def insert_entry(
self, name: str, impl_path: str, type: str, checksum: str
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
self._function_io_service = FunctionIOCatalogService(db_session)
self._function_metadata_service = FunctionMetadataCatalogService(db_session)

def insert_entry(
self,
name: str,
impl_path: str,
type: str,
checksum: str,
function_io_list: List[FunctionIOCatalogEntry],
function_metadata_list: List[FunctionMetadataCatalogEntry],
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
) -> FunctionCatalogEntry:
"""Insert a new function entry
Expand All @@ -73,6 +98,9 @@ def insert_entry(
function_obj = self.model(name, impl_path, type, checksum)
function_obj = function_obj.save(self.session)
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))

for function_io in function_io_list:
function_io.function_id = function_obj._row_id
Expand All @@ -98,9 +126,12 @@ def insert_entry(
raise CatalogError(e)
else:
return function_obj.as_dataclass()
<<<<<<< HEAD
=======
return function_obj.as_dataclass()
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))

def get_entry_by_name(self, name: str) -> FunctionCatalogEntry:
"""return the function entry that matches the name provided.
Expand Down
5 changes: 5 additions & 0 deletions evadb/catalog/services/function_io_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def get_output_entries_by_function_id(
logger.error(error)
raise RuntimeError(error)

<<<<<<< HEAD
<<<<<<< HEAD
def create_entries(self, io_list: List[FunctionIOCatalogEntry]):
io_objs = []
Expand All @@ -81,6 +82,10 @@ def insert_entries(self, io_list: List[FunctionIOCatalogEntry]):
"""

>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
def create_entries(self, io_list: List[FunctionIOCatalogEntry]):
io_objs = []
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
for io in io_list:
io_obj = FunctionIOCatalog(
name=io.name,
Expand Down
8 changes: 8 additions & 0 deletions evadb/catalog/services/function_metadata_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ class FunctionMetadataCatalogService(BaseService):
def __init__(self, db_session: Session):
super().__init__(FunctionMetadataCatalog, db_session)

<<<<<<< HEAD
<<<<<<< HEAD
def create_entries(self, entries: List[FunctionMetadataCatalogEntry]):
metadata_objs = []
=======
def insert_entries(self, entries: List[FunctionMetadataCatalogEntry]):
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
def create_entries(self, entries: List[FunctionMetadataCatalogEntry]):
metadata_objs = []
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
try:
for entry in entries:
metadata_obj = FunctionMetadataCatalog(
Expand All @@ -45,11 +50,14 @@ def insert_entries(self, entries: List[FunctionMetadataCatalogEntry]):
return metadata_objs
except Exception as e:
<<<<<<< HEAD
<<<<<<< HEAD
=======
logger.exception(
f"Failed to insert entry {entry} into function metadata catalog with exception {str(e)}"
)
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
raise CatalogError(e)

def get_entries_by_function_id(
Expand Down
6 changes: 6 additions & 0 deletions test/integration_tests/long/test_create_table_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ def test_create_table_with_incorrect_info(self):
execute_query_fetch_all(self.evadb, "DROP TABLE SlackCSV;")

<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
def test_create_table_with_restricted_keywords(self):
create_table = "CREATE TABLE hello (_row_id INTEGER, price TEXT);"
with self.assertRaises(AssertionError):
Expand All @@ -132,8 +135,11 @@ def test_create_table_with_restricted_keywords(self):
with self.assertRaises(AssertionError):
execute_query_fetch_all(self.evadb, create_table)

<<<<<<< HEAD
=======
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
=======
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))

if __name__ == "__main__":
unittest.main()
10 changes: 10 additions & 0 deletions test/unit_tests/catalog/test_catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def test_insert_function(
function_io_list,
function_metadata_list,
)
<<<<<<< HEAD
<<<<<<< HEAD
function_mock.return_value.insert_entry.assert_called_with(
"function",
Expand All @@ -158,6 +159,15 @@ def test_insert_function(
function_mock.return_value.insert_entry.assert_called_with(
"function", "sample.py", "classification", checksum_mock.return_value
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
function_mock.return_value.insert_entry.assert_called_with(
"function",
"sample.py",
"classification",
checksum_mock.return_value,
function_io_list,
function_metadata_list,
>>>>>>> d4c650b6 (fix: make the table/function catalog insert operation atomic (#1293))
)
checksum_mock.assert_called_with("sample.py")
self.assertEqual(actual, function_mock.return_value.insert_entry.return_value)
Expand Down

0 comments on commit bc98b4a

Please sign in to comment.