allow async parser for docstores, make slideparser async (#7728)
Co-authored-by: Kamil Piechowiak <[email protected]>
Co-authored-by: Pawel Podhajski <[email protected]>
GitOrigin-RevId: 12746a34c63f599d970f64be1fdf18b4ac1c310b
3 people authored and Manul from Pathway committed Dec 4, 2024
1 parent 922d5a6 commit a76331e
Showing 10 changed files with 194 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `pw.Table.concat`, `pw.Table.with_id`, `pw.Table.with_id_from` no longer perform checks if ids are unique. It improves memory usage.
- table operations that store values (like `pw.Table.join`, `pw.Table.update_cells`) no longer store columns that are not used downstream.
- `append_only` column property is now propagated better (there are more places where we can infer it).
- **BREAKING**: Parsers and parser utilities including `OpenParse`, `ParseUnstructured`, `ParseUtf8`, `parse_images` are now async. Parser interface in the `VectorStore` and `DocumentStore` remains unchanged.
- **BREAKING**: Unused arguments from the constructor `pw.xpacks.llm.question_answering.DeckRetriever` are no longer accepted.

### Fixed
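A hedged migration sketch for the breaking change above (not part of the commit): inside a Pathway pipeline nothing changes, since the engine awaits async UDFs itself, but code that invoked a parser's wrapped function directly now receives a coroutine and must drive it on an event loop. `ParseUtf8` is used here because its new async body is visible later in this diff.

```python
# Sketch only: illustrates the effect of parsers becoming async UDFs.
import asyncio

from pathway.xpacks.llm.parsers import ParseUtf8

parser = ParseUtf8()

# Inside a pipeline, usage is unchanged -- the engine awaits the UDF:
# chunks = docs.select(parsed=parser(pw.this.data))

# Outside a pipeline, __wrapped__ is now a coroutine function, so a
# direct call must be run on an event loop:
chunks = asyncio.run(parser.__wrapped__(b"hello world"))
print(chunks)  # [("hello world", {})]
```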
1 change: 1 addition & 0 deletions pyproject.toml
@@ -97,6 +97,7 @@ tests = [
"dill >= 0.3.8",
"cloudpickle >= 3.0.0",
"deltalake >= 0.17.0, < 0.18.0",
"fpdf",
]
airbyte = [
"google-cloud-run",
12 changes: 6 additions & 6 deletions python/pathway/stdlib/indexing/vector_document_index.py
@@ -83,10 +83,10 @@ def default_lsh_knn_document_index(
Remark: the arbitrarily chosen configuration of the index may change (whenever tests
suggest some better default values). To have fixed configuration, you can use
:py:class:`~pathway.stdlib.indexing.DataIndex` with a parametrized instance of
:py:class:`~pathway.stdlib.indexing.DataIndex` with a parameterized instance of
:py:class:`~pathway.stdlib.indexing.LshKnn`.
Look up :py:class:`~pathway.stdlib.indexing.DataIndex` constructor to see
how to make data index parametrized by custom data structure, and the constructor
how to make data index parameterized by custom data structure, and the constructor
of :py:class:`~pathway.stdlib.indexing.LshKnn` to see
the parameters that can be adjusted.
"""
@@ -126,10 +126,10 @@ def default_usearch_knn_document_index(
Remark: the arbitrarily chosen configuration of the index may change (whenever tests
suggest some better default values). To have fixed configuration, you can use
:py:class:`~pathway.stdlib.indexing.DataIndex` with a parametrized instance of
:py:class:`~pathway.stdlib.indexing.DataIndex` with a parameterized instance of
:py:class:`~pathway.stdlib.indexing.USearchKnn`.
Look up :py:class:`~pathway.stdlib.indexing.DataIndex` constructor to see how
to make data index parametrized by custom data structure, and the constructor
to make data index parameterized by custom data structure, and the constructor
of :py:class:`~pathway.stdlib.indexing.USearchKnn` to see the
parameters that can be adjusted.
@@ -172,10 +172,10 @@ def default_brute_force_knn_document_index(
Remark: the arbitrarily chosen configuration of the index may change (whenever tests
suggest some better default values). To have fixed configuration, you can use
:py:class:`~pathway.stdlib.indexing.DataIndex` with a parametrized instance of
:py:class:`~pathway.stdlib.indexing.DataIndex` with a parameterized instance of
:py:class:`~pathway.stdlib.indexing.BruteForceKnn`.
Look up :py:class:`~pathway.stdlib.indexing.DataIndex` constructor to see how
to make data index parametrized by custom data structure, and the constructor
to make data index parameterized by custom data structure, and the constructor
of :py:class:`~pathway.stdlib.indexing.BruteForceKnn` to see the
parameters that can be adjusted.
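The three docstrings above all describe the same pattern for pinning an index configuration. A minimal sketch follows; the import path matches the cross-references (`pathway.stdlib.indexing.DataIndex`, `pathway.stdlib.indexing.LshKnn`), but the constructor arguments are assumptions for illustration only — consult both constructors for the real signatures.

```python
import pathway as pw
from pathway.stdlib.indexing import DataIndex, LshKnn

class DocSchema(pw.Schema):
    text: str
    embedding: list[float]

documents = pw.debug.table_from_rows(
    DocSchema, [("hello", [0.1, 0.2]), ("world", [0.2, 0.1])]
)

# Hypothetical arguments, shown only to illustrate fixing a configuration
# explicitly instead of relying on defaults that may change between releases.
knn = LshKnn(documents.embedding, dimensions=2)  # assumed signature
index = DataIndex(documents, knn)
```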
20 changes: 10 additions & 10 deletions python/pathway/tests/temporal/test_windows_stream.py
@@ -279,7 +279,7 @@ def generate_expected(duration, hop, delay, cutoff, keep_results, result_table):
return expected_entries


def parametrized_test(duration, hop, delay, cutoff, keep_results):
def parameterized_test(duration, hop, delay, cutoff, keep_results):
result_table = create_windowby_scenario(duration, hop, delay, cutoff, keep_results)
expected = generate_expected(
duration, hop, delay, cutoff, keep_results, result_table
@@ -289,39 +289,39 @@ def parametrized_test(duration, hop, delay, cutoff, keep_results):


def test_keep_results():
parametrized_test(5, 3, 0, 0, True)
parameterized_test(5, 3, 0, 0, True)


def test_remove_results():
parametrized_test(5, 3, 0, 0, False)
parameterized_test(5, 3, 0, 0, False)


def test_non_zero_delay_keep_results():
parametrized_test(5, 3, 1, 0, True)
parameterized_test(5, 3, 1, 0, True)


def test_non_zero_delay_remove_results():
parametrized_test(5, 3, 1, 0, False)
parameterized_test(5, 3, 1, 0, False)


def test_non_zero_buffer_keep_results():
parametrized_test(5, 3, 0, 1, True)
parameterized_test(5, 3, 0, 1, True)


def test_non_zero_buffer_remove_results():
parametrized_test(5, 3, 0, 1, False)
parameterized_test(5, 3, 0, 1, False)


def test_non_zero_delay_non_zero_buffer_keep_results():
parametrized_test(5, 3, 1, 1, True)
parameterized_test(5, 3, 1, 1, True)


def test_high_delay_high_buffer_keep_results():
parametrized_test(5, 3, 5, 6, True)
parameterized_test(5, 3, 5, 6, True)


def test_non_zero_delay_non_zero_buffer_remove_results():
parametrized_test(5, 3, 1, 1, False)
parameterized_test(5, 3, 1, 1, False)


# method below creates expected output for exactly once tests(also below)
8 changes: 5 additions & 3 deletions python/pathway/xpacks/llm/document_store.py
@@ -15,6 +15,7 @@
import pathway as pw
import pathway.xpacks.llm.parsers
import pathway.xpacks.llm.splitters
from pathway.internals.udfs.utils import coerce_async
from pathway.stdlib.indexing.data_index import _SCORE, DataIndex
from pathway.stdlib.indexing.retrievers import AbstractRetrieverFactory
from pathway.stdlib.ml.classifiers import _knn_lsh
@@ -227,11 +228,12 @@ def parse_documents(
self, input_docs: pw.Table[_RawDocumentSchema]
) -> pw.Table[_DocumentSchema]:
@pw.udf
def parse_doc(data: bytes, metadata: pw.Json) -> list[dict]:
rets = self.parser(data)
async def parse_doc(data: bytes, metadata: pw.Json) -> list[pw.Json]:
rets = await coerce_async(self.parser)(data)
metadata_dict = metadata.as_dict()
return [
dict(text=ret[0], metadata={**metadata_dict, **ret[1]}) for ret in rets
pw.Json(dict(text=ret[0], metadata={**metadata_dict, **ret[1]}))
for ret in rets
]

return self._apply_processor(input_docs, parse_doc)
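For readers unfamiliar with the `coerce_async` call in the hunk above: it lets `DocumentStore` accept both sync and async parsers through a single `await` (the hunk also changes `parse_doc` to return `list[pw.Json]` rather than `list[dict]`). Pathway's actual implementation lives in `pathway.internals.udfs.utils`; the sketch below only illustrates the coercion pattern and is not that code.

```python
# Illustrative pattern only (not Pathway's implementation): normalize a
# callable so the caller can always `await` it.
import asyncio
import inspect
from typing import Any, Awaitable, Callable

def coerce_async_sketch(fn: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    if inspect.iscoroutinefunction(fn):
        return fn  # already a coroutine function: await it as-is

    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # run the synchronous callable off the event-loop thread
        return await asyncio.to_thread(fn, *args, **kwargs)

    return wrapper
```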
python/pathway/xpacks/llm/{_openparse_utils.py → openparse_utils.py}
@@ -72,7 +72,7 @@ def _table_args_dict_to_model(args_dict: dict) -> BaseModel:
)


class CustomIngestionPipeline(IngestionPipeline):
class SimpleIngestionPipeline(IngestionPipeline):
"""
A simple PDF processing pipeline that combines close elements, combines the headers
with the text body, and removes weirdly formatted/small elements.
@@ -340,7 +340,7 @@ def ingest(


# modified from https://github.com/Filimoa/open-parse/blob/main/src/openparse/doc_parser.py
class CustomDocumentParser(DocumentParser):
class PyMuDocumentParser(DocumentParser):
def __init__(
self,
*,
99 changes: 53 additions & 46 deletions python/pathway/xpacks/llm/parsers.py
@@ -13,6 +13,7 @@
import os
import subprocess
import tempfile
import warnings
from collections.abc import Callable
from functools import partial
from io import BytesIO
@@ -53,7 +54,7 @@ class ParseUtf8(pw.UDF):
Decode text encoded as UTF-8.
"""

def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
async def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
docs: list[tuple[str, dict]] = [(contents.decode("utf-8"), {})]
return docs

@@ -128,7 +129,7 @@ def _combine_metadata(self, left: dict, right: dict) -> dict:
result.pop("category_depth", None)
return result

def __wrapped__(self, contents: bytes, **kwargs) -> list[tuple[str, dict]]:
async def __wrapped__(self, contents: bytes, **kwargs) -> list[tuple[str, dict]]:
"""
Parse the given document:
@@ -256,12 +257,16 @@ class OpenParse(pw.UDF):
- parse_images: whether to parse the images from the PDF. Detected images will be
indexed by their description from the parsing algorithm.
Note that images are parsed with separate OCR model, parsing may take a while.
- processing_pipeline: ``openparse.processing.IngestionPipeline`` that will post process
the extracted elements. Can be set to Pathway defined ``CustomIngestionPipeline``
by setting to ``"pathway_pdf_default"``,
``SamePageIngestionPipeline`` by setting to ``"merge_same_page"``,
or any of the pipelines under the ``openparse.processing``.
Defaults to ``CustomIngestionPipeline``.
- processing_pipeline: str or IngestionPipeline.
Specifies the pipeline used for post-processing extracted elements.
- ``"pathway_pdf_default"``: Uses ``SimpleIngestionPipeline`` from Pathway.
This is a simple processor that combines close elements, combines the headers
with the text body, and removes weirdly formatted/small elements.
Can be set with: ``"pathway_pdf_default"`` or using the class,
``from pathway.xpacks.llm.openparse_utils import SimpleIngestionPipeline``.
- ``"merge_same_page"``: Uses ``SamePageIngestionPipeline`` to chunk based on pages.
- Any other pipeline from the ``openparse.processing`` can also be used.
Defaults to ``SimpleIngestionPipeline``.
- cache_strategy: Defines the caching mechanism. To enable caching,
a valid :py:class:``~pathway.udfs.CacheStrategy`` should be provided.
Defaults to None.
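A short usage sketch of the `processing_pipeline` options documented in the hunk above (hedged: it assumes the constructor's other arguments keep their defaults, which this diff only partially shows):

```python
from pathway.xpacks.llm.parsers import OpenParse
from pathway.xpacks.llm.openparse_utils import SimpleIngestionPipeline

# by name -- same as the default pipeline:
parser = OpenParse(processing_pipeline="pathway_pdf_default")

# page-based chunking:
page_parser = OpenParse(processing_pipeline="merge_same_page")

# or pass an IngestionPipeline instance directly:
explicit_parser = OpenParse(processing_pipeline=SimpleIngestionPipeline())
```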
@@ -296,10 +301,10 @@ def __init__(
import openparse # noqa:F401
from pypdf import PdfReader # noqa:F401

from ._openparse_utils import (
CustomDocumentParser,
CustomIngestionPipeline,
from .openparse_utils import (
PyMuDocumentParser,
SamePageIngestionPipeline,
SimpleIngestionPipeline,
)

super().__init__(cache_strategy=cache_strategy)
@@ -313,7 +318,7 @@ def __init__(

if parse_images:
if image_args is None:
logger.warn(
warnings.warn(
"`parse_images` is set to `True`, but `image_args` is not specified, defaulting to `gpt-4o`."
)
image_args = {
@@ -329,26 +334,32 @@ def __init__(
f"Given args: {image_args}",
)
else:
logger.warn(
"`parse_images` is set to `False`, but `image_args` is specified, skipping image parsing."
)
image_args = None
if image_args:
warnings.warn(
"`parse_images` is set to `False`, but `image_args` is specified, skipping image parsing."
)
image_args = None

if processing_pipeline is None:
processing_pipeline = CustomIngestionPipeline()
processing_pipeline = SimpleIngestionPipeline()
elif isinstance(processing_pipeline, str):
if processing_pipeline == "pathway_pdf_default":
processing_pipeline = CustomIngestionPipeline()
processing_pipeline = SimpleIngestionPipeline()
elif processing_pipeline == "merge_same_page":
processing_pipeline = SamePageIngestionPipeline()
else:
raise ValueError(
"Invalid `processing_pipeline` set. It must be either one of \
`'pathway_pdf_default'` or `'merge_same_page'`."
)

self.doc_parser = CustomDocumentParser(
self.doc_parser = PyMuDocumentParser(
table_args=table_args,
image_args=image_args,
processing_pipeline=processing_pipeline,
)

def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
async def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
import openparse
from pypdf import PdfReader

Expand Down Expand Up @@ -477,7 +488,7 @@ def __init__(
parse_image_details_fn
)

def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
async def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
"""Parse image bytes with GPT-v model."""

images: list[Image.Image] = [Image.open(BytesIO(contents))]
@@ -489,7 +500,7 @@ def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
for img in images
]

parsed_content, parsed_details = parse_images(
parsed_content, parsed_details = await parse_images(
images,
self.llm,
self.parse_prompt,
@@ -664,7 +675,7 @@ def __init__(
parse_image_details_fn
)

def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
async def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
"""Parse slides with GPT-v model by converting to images."""

from pdf2image import convert_from_bytes
@@ -694,17 +705,15 @@ def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:

b64_images = [img_to_b64(image) for image in images]

parsed_content, parsed_details = asyncio.run(
_parse_b64_images(
b64_images,
self.llm,
self.parse_prompt,
run_mode=self.run_mode,
parse_details=self.parse_details,
detail_parse_schema=self.detail_parse_schema,
parse_fn=self.parse_fn,
parse_image_details_fn=self.parse_image_details_fn,
)
parsed_content, parsed_details = await _parse_b64_images(
b64_images,
self.llm,
self.parse_prompt,
run_mode=self.run_mode,
parse_details=self.parse_details,
detail_parse_schema=self.detail_parse_schema,
parse_fn=self.parse_fn,
parse_image_details_fn=self.parse_image_details_fn,
)

logger.info(
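Background on why the hunk above swaps `asyncio.run` for a plain `await` (not from the commit itself): once `__wrapped__` is a coroutine, it executes inside a running event loop, where `asyncio.run` is illegal.

```python
# Minimal reproduction of the failure mode the hunk above removes.
import asyncio

async def inner() -> str:
    return "parsed"

async def broken() -> str:
    # RuntimeError: asyncio.run() cannot be called from a running event loop
    return asyncio.run(inner())

async def fixed() -> str:
    return await inner()  # await directly inside the running loop

print(asyncio.run(fixed()))  # "parsed"
```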
@@ -733,7 +742,7 @@ def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
return docs


def parse_images(
async def parse_images(
images: list[Image.Image],
llm: pw.UDF,
parse_prompt: str,
@@ -767,17 +776,15 @@ def parse_images(

b64_images = [img_to_b64(image) for image in images]

return asyncio.run(
_parse_b64_images(
b64_images,
llm,
parse_prompt,
run_mode=run_mode,
parse_details=parse_details,
detail_parse_schema=detail_parse_schema,
parse_fn=parse_fn,
parse_image_details_fn=parse_image_details_fn,
)
return await _parse_b64_images(
b64_images,
llm,
parse_prompt,
run_mode=run_mode,
parse_details=parse_details,
detail_parse_schema=detail_parse_schema,
parse_fn=parse_fn,
parse_image_details_fn=parse_image_details_fn,
)


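Since `parse_images` is now a coroutine (matching the breaking-change note in the CHANGELOG), synchronous callers must drive it themselves. A hedged migration sketch; `parse_images_sync` is a hypothetical helper, not part of this commit:

```python
# Hypothetical compatibility shim for code that called parse_images synchronously.
import asyncio

from pathway.xpacks.llm.parsers import parse_images

def parse_images_sync(*args, **kwargs):
    # valid only when no event loop is already running in this thread;
    # from async code, use `await parse_images(...)` instead.
    return asyncio.run(parse_images(*args, **kwargs))
```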