diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fbc1870..3261cd3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [0.7.4] - 2023-12-05 + +### Fixed +- Fixed issues with standalone panel+Bokeh dashboards to ensure optimal functionality and performance. + ## [0.7.3] - 2023-11-30 ### Added diff --git a/Cargo.lock b/Cargo.lock index 30c03438..068acd49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -291,7 +291,7 @@ dependencies = [ "iana-time-zone", "num-traits", "serde", - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -327,9 +327,9 @@ dependencies = [ [[package]] name = "core-foundation" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" dependencies = [ "core-foundation-sys", "libc", @@ -337,9 +337,9 @@ dependencies = [ [[package]] name = "core-foundation-sys" -version = "0.8.4" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" [[package]] name = "cpufeatures" @@ -481,9 +481,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" +checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc" dependencies = [ "powerfmt", "serde", @@ -604,12 +604,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -1121,9 +1121,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.65" +version = "0.3.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54c0c35952f67de54bb584e9fd912b3023117cbafc0a77d8f3dee1fb5f572fe8" +checksum = "cee9c64da59eae3b50095c18d3e74f8b73c0b86d2792824ff01bbce68ba229ca" dependencies = [ "wasm-bindgen", ] @@ -1176,9 +1176,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829" +checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" [[package]] name = "lock_api" @@ -1271,7 +1271,7 @@ checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1491,7 +1491,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -1505,7 +1505,7 @@ dependencies = [ [[package]] name = "pathway" -version = "0.7.3" +version = "0.7.4" dependencies = [ "arc-swap", "arcstr", @@ -2073,15 +2073,15 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.25" +version = "0.38.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc99bc2d4f1fed22595588a013687477aedf3cdcfb26558c559edb67b4d9b22e" +checksum = "9470c4bf8246c8daf25f9598dca807fb6510347b1e1cfa55749113850c79d88a" dependencies = [ "bitflags 2.4.1", "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -2096,7 +2096,7 @@ version = "0.1.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88" dependencies = [ - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2292,7 +2292,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2389,7 +2389,7 @@ dependencies = [ "fastrand", "redox_syscall", "rustix", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2515,7 +2515,7 @@ dependencies = [ "mio", "pin-project-lite", "socket2 0.5.5", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2707,9 +2707,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce" +checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -2717,9 +2717,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217" +checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826" dependencies = [ "bumpalo", "log", @@ -2732,9 +2732,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.38" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9afec9963e3d0994cac82455b2b3502b81a7f40f9a0d32181f7528d9f4b43e02" +checksum = "ac36a15a220124ac510204aec1c3e5db8a22ab06fd6706d881dc6149f8ed9a12" dependencies = [ "cfg-if", "js-sys", @@ -2744,9 +2744,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2" +checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2754,9 +2754,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907" +checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", @@ -2767,15 +2767,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b" +checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" [[package]] name = "web-sys" -version = "0.3.65" +version = "0.3.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5db499c5f66323272151db0e666cd34f78617522fb0c1604d31a27c50c206a85" +checksum = "50c24a44ec86bb68fbecd1b3efed7e85ea5621b39b35ef2766b66cd984f8010f" dependencies = [ "js-sys", "wasm-bindgen", @@ -2819,7 +2819,7 @@ version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -2828,7 +2828,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", ] [[package]] @@ -2837,13 +2846,28 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", ] [[package]] @@ -2852,42 +2876,84 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.19" @@ -2904,7 +2970,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" dependencies = [ "cfg-if", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2915,18 +2981,18 @@ checksum = "9828b178da53440fa9c766a3d2f73f7cf5d0ac1fe3980c1e5018d899fd19e07b" [[package]] name = "zerocopy" -version = "0.7.26" +version = "0.7.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e97e415490559a91254a2979b4829267a57d2fcd741a98eee8b722fb57289aa0" +checksum = "7d6f15f7ade05d2a4935e34a457b936c23dc70a05cc1d97133dc99e7a3fe0f0e" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.26" +version = "0.7.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd7e48ccf166952882ca8bd778a43502c64f33bf94c12ebe2a7f08e5a0f6689f" +checksum = "dbbad221e3f78500350ecbd7dfa4e63ef945c05f4c61cb7f4d3f84cd0bba649b" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 180801d0..cc496574 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pathway" -version = "0.7.3" +version = "0.7.4" edition = "2021" publish = false rust-version = "1.72.0" diff --git a/README.md b/README.md index 374e4bab..483bb1ab 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,8 @@ ![Pathway is an incremental data stream processing engine](https://github.com/pathwaycom/IoT-Pathway/assets/28102878/084593de-9325-4eee-b84a-06b8666efcd1) +In the first place, Pathway was designed to be a life-saver (or at least a time-saver) for Python developers and ML/AI engineers faced with live data sources, where you need to react quickly to fresh data. Still, Pathway is a powerful tool that can be used for a lot of things. If you want to do streaming in Python, build an AI data pipeline, or if you are looking for your next Python data processing framework, keep reading. + Pathway provides a high-level programming interface in Python for defining data transformations, aggregations, and other operations on data streams. With Pathway, you can effortlessly design and deploy sophisticated data workflows that efficiently handle high volumes of data in real time. diff --git a/python/pathway/debug/__init__.py b/python/pathway/debug/__init__.py index 8f90450a..5af2dde6 100644 --- a/python/pathway/debug/__init__.py +++ b/python/pathway/debug/__init__.py @@ -16,6 +16,7 @@ from pathway.internals import Json, api, parse_graph from pathway.internals.datasource import DataSourceOptions, PandasDataSource from pathway.internals.decorators import table_from_datasource +from pathway.internals.fingerprints import fingerprint from pathway.internals.graph_runner import GraphRunner from pathway.internals.monitoring import MonitoringLevel from pathway.internals.runtime_type_check import runtime_type_check @@ -259,7 +260,20 @@ def table_from_pandas( _validate_dataframe(df) - return table_from_datasource( + if id_from is None: + ids_df = pd.DataFrame({"id": df.index}) + ids_df.index = df.index + else: + ids_df = df[id_from].copy() + + for column in api.PANDAS_PSEUDOCOLUMNS: + if column in df.columns: + ids_df[column] = df[column] + + as_hashes = [fingerprint(x) for x in ids_df.to_dict(orient="records")] + key = fingerprint((unsafe_trusted_ids, sorted(as_hashes))) + + ret: Table = table_from_datasource( PandasDataSource( schema=schema, data=df.copy(), @@ -268,6 +282,14 @@ def table_from_pandas( ), ) ) + from pathway.internals.parse_graph import G + + if key in G.static_tables_cache: + ret = ret.with_universe_of(G.static_tables_cache[key]) + else: + G.static_tables_cache[key] = ret + + return ret def _markdown_to_pandas(table_def): diff --git a/python/pathway/internals/parse_graph.py b/python/pathway/internals/parse_graph.py index b390a464..4bcf5f22 100644 --- a/python/pathway/internals/parse_graph.py +++ b/python/pathway/internals/parse_graph.py @@ -5,12 +5,15 @@ import hashlib import itertools from collections.abc import Callable -from typing import Any, TypeVar +from typing import TYPE_CHECKING, Any, TypeVar from pathway.internals import operator, trace from pathway.internals.helpers import FunctionSpec, StableSet from pathway.internals.universe_solver import UniverseSolver +if TYPE_CHECKING: + from pathway.internals import Table + class Scope: """Keeps all nodes of one scope.""" @@ -64,6 +67,7 @@ class ParseGraph: _scope_stack: list[Scope] universe_solver: UniverseSolver cache: dict[Any, Any] + static_tables_cache: dict[int, Table] def __init__(self) -> None: self.clear() @@ -129,6 +133,7 @@ def clear(self): self.scopes = [global_scope] self.universe_solver = UniverseSolver() self.cache = {} + self.static_tables_cache = {} def sig(self): return hashlib.sha256(repr(self).encode()).hexdigest() diff --git a/python/pathway/internals/table.py b/python/pathway/internals/table.py index 70f52340..01186f8b 100644 --- a/python/pathway/internals/table.py +++ b/python/pathway/internals/table.py @@ -438,7 +438,7 @@ def __add__(self, other: Table) -> Table: ... age ... 1 10 ... 7 3 - ... ''').with_universe_of(t1) + ... ''') >>> t3 = t1 + t2 >>> pw.debug.compute_and_print(t3, include_id=False) pet | age @@ -1319,7 +1319,7 @@ def with_columns(self, *args: expr.ColumnReference, **kwargs: Any) -> Table: ... 1 | Tom | 1 | 10 ... 2 | Bob | 1 | 9 ... 3 | Tom | 2 | 8 - ... ''').with_universe_of(t1) + ... ''') >>> t3 = t1.with_columns(*t2) >>> pw.debug.compute_and_print(t3, include_id=False) age | owner | pet | size @@ -1720,9 +1720,11 @@ def with_universe_of(self, other: TableLike) -> Table: ... | age ... 1 | 10 ... 7 | 3 - ... ''').with_universe_of(t1) - >>> t3 = t1 + t2 - >>> pw.debug.compute_and_print(t3, include_id=False) + ... 8 | 100 + ... ''') + >>> t3 = t2.filter(pw.this.age < 30).with_universe_of(t1) + >>> t4 = t1 + t3 + >>> pw.debug.compute_and_print(t4, include_id=False) pet | age Cat | 3 Dog | 10 diff --git a/python/pathway/internals/table_like.py b/python/pathway/internals/table_like.py index 323a1a70..23d89590 100644 --- a/python/pathway/internals/table_like.py +++ b/python/pathway/internals/table_like.py @@ -146,8 +146,9 @@ def promise_universe_is_equal_to( ... 1 | 8 | Alice | cat ... 2 | 9 | Bob | dog ... 3 | 15 | Alice | tortoise + ... 4 | 99 | Bob | seahorse ... ''' - ... ) + ... ).filter(pw.this.age<30) >>> t2 = pw.debug.table_from_markdown( ... ''' ... | age | owner diff --git a/python/pathway/internals/universes.py b/python/pathway/internals/universes.py index 520efa57..cf793123 100644 --- a/python/pathway/internals/universes.py +++ b/python/pathway/internals/universes.py @@ -107,8 +107,9 @@ def promise_are_equal(self: TableLike, *others: TableLike) -> None: ... 1 | 11 | Alice ... 2 | 12 | Tom ... 3 | 7 | Eve + ... 4 | 99 | Papa ... ''' - ... ) + ... ).filter(pw.this.age<20) >>> t3 = t2.filter(pw.this.age > 10) >>> with pytest.raises( ... ValueError, diff --git a/python/pathway/io/python/__init__.py b/python/pathway/io/python/__init__.py index 62bb4a72..61b2c127 100644 --- a/python/pathway/io/python/__init__.py +++ b/python/pathway/io/python/__init__.py @@ -1,11 +1,16 @@ # Copyright © 2023 Pathway - import json +import queue import threading +import time from abc import ABC, abstractmethod from queue import Queue from typing import Any +import pandas as pd +import panel as pn +from IPython.display import display + from pathway.internals import Table, api, datasource from pathway.internals.api import DataEventType, PathwayType, Pointer, SessionType from pathway.internals.decorators import table_from_datasource @@ -236,3 +241,59 @@ def read( ), debug_datasource=datasource.debug_datasource(debug_data), ) + + +class InteractiveCsvPlayer(ConnectorSubject): + q: queue.Queue + + def __init__(self, csv_file="") -> None: + super().__init__() + self.q = queue.Queue() + + self.df = pd.read_csv(csv_file) + + state = pn.widgets.Spinner(value=0, width=0) + int_slider = pn.widgets.IntSlider( + name="Row position in csv", + start=0, + end=len(self.df), + step=1, + value=0, + disabled=True, + ) + int_slider.jscallback( + value=""" + if (int_slider.value < state.value) + int_slider.value = state.value + """, + args={"int_slider": int_slider, "state": state}, + ) + + def updatecallback(target, event): + if event.new > event.old: + target.value = event.new + self.q.put_nowait(target.value) + + int_slider.link(state, callbacks={"value": updatecallback}) + + self.state = state + self.int_slider = int_slider + display(pn.Row(state, int_slider, f"{len(self.df)} rows in csv")) + + def run(self): + last_streamed_idx = -1 + while True: + try: + new_pos = self.q.get() + for i in range(last_streamed_idx + 1, new_pos): + self.next_json(self.df.iloc[i].to_dict()) + last_streamed_idx = new_pos - 1 + if new_pos == len(self.df): + break + except queue.Empty: + pass + time.sleep(0.1) + self.close() + + def on_stop(self) -> None: + self.int_slider.disabled = True diff --git a/python/pathway/stdlib/viz/plotting.py b/python/pathway/stdlib/viz/plotting.py index ce0905d9..221d2771 100644 --- a/python/pathway/stdlib/viz/plotting.py +++ b/python/pathway/stdlib/viz/plotting.py @@ -15,6 +15,20 @@ from pathway.internals.trace import trace_user_frame +# after: https://stackoverflow.com/questions/15411967/how-can-i-check-if-code-is-executed-in-the-ipython-notebook +def _in_notebook(): + try: + from IPython import get_ipython # noqa + + if "IPKernelApp" not in get_ipython().config: # noqa + return False + except ImportError: + return False + except AttributeError: + return False + return True + + @runtime_type_check @trace_user_frame def plot( @@ -82,20 +96,33 @@ def plot( else: integrated = {} + in_notebook = _in_notebook() + def _update(key, row, time, is_addition): if is_addition: integrated[key] = row else: del integrated[key] - df = pd.DataFrame.from_dict(integrated, orient="index") + df = pd.DataFrame.from_dict(integrated, orient="index", columns=col_names) if sorting_col: df = df.sort_values(sorting_col) else: df = df.sort_index() df = df.reset_index(drop=True) - source.stream(df.to_dict("list"), rollover=len(df)) # type:ignore[arg-type] - pn.io.push_notebook(viz) + if in_notebook: + source.stream( + df.to_dict("list"), rollover=len(df) # type:ignore[arg-type] + ) + pn.io.push_notebook(viz) + else: + if plot.document is not None: + plot.document.add_next_tick_callback( + lambda: source.stream( + df.to_dict("list"), # type:ignore[arg-type] + rollover=len(df), + ) + ) internal_subscribe(self, on_change=_update, skip_persisted_batch=True) diff --git a/python/pathway/stdlib/viz/table_viz.py b/python/pathway/stdlib/viz/table_viz.py index dbdacf10..bc127b68 100644 --- a/python/pathway/stdlib/viz/table_viz.py +++ b/python/pathway/stdlib/viz/table_viz.py @@ -122,7 +122,9 @@ def update(key, row, time, is_addition): else: del integrated[key] df = ( - pd.DataFrame.from_dict(integrated, orient="index") + pd.DataFrame.from_dict( + integrated, orient="index", columns=col_names + ) .sort_index() .reset_index(drop=True) ) diff --git a/python/pathway/tests/ml/test_index.py b/python/pathway/tests/ml/test_index.py index 6cb5ee6f..207f74e9 100644 --- a/python/pathway/tests/ml/test_index.py +++ b/python/pathway/tests/ml/test_index.py @@ -58,9 +58,9 @@ def test_all_at_once(): points = table.filter(~pw.this.is_query).without(pw.this.is_query) queries = table.filter(pw.this.is_query).without(pw.this.is_query) index = KNNIndex(points.coords, points, n_dimensions=2, n_and=5) - result = queries + index.get_nearest_items(queries.coords, k=2).with_universe_of( - queries - ).select(nn=pw.apply(sort_arrays, pw.this.coords)) + result = queries + index.get_nearest_items(queries.coords, k=2).select( + nn=pw.apply(sort_arrays, pw.this.coords) + ) expected = nn_as_table( [ ((0, 0), ((-1, 0), (1, 2))), @@ -101,9 +101,9 @@ def stream_points(with_k: bool = False) -> tuple[pw.Table, pw.Table]: def test_update_old(): points, queries = stream_points() index = KNNIndex(points.coords, points, n_dimensions=2, n_and=5) - result = queries + index.get_nearest_items(queries.coords, k=2).with_universe_of( - queries - ).select(nn=pw.apply(sort_arrays, pw.this.coords)) + result = queries + index.get_nearest_items(queries.coords, k=2).select( + nn=pw.apply(sort_arrays, pw.this.coords) + ) expected = nn_as_table( [ ((0, 0), ((-1, 0), (1, 2))), diff --git a/python/pathway/tests/test_column_properties.py b/python/pathway/tests/test_column_properties.py index 8f959d93..cb0573a4 100644 --- a/python/pathway/tests/test_column_properties.py +++ b/python/pathway/tests/test_column_properties.py @@ -23,7 +23,7 @@ def test_preserve_dependency_properties(): | b 1 | 42 """, - ).with_universe_of(input1) + ) input3 = T( """ | c @@ -33,7 +33,7 @@ def test_preserve_dependency_properties(): {"c": pw.column_definition(dtype=int)}, properties=pw.SchemaProperties(append_only=False), ), - ).with_universe_of(input1) + ) result = input1.select(a=input1.a, b=input1.a + input2.b, c=input1.a + input3.c) @@ -54,7 +54,7 @@ def test_preserve_context_dependency_properties(): | b 1 | 42 """, - ).with_universe_of(input1) + ) input3 = T( """ | c @@ -64,7 +64,7 @@ def test_preserve_context_dependency_properties(): {"c": pw.column_definition(dtype=int)}, properties=pw.SchemaProperties(append_only=False), ), - ).with_universe_of(input1) + ) res1 = input1.filter(pw.this.a == input2.b) res2 = input1.filter(pw.this.a == input3.c) diff --git a/python/pathway/tests/test_common.py b/python/pathway/tests/test_common.py index 5ab65b53..50b9f635 100644 --- a/python/pathway/tests/test_common.py +++ b/python/pathway/tests/test_common.py @@ -105,7 +105,7 @@ def test_select_column_ref(): 2 | 2 26 | 26 """ - ).with_universe_of(t_latin) + ) res = t_latin.select(num=t_num.num, upper=t_latin["upper"]) @@ -1124,7 +1124,7 @@ def test_from_columns(): b | 80 | c c | 90 | b """ - ).with_universe_of(first) + ) expected = T( """ pet | foo @@ -1306,7 +1306,7 @@ def test_filter(): 2 | True 26 | False """ - ).with_universe_of(t_latin) + ) res = t_latin.filter(t_tmp["bool"]) @@ -1738,7 +1738,7 @@ def test_apply_more_args(): -1 4 """ - ).with_universe_of(a) + ) def add(x: int, y: int) -> int: return x + y @@ -1775,7 +1775,7 @@ def test_numba_apply(): -1 4 """, - ).with_universe_of(a) + ) def add(x, y): return x + y @@ -1815,7 +1815,7 @@ def test_numba_apply_lambda(): -1 4 """, - ).with_universe_of(a) + ) expression = pw.numba_apply(lambda x, y: x + y, "int64(int64,int64)", a.foo, b.bar) @@ -1852,7 +1852,7 @@ def test_numba_composite(): -1 4 """, - ).with_universe_of(a) + ) result = a.select( ret=pw.numba_apply(lambda x, y: x + y, "int64(int64,int64)", a.foo - 1, 1) @@ -2399,8 +2399,6 @@ def test_join_swapped_condition(): 3 | 1 | Tom | 8 | XL """ ) - # ensure we are not testing case with completely messed up universes - t1.with_universe_of(t2) with pytest.raises(ValueError): t1.join(t2, t2.pet == t1.pet).select( owner_name=t2.owner, L=t1.id, R=t2.id, age=t1.age @@ -3500,7 +3498,7 @@ def test_update_cells(): 3 | 2 | Alice | 8 4 | 1 | Eve | 3 """ - ).with_universe_of(old) + ) pw.universes.promise_is_subset_of(update, old) new = old.update_cells(update) @@ -3519,12 +3517,11 @@ def test_update_cells_0_rows(): | owner | age """ ) - pw.universes.promise_is_subset_of(update, old) expected = T( """ | pet | owner | age """ - ).with_universe_of(old) + ) assert_table_equality(old.update_cells(update), expected) assert_table_equality(old << update, expected) @@ -3564,14 +3561,14 @@ def test_update_cells_warns_when_using_with_columns(): 1 | Eve | 10 4 | Eve | 3 """ - ).with_universe_of(old) + ) expected = T( """ | pet | owner | age 1 | 1 | Eve | 10 4 | 1 | Eve | 3 """ - ).with_universe_of(old) + ) with pytest.warns( UserWarning, @@ -3769,7 +3766,7 @@ def test_with_columns(): 2 | Eve | 10 | 11 3 | Eve | 15 | 13 """ - ).with_universe_of(old) + ) expected = T( """ | pet | owner | age | weight @@ -3777,7 +3774,7 @@ def test_with_columns(): 2 | 1 | Eve | 10 | 11 3 | 2 | Eve | 15 | 13 """ - ).with_universe_of(old) + ) new = old.with_columns(*update) assert_table_equality(new, expected) @@ -3793,12 +3790,12 @@ def test_with_columns_0_rows(): """ | owner | age | weight """ - ).with_universe_of(old) + ) expected = T( """ | pet | owner | age | weight """ - ).with_universe_of(old) + ) assert_table_equality(old.with_columns(**update), expected) @@ -4010,7 +4007,7 @@ def test_wildcard_basic_usage(): | c | d 1 | 3 | 4 """ - ).with_universe_of(tab1) + ) left = tab1.select(*tab1, *tab2) diff --git a/python/pathway/tests/test_transformers.py b/python/pathway/tests/test_transformers.py index 024c794f..bbcecb46 100644 --- a/python/pathway/tests/test_transformers.py +++ b/python/pathway/tests/test_transformers.py @@ -441,7 +441,7 @@ def result(self) -> int: 2 | 5 3 | 6 """ - ).with_universe_of(input1) + ) method_table: Table = foo_transformer(input1).table @@ -920,8 +920,6 @@ def test_twoclass_method(): 8 | 3 | 14 """ ).with_columns(h=hypers.pointer_from(this.h)) - method_table = method_table.with_universe_of(tablec_constants) - method_table2 = method_table2.with_universe_of(tablec_constants) tablec = tablec_constants.select( a=tablec_constants.a, diff --git a/src/engine/dataflow.rs b/src/engine/dataflow.rs index dd0546c1..8ef810e0 100644 --- a/src/engine/dataflow.rs +++ b/src/engine/dataflow.rs @@ -482,6 +482,10 @@ impl Table { self.data.arranged() } + fn values_consolidated(&self) -> &Values { + self.data.consolidated() + } + fn keys(&self) -> &Keys { self.data.keys() } @@ -1106,7 +1110,7 @@ impl DataflowGraphInner { let error_reporter = self.error_reporter.clone(); - let new_values = table.values().consolidate_for_output().map_wrapped_named( + let new_values = table.values_consolidated().map_wrapped_named( "expression_table::evaluate_expression", wrapper, move |(key, values)| { diff --git a/src/engine/value.rs b/src/engine/value.rs index 046d4616..8c347bbe 100644 --- a/src/engine/value.rs +++ b/src/engine/value.rs @@ -16,7 +16,8 @@ use itertools::Itertools as _; use ndarray::ArrayD; use ordered_float::OrderedFloat; use rand::Rng; -use serde::{Deserialize, Serialize}; +use serde::de::Visitor; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_json::Value as JsonValue; use xxhash_rust::xxh3::Xxh3 as Hasher; @@ -142,6 +143,40 @@ impl Handle { } } +fn serialize_json(json: &JsonValue, s: S) -> Result +where + S: Serializer, +{ + s.serialize_str(&json.to_string()) +} + +struct JsonVisitor; + +impl<'de> Visitor<'de> for JsonVisitor { + type Value = Handle; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("A String containing a serialized JSON.") + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + match serde_json::from_str(v) { + Ok(json) => Ok(Handle::new(json)), + Err(err) => Err(serde::de::Error::custom(err)), + } + } +} + +fn deserialize_json<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + d.deserialize_str(JsonVisitor) +} + #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub enum Value { None, @@ -157,6 +192,10 @@ pub enum Value { DateTimeNaive(DateTimeNaive), DateTimeUtc(DateTimeUtc), Duration(Duration), + #[serde( + serialize_with = "serialize_json", + deserialize_with = "deserialize_json" + )] Json(Handle), }