fix: Using BatchStatement instead of execute_concurrent_with_args #163

Open: wants to merge 1 commit into base branch master

@EXPEbdodla (Collaborator) commented Jan 6, 2025

What this PR does / why we need it:

fix: Using BatchStatement instead of execute_concurrent_with_args

Which issue(s) this PR fixes:

  1. execute_concurrent_with_args was taking too long to insert records. This PR uses BatchStatement to write all records for a given entity_key as a single batch, which should cut network round-trip time. Grouping different entity_keys into a single batch would run in BatchType.LOGGED mode, which has a performance impact according to the docs, so this PR uses BatchType.UNLOGGED mode and batches by partition key.
  2. Concurrency is managed using queues.
  3. TTL is set at the row level (during insert) instead of at the table level.
  4. Rate limiting controls the writes. We noticed an impact on read performance while materializations run; with rate limiting we can control write throughput and reduce the impact on reads.
  5. Removed the Expedia-specific spark_kafka_processor.py code because we are upgrading the materialization process to Spark 3.5, and because we noticed increased write latency for streaming ingestion tasks using mapInPandas().
  6. Feature view tags with the prefix online_store_ override online store configurations. Not all online store configurations can be overridden this way.
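To illustrate items 1 and 3, here is a minimal sketch of grouping rows by partition key and building an insert statement that sets the TTL per row. It is not the code in this PR: the row shape, the table and column names, and both helper functions are hypothetical. It deliberately avoids importing the Cassandra driver so that it runs on its own; in the real write path each group would presumably become one `BatchStatement(batch_type=BatchType.UNLOGGED)` from `cassandra.query`.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple

# Hypothetical row shape: (entity_key, feature_name, value).
Row = Tuple[str, str, Any]


def group_rows_by_entity_key(rows: Iterable[Row]) -> Dict[str, List[Row]]:
    """Bucket rows so that each bucket targets exactly one partition."""
    groups: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[0]].append(row)
    return dict(groups)


def build_insert_cql(table: str, ttl_seconds: int) -> str:
    """Build an INSERT that sets the TTL per row via CQL's USING TTL clause."""
    return (
        f"INSERT INTO {table} (entity_key, feature_name, value) "
        f"VALUES (?, ?, ?) USING TTL {int(ttl_seconds)}"
    )


# Illustrative data only; table and column names are assumptions.
rows = [
    ("driver:1", "trips_today", 4),
    ("driver:2", "trips_today", 9),
    ("driver:1", "rating", 4.8),
]
groups = group_rows_by_entity_key(rows)
insert_cql = build_insert_cql("feature_table", ttl_seconds=86400)

# Each group maps to a single partition, so it can be sent as one
# unlogged batch without spanning partitions.
for entity_key, group in groups.items():
    print(entity_key, len(group))
```

Because every batch built this way touches a single partition, the coordinator does not have to fan one batch out across partitions, which is the motivation given above for batching on the partition key.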

Misc

Comment on lines 505 to 534
futures = []
for batch in batches:
    futures.append(session.execute_async(batch))
    if len(futures) >= config.online_store.write_concurrency:
        # Raises exception if at least one of the batch fails
        try:
            for future in futures:
                future.result()
            futures = []
        except Exception as exc:
            logger.error(f"Error writing a batch: {exc}")
            print(f"Error writing a batch: {exc}")
            raise Exception("Error writing a batch") from exc

if len(futures) > 0:
    try:
        for future in futures:
            future.result()
        futures = []
    except Exception as exc:
        logger.error(f"Error writing a batch: {exc}")
        print(f"Error writing a batch: {exc}")
        raise Exception("Error writing a batch") from exc

# execute_concurrent_with_args(
#     session,
#     insert_cql,
#     rows,
#     concurrency=config.online_store.write_concurrency,
# )
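
The windowed pattern in the snippet above (submit up to write_concurrency batches, then wait for all of them) can be sketched with the duplicated drain logic factored into one helper. This is an illustration, not a proposed change to the PR: `drain`, `write_batches`, and `fake_execute_async` are hypothetical names, and a `ThreadPoolExecutor` stands in for `session.execute_async` so the example runs without a Cassandra cluster.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def drain(futures: List[Future]) -> None:
    """Wait for every in-flight write and re-raise the first failure."""
    try:
        for future in futures:
            future.result()
    except Exception as exc:
        raise RuntimeError("Error writing a batch") from exc
    finally:
        # Reset the window whether or not a write failed.
        futures.clear()


def write_batches(
    batches: Iterable[Any],
    submit: Callable[[Any], Future],
    write_concurrency: int,
) -> int:
    """Submit batches, keeping at most write_concurrency in flight."""
    futures: List[Future] = []
    submitted = 0
    for batch in batches:
        futures.append(submit(batch))
        submitted += 1
        if len(futures) >= write_concurrency:
            drain(futures)
    drain(futures)  # flush the final, partially filled window
    return submitted


completed: List[int] = []

with ThreadPoolExecutor(max_workers=4) as pool:

    def fake_execute_async(batch: int) -> Future:
        # Stand-in for session.execute_async(batch); an assumption for the demo.
        return pool.submit(completed.append, batch)

    total = write_batches(range(10), fake_execute_async, write_concurrency=3)
```

Note that this window waits for every future in the current group before submitting more, so one slow batch stalls the whole group; that matches the behavior of the reviewed snippet.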

This no longer allows for write_concurrency to be set in the feature_store.yaml then, right?

Collaborator Author

Still using that. Refer to line 508.

ah, missed that. thanks

@EXPEbdodla force-pushed the use_batch branch 3 times, most recently from 01e6130 to 075c4c0, on January 15, 2025 04:50
@EXPEbdodla force-pushed the use_batch branch 2 times, most recently from a0d66f2 to c535a17, on February 6, 2025 23:36
@EXPEbdodla changed the title from "fix: Trying BatchStatement instead of execute_concurrent_with_args" to "fix: Using BatchStatement instead of execute_concurrent_with_args" on Feb 6, 2025
2 participants