Skip to content

Commit

Permalink
Remove use of pandas
Browse files Browse the repository at this point in the history
  • Loading branch information
albireox committed Jul 9, 2024
1 parent 8d2c997 commit 664e185
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 125 deletions.
85 changes: 1 addition & 84 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ gunicorn = "^22.0.0"
uvicorn = {extras = ["standard"], version = ">=0.24.0"}
sdss-clu = "^2.2.1"
influxdb-client = {extras = ["async"], version = "^1.38.0"}
pandas = "^2.1.3"
slack-sdk = "^3.23.0"
python-jose = {extras = ["cryptography"], version = "^3.3.0"}
passlib = {extras = ["bcrypt"], version = "^1.7.4"}
Expand Down
41 changes: 24 additions & 17 deletions src/lvmapi/routers/spectrographs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from typing import get_args

import pandas
import polars
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel

Expand All @@ -27,7 +27,7 @@

class SplitDataFrameToDict(BaseModel):
columns: list[str]
data: list[list]
data: list[tuple]


router = APIRouter(prefix="/spectrographs", tags=["spectrographs"])
Expand Down Expand Up @@ -81,33 +81,40 @@ async def get_temperatures(
if len(results) == 0:
return {"columns": ["time", "camera", "sensor", "temperature"], "data": []}

results.loc[:, "time"] = results["_time"].map(lambda tt: tt.isoformat())
results.loc[:, "camera"] = pandas.Series("", dtype="S3")
results.loc[:, "sensor"] = pandas.Series("", dtype="S3")
results.loc[:, "temperature"] = results._value
final_df = results.select(
time=polars.col._time.dt.to_string("%Y-%m-%dT%H:%M:%S.%3f"),
camera=polars.lit(None, dtype=polars.String),
sensor=polars.lit(None, dtype=polars.String),
temperature=polars.col._value.cast(polars.Float32),
)

for spec in ["sp1", "sp2", "sp3"]:
for cc in get_args(Cameras):
for ss in get_args(Sensors):
label = get_spectrograph_temperature_label(cc, ss)

results.loc[
(results._measurement == f"lvmscp.{spec}")
& (results._field == f"status.{label}"),
_measurement = f"lvmscp.{spec}"
_field = f"status.{label}"

final_df[
(
(results["_measurement"] == _measurement)
& (results["_field"] == _field)
).arg_true(),
"camera",
] = f"{cc}{spec[-1]}"

results.loc[results._field == f"status.{label}", "sensor"] = ss
final_df[(results["_field"] == _field).arg_true(), "sensor"] = ss

results = results.loc[:, ["time", "camera", "sensor", "temperature"]]
final_df = final_df.select(polars.col(["time", "camera", "sensor", "temperature"]))

if camera:
results = results.loc[results.camera == camera, :]
final_df = final_df.filter(polars.col.camera == camera)

if sensor:
results = results.loc[results.sensor == sensor, :]
final_df = final_df.filter(polars.col.sensor == sensor)

return results.to_dict(orient="split", index=False)
return {"columns": final_df.columns, "data": final_df.rows()}


@router.get("/thermistors")
Expand All @@ -124,10 +131,10 @@ async def get_thermistors(

data = await read_thermistors(interval=interval)

if isinstance(data, pandas.DataFrame):
if isinstance(data, polars.DataFrame):
if thermistor is not None:
data = data.loc[data.channel == thermistor.lower(), :]
return data.to_dict(orient="split", index=False)
data = data.filter(polars.col.channel == thermistor)
return {"columns": data.columns, "data": data.rows()}

if thermistor:
return data[thermistor]
Expand Down
19 changes: 11 additions & 8 deletions src/lvmapi/tools/influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,20 @@

from __future__ import annotations

import json
import os
import warnings

from typing import TYPE_CHECKING
import polars

from lvmapi import config


if TYPE_CHECKING:
import pandas


__all__ = ["query_influxdb"]


async def query_influxdb(query: str) -> pandas.DataFrame:
"""Runs a query in InfluxDB and returns a Pandas dataframe."""
async def query_influxdb(query: str) -> polars.DataFrame:
"""Runs a query in InfluxDB and returns a Polars dataframe."""

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influxdb_client.client.warnings import MissingPivotFunction
Expand All @@ -47,4 +44,10 @@ async def query_influxdb(query: str) -> pandas.DataFrame:
raise RuntimeError("InfluxDB client failed to connect.")

api = client.query_api()
return await api.query_data_frame(query)

query_results = await api.query(query)

df = polars.DataFrame(json.loads(query_results.to_json()))
df = df.with_columns(polars.col._time.cast(polars.Datetime("ms")))

return df
26 changes: 11 additions & 15 deletions src/lvmapi/tools/spectrograph.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from typing import TYPE_CHECKING, overload

import pandas
import polars

from lvmapi.tools import CluClient
from lvmapi.tools.influxdb import query_influxdb
Expand Down Expand Up @@ -132,7 +132,7 @@ async def get_spectrograph_mechanics(spec: Spectrographs):


@overload
async def read_thermistors(interval: float) -> pandas.DataFrame: ...
async def read_thermistors(interval: float) -> polars.DataFrame: ...


@overload
Expand All @@ -141,7 +141,7 @@ async def read_thermistors(interval: None) -> dict[str, bool]: ...

async def read_thermistors(
interval: float | None = None,
) -> pandas.DataFrame | dict[str, bool]:
) -> polars.DataFrame | dict[str, bool]:
"""Returns thermistor states from InfluxDB.
Parameters
Expand All @@ -153,7 +153,7 @@ async def read_thermistors(
-------
states
If ``interval=None``, a dictionary of thermistor states. Otherwise a
Pandas dataframe with the thermistor states, one row per measurement
Polars dataframe with the thermistor states, one row per measurement
within the interval.
"""
Expand All @@ -179,19 +179,15 @@ async def read_thermistors(

if interval is None:
result: dict[str, bool] = {}
for _, row in data.iterrows():
result[row.channel_name] = bool(row._value)
for row in data.iter_rows(named=True):
result[row["channel_name"]] = bool(row["_value"])
return result

df = pandas.DataFrame(
{
"time": data._time,
"channel": data.channel_name,
"state": pandas.Series(data._value, dtype="int8"),
}
df = data.select(
time=polars.col._time,
channel=polars.col.channel_name,
state=polars.col._value.cast(polars.Boolean),
)
df.sort_values(["channel", "time"], inplace=True)
df.reset_index(drop=True, inplace=True)
df["time"] = pandas.to_datetime(df["time"])
df = df.sort(["channel", "time"])

return df

0 comments on commit 664e185

Please sign in to comment.