Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix MyPy errors in leveldb #20222

Merged
merged 1 commit into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions airflow/providers/google/leveldb/hooks/leveldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook

DB_NOT_INITIALIZED_BEFORE = "The `get_conn` method should be called before!"


class LevelDBHookException(AirflowException):
"""Exception specific for LevelDB"""
Expand All @@ -43,7 +45,7 @@ def __init__(self, leveldb_conn_id: str = default_conn_name):
super().__init__()
self.leveldb_conn_id = leveldb_conn_id
self.connection = self.get_connection(leveldb_conn_id)
self.db = None
self.db: Optional[plyvel.DB] = None

def get_conn(self, name: str = '/tmp/testdb/', create_if_missing: bool = False, **kwargs) -> DB:
"""
Expand Down Expand Up @@ -74,9 +76,9 @@ def run(
self,
command: str,
key: bytes,
value: bytes = None,
keys: List[bytes] = None,
values: List[bytes] = None,
value: Optional[bytes] = None,
keys: Optional[List[bytes]] = None,
values: Optional[List[bytes]] = None,
) -> Optional[bytes]:
"""
Execute operation with leveldb
Expand All @@ -87,21 +89,27 @@ def run(
:param key: key for command(put,get,delete) execution(, e.g. ``b'key'``, ``b'another-key'``)
:type key: bytes
:param value: value for command(put) execution(bytes, e.g. ``b'value'``, ``b'another-value'``)
:type value: bytes
:type value: Optional[bytes]
:param keys: keys for command(write_batch) execution(List[bytes], e.g. ``[b'key', b'another-key'])``
:type keys: List[bytes]
:type keys: Optional[List[bytes]]
:param values: values for command(write_batch) execution e.g. ``[b'value'``, ``b'another-value']``
:type values: List[bytes]
:type values: Optional[List[bytes]]
:returns: value from get or None
:rtype: Optional[bytes]
"""
if command == 'put':
if not value:
raise Exception("Please provide `value`!")
return self.put(key, value)
elif command == 'get':
return self.get(key)
elif command == 'delete':
return self.delete(key)
elif command == 'write_batch':
if not keys:
raise Exception("Please provide `keys`!")
if not values:
raise Exception("Please provide `values`!")
return self.write_batch(keys, values)
else:
raise LevelDBHookException("Unknown command for LevelDB hook")
Expand All @@ -115,6 +123,8 @@ def put(self, key: bytes, value: bytes):
:param value: value for put execution e.g. ``b'value'``, ``b'another-value'``
:type value: bytes
"""
if not self.db:
raise Exception(DB_NOT_INITIALIZED_BEFORE)
self.db.put(key, value)

def get(self, key: bytes) -> bytes:
Expand All @@ -126,6 +136,8 @@ def get(self, key: bytes) -> bytes:
:returns: value of key from db.get
:rtype: bytes
"""
if not self.db:
raise Exception(DB_NOT_INITIALIZED_BEFORE)
return self.db.get(key)

def delete(self, key: bytes):
Expand All @@ -135,6 +147,8 @@ def delete(self, key: bytes):
:param key: key for delete execution, e.g. ``b'key'``, ``b'another-key'``
:type key: bytes
"""
if not self.db:
raise Exception(DB_NOT_INITIALIZED_BEFORE)
self.db.delete(key)

def write_batch(self, keys: List[bytes], values: List[bytes]):
Expand All @@ -146,6 +160,8 @@ def write_batch(self, keys: List[bytes], values: List[bytes]):
:param values: values for write_batch execution e.g. ``[b'value', b'another-value']``
:type values: List[bytes]
"""
if not self.db:
raise Exception(DB_NOT_INITIALIZED_BEFORE)
with self.db.write_batch() as batch:
for i, key in enumerate(keys):
batch.put(key, values[i])
16 changes: 8 additions & 8 deletions airflow/providers/google/leveldb/operators/leveldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ class LevelDBOperator(BaseOperator):
:param key: key for command(put,get,delete) execution(, e.g. ``b'key'``, ``b'another-key'``)
:type key: bytes
:param value: value for command(put) execution(bytes, e.g. ``b'value'``, ``b'another-value'``)
:type value: bytes
:type value: Optional[bytes]
:param keys: keys for command(write_batch) execution(List[bytes], e.g. ``[b'key', b'another-key'])``
:type keys: List[bytes]
:type keys: Optional[List[bytes]]
:param values: values for command(write_batch) execution e.g. ``[b'value'``, ``b'another-value']``
:type values: List[bytes]
:type values: Optional[List[bytes]]
:param leveldb_conn_id:
:type leveldb_conn_id: str
:param create_if_missing: whether a new database should be created if needed
Expand All @@ -53,9 +53,9 @@ def __init__(
*,
command: str,
key: bytes,
value: bytes = None,
keys: List[bytes] = None,
values: List[bytes] = None,
value: Optional[bytes] = None,
keys: Optional[List[bytes]] = None,
values: Optional[List[bytes]] = None,
leveldb_conn_id: str = 'leveldb_default',
name: str = '/tmp/testdb/',
create_if_missing: bool = True,
Expand Down Expand Up @@ -94,5 +94,5 @@ def execute(self, context) -> Optional[str]:
)
self.log.info("Done. Returned value was: %s", str(value))
leveldb_hook.close_conn()
value = value if value is None else value.decode()
return value
str_value = value if value is None else value.decode()
return str_value