Jake python2 demo notebooks #2150

Merged · 8 commits · Apr 6, 2022
23 changes: 14 additions & 9 deletions demo/web/src/main/notebooks/00 The Deephaven IDE.md
@@ -29,26 +29,31 @@ print(type(x))
```
\
\
-It is also used for interacting with both static tables.....
+It is also used for interacting with both static tables...

```python
-from deephaven.TableTools import newTable, stringCol, intCol
-static_table = newTable(
-    stringCol("Name_String_Col", "Data String 1", 'Data String 2', "Data String 3"),
-    intCol("Name_Int_Col", 44, 55, 66)
-)
+from deephaven import new_table
+from deephaven.column import string_col, int_col
+
+static_table = new_table([
+    string_col("Name_String_Col", ["Data String 1", "Data String 2", "Data String 3"]),
+    int_col("Name_Int_Col", [44, 55, 66])
+])
```
\
\
... and dynamically updating ones.

```python
-from deephaven.TableTools import timeTable
+from deephaven import time_table

import random
-updating_table = timeTable('00:00:00.400').updateView("Row = i", "Some_Int = (int)random.randint(0,100)").reverse()
+updating_table = time_table('00:00:00.400').update_view(["Row = i", "Some_Int = (int)(byte)random.randint(0,100)"]).reverse()
```


## These notebooks demonstrate Deephaven differentiators and workflows

(You can find the notebooks also listed at top-right under "File Explorer".)


@@ -57,7 +62,7 @@
3. [Kafka Stream vs Append](03%20Kafka%20Stream%20vs%20Append.md)


-Go to [quick start](https://deephaven.io/core/docs/tutorials/quickstart/) to install Deephaven from our pre-built images.
+Go to our [Quick start](https://deephaven.io/core/docs/tutorials/quickstart/) guide to install Deephaven from our pre-built images.

Or simply [open the first notebook.](01%20Tables,%20Updates,%20and%20the%20Engine.md)

78 changes: 38 additions & 40 deletions demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
@@ -9,17 +9,19 @@ You can quickly see streaming data in a UI and do table operations, interactively
For example, you can listen to a Kafka stream of cryptocurrency trades sourced from their native exchanges (like the ones below, built using the [XChange library](https://github.com/knowm/XChange)).

```python
-from deephaven import ConsumeKafka as ck
+from deephaven import kafka_consumer as ck
+from deephaven.stream.kafka.consumer import TableType, KeyValueSpec


def get_trades_stream():
-    return ck.consumeToTable(
+    return ck.consume(
        { 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
          'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
        'io.deephaven.crypto.kafka.TradesTopic',
-        key = ck.IGNORE,
-        value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
+        key_spec=KeyValueSpec.IGNORE,
+        value_spec=ck.avro_spec('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
        offsets=ck.ALL_PARTITIONS_SEEK_TO_END,
-        table_type='append')
+        table_type=TableType.Append)

trades_stream = get_trades_stream()
```
@@ -40,7 +42,7 @@ Tables and streams are a single abstraction. Event streams, feeds, [soon] CDC, a
You can readily see that your table grows as greater volumes of data are ingested from the Kafka feed.

```python
-row_count = trades_stream.countBy("Tot_Rows")
+row_count = trades_stream.count_by(col="Tot_Rows")
```
\
\
@@ -58,8 +60,8 @@ As you might expect, a named table can have multiple dependencies.\
After you run the following command, you'll see that all three of your tables are now updating in lock-step.

```python
-row_count_by_instrument = trades_stream.countBy("Tot_Rows", "Instrument")\
-    .sortDescending("Tot_Rows")
+row_count_by_instrument = trades_stream.count_by(col="Tot_Rows", by=["Instrument"])\
+    .sort_descending(order_by=["Tot_Rows"])
```
\
\
@@ -69,31 +71,30 @@ Below is one way to use a replace() method to swap one for the other.\
To learn more about updateView (or other selection or projection alternatives), refer to [the docs](https://deephaven.io/core/docs/conceptual/choose-select-view-update/).

```python
-trades_stream_cleaner = trades_stream.updateView("Instrument = Instrument.replace(`USDT`, `USD`)")
+trades_stream_cleaner = trades_stream.update_view(formulas=["Instrument = Instrument.replace(`USDT`, `USD`)"])

-row_count_by_instrument = trades_stream_cleaner.countBy("Tot_Rows", "Instrument")\
-    .sortDescending("Tot_Rows")
+row_count_by_instrument = trades_stream_cleaner.count_by(col="Tot_Rows", by=["Instrument"])\
+    .sort_descending(order_by=["Tot_Rows"])
```
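As the linked guide explains, `update_view` is only one of several selection/projection choices. A rough sketch of the trade-off, using illustrative columns from this stream (method names assume the current `deephaven` Python API):

```python
# view/update_view store formulas and compute cells on demand;
# select/update evaluate eagerly and materialize results in memory.
instrument_prices = trades_stream_cleaner.view(formulas=["Instrument", "Price"])
with_notional = trades_stream_cleaner.update(formulas=["Notional = Price * Size"])
```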
\
\
\
Counts are informative, but often you'll be interested in other aggregations. The script below shows both how to [bin data by time](https://deephaven.io/core/docs/reference/cheat-sheets/datetime-cheat-sheet/#downsampling-temporal-data-via-time-binning) and to [do multiple aggregations](https://deephaven.io/core/docs/how-to-guides/combined-aggregations/).

```python
-from deephaven import Aggregation as agg, as_list
-
-agg_list = as_list([
-    agg.AggCount("Trade_Count"),
-    agg.AggSum("Total_Size = Size"),
-    agg.AggAvg("Avg_Size = Size", "Avg_Price = Price"),
-    agg.AggMin("Low_Price = Price"),
-    agg.AggMax("High_Price = Price")
-])
-
-multi_agg = trades_stream_cleaner.updateView("TimeBin = upperBin(KafkaTimestamp, MINUTE)")\
-    .aggBy(agg_list, "TimeBin", "Instrument")\
-    .sortDescending("TimeBin", "Trade_Count")\
-    .formatColumnWhere("Instrument", "Instrument = `BTC/USD`", "CYAN")
+from deephaven import agg
+
+agg_list = [
+    agg.count_(col="Trade_Count"),
+    agg.sum_(cols=["Total_Size = Size"]),
+    agg.avg(cols=["Avg_Size = Size", "Avg_Price = Price"]),
+    agg.min_(cols=["Low_Price = Price"]),
+    agg.max_(cols=["High_Price = Price"])
+]
+
+multi_agg = trades_stream_cleaner.update_view(formulas=["TimeBin = upperBin(KafkaTimestamp, MINUTE)"])\
+    .agg_by(agg_list, by=["TimeBin", "Instrument"])\
+    .sort_descending(order_by=["TimeBin", "Trade_Count"])
```
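One aside on the time bin: `upperBin` labels each row with the end of its bin. If you prefer bins labeled by their start, the engine's built-in `lowerBin` is the mirror image; a sketch, assuming it is available in the query language alongside `upperBin`:

```python
# Sketch: label rows with the start of their one-minute bin instead of the end
multi_agg_lower_bin = trades_stream_cleaner.update_view(
    formulas=["TimeBin = lowerBin(KafkaTimestamp, MINUTE)"])
```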
\
\
@@ -102,25 +103,23 @@ Filtering streams is straightforward. One simply uses `where()` to impose a huge

```python
# Filter on a manually-set filter
-multi_agg_btc = multi_agg.where("Instrument = `BTC/USD`")
-multi_agg_eth = multi_agg.where("Instrument = `ETH/USD`")
+multi_agg_btc = multi_agg.where(["Instrument = `BTC/USD`"])
+multi_agg_eth = multi_agg.where(["Instrument = `ETH/USD`"])

# Filter on a programmatically-set criterion
top_instrument = multi_agg.head(1)

-multi_agg_row_0 = multi_agg.whereIn(top_instrument, "Instrument")\
-    .formatColumns("Total_Size = heatmap(Total_Size, 10, 300, MAGENTA, CYAN)")
+multi_agg_row_0 = multi_agg.where_in(top_instrument, ["Instrument"])
```
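The complement also exists; a one-line sketch (assuming `where_not_in` mirrors the `where_in` signature) that keeps every instrument except the current leader:

```python
# Sketch: everything except the top instrument
multi_agg_rest = multi_agg.where_not_in(top_instrument, ["Instrument"])
```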
\
\
\
[Joining streams](https://deephaven.io/core/docs/how-to-guides/joins-overview/) is one of Deephaven's superpowers. Deephaven supports both high-performance joins that are (i) relational in nature...

```python
-join_eth_btc = multi_agg_eth.view("TimeBin", "Eth_Avg_Price = Avg_Price")\
-    .naturalJoin(multi_agg_btc, "TimeBin", "Btc_Avg_Price = Avg_Price")\
-    .updateView("Ratio_Avg_Prices = Btc_Avg_Price / Eth_Avg_Price")\
-    .formatColumns("Eth_Avg_Price = Decimal(`#,###.00`)", "Btc_Avg_Price = Decimal(`#,###.00`)")
+join_eth_btc = multi_agg_eth.view(["TimeBin", "Eth_Avg_Price = Avg_Price"])\
+    .natural_join(table=multi_agg_btc, on=["TimeBin"], joins=["Btc_Avg_Price = Avg_Price"])\
+    .update_view(["Ratio_Avg_Prices = Btc_Avg_Price / Eth_Avg_Price"])
```
\
\
@@ -132,13 +131,12 @@
# KafkaTimestamp in the right table (btc_trades).
# If there is no exact nanosecond match, the record with KafkaTimestamp just preceding Eth_Time is used

-eth_trades = trades_stream.where("Instrument = `ETH/USD`")
-btc_trades = trades_stream.where("Instrument = `BTC/USD`")
+eth_trades = trades_stream.where(["Instrument = `ETH/USD`"])
+btc_trades = trades_stream.where(["Instrument = `BTC/USD`"])

-time_series_join_eth_btc = eth_trades.view("Eth_Time = KafkaTimestamp", "Eth_Price = Price")\
-    .aj(btc_trades, "Eth_Time = KafkaTimestamp", "Btc_Price = Price, Btc_Time = KafkaTimestamp")\
-    .updateView("Ratio_Each_Trade = Btc_Price / Eth_Price")\
-    .formatColumns("Eth_Price = Decimal(`#,###.00`)", "Btc_Price = Decimal(`#,###.00`)")
+time_series_join_eth_btc = eth_trades.view(["Eth_Time = KafkaTimestamp", "Eth_Price = Price"])\
+    .aj(btc_trades, on=["Eth_Time = KafkaTimestamp"], joins=["Btc_Price = Price, Btc_Time = KafkaTimestamp"])\
+    .update_view(["Ratio_Each_Trade = Btc_Price / Eth_Price"])
```
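A related tool worth knowing: where `aj` matches the row at or just preceding the key, the reverse as-of join matches the row at or just following it. A sketch, assuming the current Python API exposes this as `raj` with the same argument shape as `aj`:

```python
# Sketch: reverse as-of join; Btc_* columns come from the trade at or just AFTER Eth_Time
reverse_join_eth_btc = eth_trades.view(["Eth_Time = KafkaTimestamp", "Eth_Price = Price"])\
    .raj(btc_trades, on=["Eth_Time = KafkaTimestamp"], joins=["Btc_Price = Price, Btc_Time = KafkaTimestamp"])
```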
\
\
61 changes: 31 additions & 30 deletions demo/web/src/main/notebooks/03 Kafka Stream vs Append.md
@@ -17,8 +17,12 @@ Start by importing some requisite packages. There is documentation on [installin
[aggBy](https://deephaven.io/core/docs/reference/table-operations/group-and-aggregate/aggBy/), [emptyTable](https://deephaven.io/core/docs/how-to-guides/empty-table/#related-documentation), and [merge](https://deephaven.io/core/docs/how-to-guides/merge-tables/#merge-tables).

```python
-from deephaven import ConsumeKafka as ck, Aggregation as agg, as_list
-from deephaven.TableTools import emptyTable, merge
+from deephaven import kafka_consumer as ck
+from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
+from deephaven import empty_table, merge
+from deephaven import agg
```

\
@@ -31,13 +35,13 @@ The feed started on September 10th, 2021. A month later it had ~110 million events
This notebook demonstrates the impact of choices related to `offsets` and `table_type`.

```python
-def get_trades(*, offsets, table_type):
-    return ck.consumeToTable(
+def get_trades(*, offsets, table_type):
+    return ck.consume(
        { 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
          'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
        'io.deephaven.crypto.kafka.TradesTopic',
-        key = ck.IGNORE,
-        value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
+        key_spec=KeyValueSpec.IGNORE,
+        value_spec=ck.avro_spec('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
        offsets=offsets,
        table_type=table_type)
```
@@ -50,16 +54,16 @@ In this demo, imagine you want to start your Kafka feed "1 million events ago" (
Create a Deephaven table that listens to current records (i.e., crypto trades happening now).

```python
-latest_offset = get_trades(offsets=ck.ALL_PARTITIONS_SEEK_TO_END, table_type='stream')
+latest_offset = get_trades(offsets=ck.ALL_PARTITIONS_SEEK_TO_END, table_type=TableType.Stream)
```

\
\
\
-You can do a simple [lastBy()](https://deephaven.io/core/docs/reference/table-operations/group-and-aggregate/lastBy/) to refine the view to only the single last record.
+You can do a simple [last_by()](https://deephaven.io/core/docs/reference/table-operations/group-and-aggregate/lastBy/) to refine the view to only the single last record.

```python
-latest_offset = latest_offset.view("KafkaOffset").lastBy()
+latest_offset = latest_offset.view(["KafkaOffset"]).last_by()
```

\
@@ -69,7 +73,7 @@ Since you are targeting a Kafka offset of "1 mm events ago", create a static table
[Snapshotting](https://deephaven.io/core/docs/how-to-guides/reduce-update-frequency/#create-a-static-snapshot) to an empty table does the trick.

```python
-latest_offset = emptyTable(0).snapshot(latest_offset, True)
+latest_offset = empty_table(0).snapshot(source_table=latest_offset, do_init=True)
```

\
@@ -79,8 +83,8 @@ With the static table, you can now set a variable by pulling a record from the table

```python
size_offset = 1_000_000
-target_offset_table = latest_offset.view("Offset=max(0, KafkaOffset-size_offset)")
-offset_variable = target_offset_table.getColumn("Offset").get(0)
+target_offset_table = latest_offset.view(["Offset=max(0, KafkaOffset-size_offset)"])
+offset_variable = target_offset_table.j_object.getColumnSource("Offset").get(0)
```
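Reaching through `j_object` works, but an arguably more Pythonic route — a sketch, assuming the `deephaven.pandas` module is present in this build — is to convert the one-row table to a DataFrame and index into it:

```python
# Sketch: pull the scalar out via pandas instead of the Java column source
from deephaven.pandas import to_pandas

offset_variable = int(to_pandas(target_offset_table)["Offset"][0])
```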

\
@@ -100,13 +104,12 @@ Define a [table aggregation function](https://deephaven.io/core/docs/reference/t

```python
def trades_agg(table):
-    agg_list = as_list([
-        agg.AggCount("Trade_Count"),
-        agg.AggSum("Total_Size = Size"),
-    ])
-    return table.aggBy(agg_list, "Exchange", "Instrument").\
-        sort("Exchange", "Instrument").\
-        formatColumnWhere("Instrument", "Instrument.startsWith(`BTC`)", "IVORY")
+    agg_list = [
+        agg.count_(col="Trade_Count"),
+        agg.sum_(cols=["Total_Size = Size"]),
+    ]
+    return table.agg_by(agg_list, by=["Exchange", "Instrument"]).\
+        sort(order_by=["Exchange", "Instrument"])
```

\
@@ -120,9 +123,9 @@ Create a new appending table, starting at the `offset_variable` you defined above
You will see that the count is growing. It should reach > 1mm records quickly.

```python
-trades_append = get_trades(offsets={ 0: offset_variable }, table_type='append')
+trades_append = get_trades(offsets={ 0: offset_variable }, table_type=TableType.Append)
agg_append = trades_agg(trades_append)
-row_count_append = trades_append.countBy("RowCount").updateView("Table_Type = `append`")
+row_count_append = trades_append.count_by(col="RowCount").update_view(["Table_Type = `append`"])
```

\
@@ -134,13 +137,11 @@ Touch the table tab called `agg_append` to see the Trade\_Count and Total\_Size
\
For comparison, repeat the exercise, changing only the `table_type` parameter of the Kafka integration to be _**stream**_ (instead of _**append**_).

-Note the `dropStream()` syntax.

```python
-trades_stream = get_trades(offsets={ 0: offset_variable }, table_type='stream')
+trades_stream = get_trades(offsets={ 0: offset_variable }, table_type=TableType.Stream)
agg_stream = trades_agg(trades_stream)
-row_count_stream = trades_stream.dropStream()\
-    .countBy("RowCount").updateView("Table_Type = `stream`")
+row_count_stream = trades_stream.count_by(col="RowCount").update_view(["Table_Type = `stream`"])
```

\
@@ -158,7 +159,7 @@ However, stream tables have the ingested records in memory only for a split second
Merging the two row_count tables makes the difference obvious.

```python
-row_count_merge = merge(row_count_append, row_count_stream)
+row_count_merge = merge([row_count_append, row_count_stream])
```

\
@@ -167,10 +168,10 @@ row_count_merge = merge(row_count_append, row_count_stream)
You can confirm the two aggregation tables are identical by [joining them](https://deephaven.io/core/docs/reference/table-operations/join/natural-join/) and taking the difference.

```python
-diff_table = agg_append.naturalJoin(agg_stream, "Exchange, Instrument",\
-    "Trade_Count_Agg = Trade_Count, Total_Size_Agg = Total_Size")\
-    .view("Exchange", "Instrument", "Diff_Trade_Count = Trade_Count - Trade_Count_Agg",
-        "Diff_Total_Size = Total_Size - Total_Size_Agg")
+diff_table = agg_append.natural_join(table=agg_stream, on=["Exchange, Instrument"],\
+    joins=["Trade_Count_Agg = Trade_Count, Total_Size_Agg = Total_Size"])\
+    .view(["Exchange", "Instrument", "Diff_Trade_Count = Trade_Count - Trade_Count_Agg",
+        "Diff_Total_Size = Total_Size - Total_Size_Agg"])
```
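As a final sanity check — a sketch, assuming the difference columns computed above — filter for any nonzero difference; if the append and stream pipelines agree, this table stays empty:

```python
# Sketch: should remain empty if the two aggregations are identical
diff_nonzero = diff_table.where(["Diff_Trade_Count != 0 || Diff_Total_Size != 0"])
```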

\