Type annotations in results exporter (#2035)
fidoriel authored Jul 30, 2024
1 parent a9e4171 commit 20e6f4a
Showing 2 changed files with 85 additions and 52 deletions.
129 changes: 81 additions & 48 deletions evap/results/exporters.py
@@ -1,14 +1,19 @@
import warnings
from collections import OrderedDict, defaultdict
from collections.abc import Iterable, Sequence
from itertools import chain, repeat
from typing import Any, TypeVar

import xlwt
from django.db.models import Q
from django.db.models import Q, QuerySet
from django.db.models.base import Model
from django.utils.translation import gettext as _

from evap.evaluation.models import CourseType, Degree, Evaluation, Questionnaire
from evap.evaluation.models import CourseType, Degree, Evaluation, Question, Questionnaire, Semester, UserProfile
from evap.evaluation.tools import ExcelExporter
from evap.results.tools import (
AnsweredRatingResult,
QuestionResult,
RatingResult,
calculate_average_course_distribution,
calculate_average_distribution,
@@ -17,6 +22,10 @@
get_results,
)

T = TypeVar("T", bound=Model)
QuerySetOrSequence = QuerySet[T] | Sequence[T]
AnnotatedEvaluation = Any
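# A minimal sketch (not part of this commit) of how the aliases read:
# `QuerySetOrSequence[Semester]` accepts a lazy QuerySet and a plain in-memory
# sequence alike, and `AnnotatedEvaluation` stays `Any` because attributes like
# `weight_percentage` are attached to Evaluation instances at runtime, which a
# plain `Evaluation` annotation would reject under mypy.
#
#     def semester_names(semesters: QuerySetOrSequence[Semester]) -> list[str]:
#         return [str(semester) for semester in semesters]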


class ResultsExporter(ExcelExporter):
CUSTOM_COLOR_START = 8
@@ -38,24 +47,24 @@ class ResultsExporter(ExcelExporter):
**ExcelExporter.styles,
}

def __init__(self):
def __init__(self) -> None:
super().__init__()

for index, color in self.COLOR_MAPPINGS.items():
self.workbook.set_colour_RGB(index, *color)

@classmethod
def grade_to_style(cls, grade):
def grade_to_style(cls, grade: float) -> str:
return "grade_" + str(cls.normalize_number(grade))

@classmethod
def normalize_number(cls, number):
def normalize_number(cls, number: float) -> float:
"""floors 'number' to a multiply of cls.STEP"""
rounded_number = round(number, 1) # see #302
return round(int(rounded_number / cls.STEP + 0.0001) * cls.STEP, 1)
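# A worked example (illustrative, assuming cls.STEP == 0.2) of why the
# `+ 0.0001` epsilon is needed: floating point makes 0.6 / 0.2 evaluate to
# 2.9999999999999996, so a bare int() would floor to 2 and return 0.4.
#
#     >>> 0.6 / 0.2
#     2.9999999999999996
#     >>> round(int(0.6 / 0.2 + 0.0001) * 0.2, 1)
#     0.6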

@classmethod
def init_grade_styles(cls):
def init_grade_styles(cls) -> None:
"""
Adds the grade styles to cls.styles and as a xlwt identifier.
This also notes all registered colors in cls.COLOR_MAPPINGS for the instances.
Expand Down Expand Up @@ -88,7 +97,7 @@ def init_grade_styles(cls):
cls.styles[style_name] = xlwt.easyxf(grade_base_style.format(color_name), num_format_str="0.0")

@staticmethod
def filter_text_and_heading_questions(questions):
def filter_text_and_heading_questions(questions: Iterable[Question]) -> list[Question]:
questions = [question for question in questions if not question.is_text_question]

# remove heading questions if they have no "content" below them
@@ -103,16 +112,23 @@ def filter_text_and_heading_questions(questions):
return filtered_questions
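# Behaviour sketch on hypothetical data: text questions are dropped first, and
# a heading question survives only if a non-heading question follows it before
# the next heading (or before the end of the list):
#
#     [heading A, rating, heading B, heading C, rating]
#     -> [heading A, rating, heading C, rating]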

@staticmethod
def filter_evaluations(semesters, evaluation_states, degrees, course_types, contributor, include_not_enough_voters):
def filter_evaluations(
semesters: Iterable[Semester],
evaluation_states: Iterable[Evaluation.State],
degree_ids: Iterable[int],
course_type_ids: Iterable[int],
contributor: UserProfile | None,
include_not_enough_voters: bool,
) -> tuple[list[tuple[Evaluation, OrderedDict[int, list[QuestionResult]]]], list[Questionnaire], bool]:
# pylint: disable=too-many-locals
course_results_exist = False
evaluations_with_results = []
used_questionnaires = set()
used_questionnaires: set[Questionnaire] = set()
evaluations_filter = Q(
course__semester__in=semesters,
state__in=evaluation_states,
course__degrees__in=degrees,
course__type__in=course_types,
course__degrees__in=degree_ids,
course__type__in=course_type_ids,
)
if contributor:
evaluations_filter = evaluations_filter & (
@@ -124,44 +140,49 @@ def filter_evaluations(semesters, evaluation_states, degrees, course_types, cont
continue
if not evaluation.can_publish_rating_results and not include_not_enough_voters:
continue
results = OrderedDict()
results: OrderedDict[int, list[QuestionResult]] = OrderedDict()
for contribution_result in get_results(evaluation).contribution_results:
for questionnaire_result in contribution_result.questionnaire_results:
# RatingQuestion.counts is a tuple of integers or None; if this tuple is all zero, we want to exclude it
if all(
not question_result.question.is_rating_question or not RatingResult.has_answers(question_result)
for question_result in questionnaire_result.question_results
question_results = questionnaire_result.question_results
if not any(
isinstance(question_result, AnsweredRatingResult) for question_result in question_results
):
continue
if (
not contributor
or contribution_result.contributor is None
or contribution_result.contributor == contributor
):
results.setdefault(questionnaire_result.questionnaire.id, []).extend(
questionnaire_result.question_results
)
results.setdefault(questionnaire_result.questionnaire.id, []).extend(question_results)
used_questionnaires.add(questionnaire_result.questionnaire)
evaluation.course_evaluations_count = evaluation.course.evaluations.count()
if evaluation.course_evaluations_count > 1:
annotated_evaluation: AnnotatedEvaluation = evaluation
annotated_evaluation.course_evaluations_count = annotated_evaluation.course.evaluations.count()
if annotated_evaluation.course_evaluations_count > 1:
course_results_exist = True
weight_sum = sum(evaluation.weight for evaluation in evaluation.course.evaluations.all())
evaluation.weight_percentage = int((evaluation.weight / weight_sum) * 100)
evaluation.course.avg_grade = distribution_to_grade(
calculate_average_course_distribution(evaluation.course)
weight_sum = sum(evaluation.weight for evaluation in annotated_evaluation.course.evaluations.all())
annotated_evaluation.weight_percentage = int((evaluation.weight / weight_sum) * 100)
annotated_evaluation.course.avg_grade = distribution_to_grade(
calculate_average_course_distribution(annotated_evaluation.course)
)
evaluations_with_results.append((evaluation, results))
evaluations_with_results.append((annotated_evaluation, results))

evaluations_with_results.sort(
key=lambda cr: (cr[0].course.semester.id, cr[0].course.type.order, cr[0].full_name)
)
used_questionnaires = sorted(used_questionnaires)
sorted_questionnaires = sorted(used_questionnaires)
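# (Renamed from `used_questionnaires`: mypy infers set[Questionnaire] from the
# first assignment, so the sorted list needs its own name.)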

return evaluations_with_results, used_questionnaires, course_results_exist
return evaluations_with_results, sorted_questionnaires, course_results_exist

def write_headings_and_evaluation_info(
self, evaluations_with_results, semesters, contributor, degrees, course_types, verbose_heading
):
self,
evaluations_with_results: list[tuple[Evaluation, OrderedDict[int, list[QuestionResult]]]],
semesters: QuerySetOrSequence[Semester],
contributor: UserProfile | None,
degrees: Iterable[int],
course_types: Iterable[int],
verbose_heading: bool,
) -> None:
export_name = _("Evaluation")
if contributor:
export_name += f"\n{contributor.full_name}"
@@ -199,27 +220,32 @@ def write_headings_and_evaluation_info(
# One more cell is needed for the question column
self.write_empty_row_with_styles(["default"] + ["border_left_right"] * len(evaluations_with_results))

def write_overall_results(self, evaluations_with_results, course_results_exist):
evaluations = [e for e, __ in evaluations_with_results]
def write_overall_results(
self,
evaluations_with_results: list[tuple[AnnotatedEvaluation, OrderedDict[int, list[QuestionResult]]]],
course_results_exist: bool,
) -> None:
annotated_evaluations = [e for e, __ in evaluations_with_results]

self.write_cell(_("Overall Average Grade"), "bold")
averages = (distribution_to_grade(calculate_average_distribution(e)) for e in evaluations)
averages = (distribution_to_grade(calculate_average_distribution(e)) for e in annotated_evaluations)
self.write_row(averages, lambda avg: self.grade_to_style(avg) if avg else "border_left_right")

self.write_cell(_("Total voters/Total participants"), "bold")
voter_ratios = (f"{e.num_voters}/{e.num_participants}" for e in evaluations)
voter_ratios = (f"{e.num_voters}/{e.num_participants}" for e in annotated_evaluations)
self.write_row(voter_ratios, style="total_voters")

self.write_cell(_("Evaluation rate"), "bold")
# round down like in progress bar
participant_percentages = (
f"{int((e.num_voters / e.num_participants) * 100) if e.num_participants > 0 else 0}%" for e in evaluations
f"{int((e.num_voters / e.num_participants) * 100) if e.num_participants > 0 else 0}%"
for e in annotated_evaluations
)
self.write_row(participant_percentages, style="evaluation_rate")

if course_results_exist:
# Only query the number of evaluations once and keep track of it here.
count_gt_1 = [e.course_evaluations_count > 1 for e in evaluations]
count_gt_1: list[bool] = [e.course_evaluations_count > 1 for e in annotated_evaluations]
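# Materializing a list here (rather than a generator) matters: the flags are
# consumed by several zip()/comprehension passes below, and a generator would
# be exhausted after the first one.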

# Borders only if there is a course grade below. Offset by one column
self.write_empty_row_with_styles(
@@ -228,12 +254,13 @@ def write_overall_results(self, evaluations_with_results, course_results_exist):

self.write_cell(_("Evaluation weight"), "bold")
weight_percentages = (
f"{e.weight_percentage}%" if gt1 else None for e, gt1 in zip(evaluations, count_gt_1, strict=True)
f"{e.weight_percentage}%" if gt1 else None
for e, gt1 in zip(annotated_evaluations, count_gt_1, strict=True)
)
self.write_row(weight_percentages, lambda s: "evaluation_weight" if s is not None else "default")

self.write_cell(_("Course Grade"), "bold")
for evaluation, gt1 in zip(evaluations, count_gt_1, strict=True):
for evaluation, gt1 in zip(annotated_evaluations, count_gt_1, strict=True):
if not gt1:
self.write_cell()
continue
@@ -246,7 +273,12 @@ def write_overall_results(self, evaluations_with_results, course_results_exist):
# Same reasoning as above.
self.write_empty_row_with_styles(["default"] + ["border_top" if gt1 else "default" for gt1 in count_gt_1])

def write_questionnaire(self, questionnaire, evaluations_with_results, contributor):
def write_questionnaire(
self,
questionnaire: Questionnaire,
evaluations_with_results: list[tuple[Evaluation, OrderedDict[int, list[QuestionResult]]]],
contributor: UserProfile | None,
) -> None:
if contributor and questionnaire.type == Questionnaire.Type.CONTRIBUTOR:
self.write_cell(f"{questionnaire.public_name} ({contributor.full_name})", "bold")
else:
@@ -270,6 +302,7 @@ def write_questionnaire(self, questionnaire, evaluations_with_results, contribut
for grade_result in results[questionnaire.id]:
if grade_result.question.id != question.id or not RatingResult.has_answers(grade_result):
continue

values.append(grade_result.average * grade_result.count_sum)
count_sum += grade_result.count_sum
if grade_result.question.is_yes_no_question:
@@ -292,17 +325,17 @@ def write_questionnaire(self, questionnaire, evaluations_with_results, contribut
# pylint: disable=arguments-differ
def export_impl(
self,
semesters,
selection_list,
include_not_enough_voters=False,
include_unpublished=False,
contributor=None,
verbose_heading=True,
semesters: QuerySetOrSequence[Semester],
selection_list: Sequence[tuple[Iterable[int], Iterable[int]]],
include_not_enough_voters: bool = False,
include_unpublished: bool = False,
contributor: UserProfile | None = None,
verbose_heading: bool = True,
):
# We want to throw early here, since workbook.save() will throw an IndexError otherwise.
assert len(selection_list) > 0

for sheet_counter, (degrees, course_types) in enumerate(selection_list, 1):
for sheet_counter, (degree_ids, course_type_ids) in enumerate(selection_list, 1):
self.cur_sheet = self.workbook.add_sheet("Sheet " + str(sheet_counter))
self.cur_row = 0
self.cur_col = 0
@@ -314,14 +347,14 @@ def export_impl(
evaluations_with_results, used_questionnaires, course_results_exist = self.filter_evaluations(
semesters,
evaluation_states,
degrees,
course_types,
degree_ids,
course_type_ids,
contributor,
include_not_enough_voters,
)

self.write_headings_and_evaluation_info(
evaluations_with_results, semesters, contributor, degrees, course_types, verbose_heading
evaluations_with_results, semesters, contributor, degree_ids, course_type_ids, verbose_heading
)

for questionnaire in used_questionnaires:
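For orientation, a hypothetical end-to-end use of the exporter (the `semester`, `degree` and `course_type` objects are assumed to exist; one sheet is written per entry of `selection_list`):

    exporter = ResultsExporter()
    exporter.export_impl(
        semesters=[semester],  # a QuerySet would typecheck equally via QuerySetOrSequence
        selection_list=[([degree.id], [course_type.id])],
        include_not_enough_voters=True,
    )
    exporter.workbook.save("results.xls")  # xlwt workbook, hence the legacy .xls format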
8 changes: 4 additions & 4 deletions evap/results/tools.py
@@ -180,7 +180,7 @@ def get_single_result_rating_result(evaluation):
return create_rating_result(question, answer_counters)


def get_results_cache_key(evaluation):
def get_results_cache_key(evaluation: Evaluation) -> str:
return f"evap.staff.results.tools.get_results-{evaluation.id:d}"


@@ -190,15 +190,15 @@ def cache_results(evaluation, *, refetch_related_objects=True):
caches["results"].set(cache_key, _get_results_impl(evaluation, refetch_related_objects=refetch_related_objects))


def get_results(evaluation):
def get_results(evaluation: Evaluation) -> EvaluationResult:
assert evaluation.state in STATES_WITH_RESULTS_CACHING | {Evaluation.State.IN_EVALUATION}

if evaluation.state == Evaluation.State.IN_EVALUATION:
return _get_results_impl(evaluation)

cache_key = get_results_cache_key(evaluation)
result = caches["results"].get(cache_key)
assert result is not None
assert isinstance(result, EvaluationResult)
return result
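# Compared to `assert result is not None`, the isinstance assert both guards
# against unexpected cache contents at runtime and gives mypy a concrete
# EvaluationResult return type instead of Any.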


@@ -211,7 +211,7 @@ def get_results(evaluation):
]


def _get_results_impl(evaluation: Evaluation, *, refetch_related_objects: bool = True):
def _get_results_impl(evaluation: Evaluation, *, refetch_related_objects: bool = True) -> EvaluationResult:
if refetch_related_objects:
discard_cached_related_objects(evaluation)

