Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Advanced User Controls and Proper Path Validation #6098

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 69 additions & 4 deletions ee/clickhouse/queries/paths/paths.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import defaultdict
from re import escape
from typing import Dict, List, Literal, Optional, Tuple, Union, cast

Expand All @@ -7,7 +8,7 @@
from ee.clickhouse.queries.funnels.funnel_persons import ClickhouseFunnelPersons
from ee.clickhouse.queries.paths.path_event_query import PathEventQuery
from ee.clickhouse.sql.paths.path import PATH_ARRAY_QUERY
from posthog.constants import FUNNEL_PATH_BETWEEN_STEPS, LIMIT
from posthog.constants import FUNNEL_PATH_BETWEEN_STEPS, LIMIT, PATH_EDGE_LIMIT
from posthog.models import Filter, Team
from posthog.models.filters.path_filter import PathFilter

Expand Down Expand Up @@ -38,6 +39,14 @@ def __init__(self, filter: PathFilter, team: Team, funnel_filter: Optional[Filte
if not self._filter.limit:
self._filter = self._filter.with_data({LIMIT: 100})

if self._filter.edge_limit is None and not (self._filter.start_point and self._filter.end_point):
# no edge restriction when both start and end points are defined
self._filter = self._filter.with_data({PATH_EDGE_LIMIT: 100})

if self._filter.max_edge_weight and self._filter.min_edge_weight:
if self._filter.max_edge_weight < self._filter.min_edge_weight:
raise ValidationError("Max Edge weight can't be lower than min edge weight")

if self._filter.path_groupings:
regex_groupings = []
for grouping in self._filter.path_groupings:
Expand All @@ -47,11 +56,13 @@ def __init__(self, filter: PathFilter, team: Team, funnel_filter: Optional[Filte
regex_groupings.append(regex_grouping)
self.params["regex_groupings"] = regex_groupings

# TODO: don't allow including $pageview and excluding $pageview at the same time

def run(self, *args, **kwargs):

results = self._exec_query()

if not self._filter.min_edge_weight and not self._filter.max_edge_weight:
results = self.validate_results(results)

return self._format_results(results)

def _format_results(self, results):
Expand Down Expand Up @@ -100,6 +111,11 @@ def get_path_query(self) -> str:

paths_per_person_query = self.get_paths_per_person_query()

self.params["edge_limit"] = self._filter.edge_limit

edge_weight_filter, edge_weight_params = self.get_edge_weight_clause()
self.params.update(edge_weight_params)

return f"""
SELECT last_path_key as source_event,
path_key as target_event,
Expand All @@ -109,10 +125,11 @@ def get_path_query(self) -> str:
WHERE source_event IS NOT NULL
GROUP BY source_event,
target_event
{edge_weight_filter}
ORDER BY event_count DESC,
source_event,
target_event
LIMIT 30
{'LIMIT %(edge_limit)s' if self._filter.edge_limit else ''}
"""

def get_path_query_funnel_cte(self, funnel_filter: Filter):
Expand Down Expand Up @@ -141,6 +158,24 @@ def get_target_point_filter(self) -> str:
else:
return ""

def get_edge_weight_clause(self) -> Tuple[str, Dict]:
params: Dict[str, int] = {}

conditions = []

if self._filter.min_edge_weight:
params["min_edge_weight"] = self._filter.min_edge_weight
conditions.append("event_count >= %(min_edge_weight)s")

if self._filter.max_edge_weight:
params["max_edge_weight"] = self._filter.max_edge_weight
conditions.append("event_count <= %(max_edge_weight)s")

if conditions:
return f"HAVING {' AND '.join(conditions)}", params

return "", params

def get_target_clause(self) -> Tuple[str, Dict]:
params: Dict[str, Union[str, None]] = {"target_point": None, "secondary_target_point": None}

Expand Down Expand Up @@ -192,3 +227,33 @@ def get_filtered_path_ordering(self) -> Tuple[str, str]:
"arraySlice(filtered_path, 1, %(event_in_session_limit)s)",
"arraySlice(filtered_timings, 1, %(event_in_session_limit)s)",
)

def validate_results(self, results):
# Query guarantees results list to be:
# 1. Directed, Acyclic Tree where each node has only 1 child
# 2. All start nodes beginning with 1_

seen = set() # source nodes that've been traversed
edges = defaultdict(list)
validated_results = []
starting_nodes_stack = []

for result in results:
edges[result[0]].append(result[1])
if result[0].startswith("1_"):
# All nodes with 1_ are valid starting nodes
starting_nodes_stack.append(result[0])

while starting_nodes_stack:
current_node = starting_nodes_stack.pop()
seen.add(current_node)

for node in edges[current_node]:
if node not in seen:
starting_nodes_stack.append(node)

for result in results:
if result[0] in seen:
validated_results.append(result)

return validated_results
4 changes: 4 additions & 0 deletions ee/clickhouse/queries/paths/paths_persons.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@ def _format_results(self, results):
from posthog.api.person import PersonSerializer

return PersonSerializer(people, many=True).data, len(results) > cast(int, self._filter.limit) - 1

def run(self, *args, **kwargs):
results = self._exec_query()
return self._format_results(results)
Loading