diff --git a/api/experimentation/constants.py b/api/experimentation/constants.py index 141e773a4751..fa34d7a7ff97 100644 --- a/api/experimentation/constants.py +++ b/api/experimentation/constants.py @@ -10,6 +10,7 @@ EXPOSURE_HOURLY_BUCKET_MAX_WINDOW = timedelta(hours=72) EXPOSURES_REFRESH_MIN_INTERVAL = timedelta(minutes=5) +RESULTS_REFRESH_MIN_INTERVAL = timedelta(minutes=5) CONTROL_VARIANT_KEY = "control" diff --git a/api/experimentation/migrations/0008_experiment_results.py b/api/experimentation/migrations/0008_experiment_results.py new file mode 100644 index 000000000000..c26943bad594 --- /dev/null +++ b/api/experimentation/migrations/0008_experiment_results.py @@ -0,0 +1,40 @@ +# Generated by Django 5.2.14 on 2026-06-16 09:25 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("experimentation", "0007_exposures_refresh_requested_at"), + ] + + operations = [ + migrations.CreateModel( + name="ExperimentResults", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("as_of", models.DateTimeField(blank=True, null=True)), + ("payload", models.JSONField(blank=True, null=True)), + ("last_error_at", models.DateTimeField(blank=True, null=True)), + ("refresh_requested_at", models.DateTimeField(blank=True, null=True)), + ( + "experiment", + models.OneToOneField( + on_delete=django.db.models.deletion.CASCADE, + related_name="results", + to="experimentation.experiment", + ), + ), + ], + ), + ] diff --git a/api/experimentation/models.py b/api/experimentation/models.py index 5fe779007fe8..fbc73cfbd562 100644 --- a/api/experimentation/models.py +++ b/api/experimentation/models.py @@ -1,6 +1,6 @@ -import typing from dataclasses import asdict from datetime import datetime +from typing import TYPE_CHECKING, Generic, TypeVar from django.db import models from django.db.models import Q @@ -14,10 +14,16 @@ from core.models import SoftDeleteExportableModel from environments.models import Environment +from experimentation.dataclasses import ( + ExposuresSummary, + ResultsSummary, + WarehouseEventStats, +) from experimentation.types import MetricDefinition -if typing.TYPE_CHECKING: - from experimentation.dataclasses import ExposuresSummary, WarehouseEventStats +# A computation's payload is the serialised form of its summary dataclass; the +# concrete subclass binds which one, so record_refresh stays type-safe per panel. +SummaryT = TypeVar("SummaryT", ExposuresSummary, ResultsSummary) class WarehouseType(models.TextChoices): @@ -132,12 +138,11 @@ class Meta: ] -class ExperimentExposures(models.Model): - experiment = models.OneToOneField( - Experiment, - on_delete=models.CASCADE, - related_name="exposures", - ) +class ExperimentComputation(models.Model, Generic[SummaryT]): + """One cached, refreshable warehouse computation per experiment: a single row + updated in place, frozen once ``is_final``. A failed refresh preserves the + last good payload so the UI keeps showing real data with a staleness note.""" + as_of = models.DateTimeField(null=True, blank=True) payload: models.JSONField[dict[str, object] | None, dict[str, object] | None] = ( models.JSONField(null=True, blank=True) @@ -145,6 +150,14 @@ class ExperimentExposures(models.Model): last_error_at = models.DateTimeField(null=True, blank=True) refresh_requested_at = models.DateTimeField(null=True, blank=True) + if TYPE_CHECKING: + # Each concrete subclass defines this as a OneToOneField; declared here + # so is_final can read the experiment without the field assignment. + experiment: "models.OneToOneField[Experiment, Experiment]" + + class Meta: + abstract = True + @property def is_final(self) -> bool: ended_at = self.experiment.ended_at @@ -152,7 +165,7 @@ def is_final(self) -> bool: ended_at is not None and self.as_of is not None and self.as_of >= ended_at ) - def record_refresh(self, summary: "ExposuresSummary", as_of: datetime) -> None: + def record_refresh(self, summary: SummaryT, as_of: datetime) -> None: self.payload = asdict(summary) self.as_of = as_of self.last_error_at = None @@ -167,6 +180,22 @@ def record_refresh_request(self) -> None: self.save(update_fields=["refresh_requested_at"]) +class ExperimentExposures(ExperimentComputation[ExposuresSummary]): + experiment = models.OneToOneField( + Experiment, + on_delete=models.CASCADE, + related_name="exposures", + ) + + +class ExperimentResults(ExperimentComputation[ResultsSummary]): + experiment = models.OneToOneField( + Experiment, + on_delete=models.CASCADE, + related_name="results", + ) + + class MetricAggregation(models.TextChoices): COUNT = "count", "Count" SUM = "sum", "Sum" diff --git a/api/experimentation/serializers.py b/api/experimentation/serializers.py index 9ca5449fe7f1..530d2970c248 100644 --- a/api/experimentation/serializers.py +++ b/api/experimentation/serializers.py @@ -12,6 +12,7 @@ Experiment, ExperimentExposures, ExperimentMetric, + ExperimentResults, ExperimentStatus, Metric, WarehouseConnection, @@ -340,6 +341,28 @@ class ExperimentListSerializer(ExperimentSerializer): class ExperimentExposuresSerializer(serializers.ModelSerializer): # type: ignore[type-arg] + is_final = serializers.BooleanField(read_only=True) + class Meta: model = ExperimentExposures - fields = ("as_of", "last_error_at", "refresh_requested_at", "payload") + fields = ( + "as_of", + "last_error_at", + "refresh_requested_at", + "payload", + "is_final", + ) + + +class ExperimentResultsSerializer(serializers.ModelSerializer): # type: ignore[type-arg] + is_final = serializers.BooleanField(read_only=True) + + class Meta: + model = ExperimentResults + fields = ( + "as_of", + "last_error_at", + "refresh_requested_at", + "payload", + "is_final", + ) diff --git a/api/experimentation/services.py b/api/experimentation/services.py index 7b5ccb21c60c..0a56fa27bc18 100644 --- a/api/experimentation/services.py +++ b/api/experimentation/services.py @@ -8,6 +8,7 @@ from clickhouse_driver import Client from clickhouse_driver.util.helpers import parse_url from django.conf import settings +from django.db.models import Q from django.utils import timezone from audit.models import AuditLog @@ -37,6 +38,7 @@ VALID_STATUS_TRANSITIONS, ExperimentStatus, MetricAggregation, + MetricDirection, WarehouseConnectionStatus, WarehouseType, ) @@ -47,6 +49,7 @@ compare_to_control, srm_p_value, ) +from features.models import FeatureState from integrations.flagsmith.client import get_openfeature_client if typing.TYPE_CHECKING: @@ -294,6 +297,98 @@ def build_results_summary( ) +def compute_results_summary( + experiment: "Experiment", + *, + window_start: "datetime", + window_end: "datetime", +) -> ResultsSummary: + """Gather an experiment's metric statistics from the warehouse and reduce + them to the stored results payload.""" + specs = _experiment_metric_specs(experiment) + aggregates = get_metric_variant_stats( + environment_key=experiment.environment.api_key, + feature_name=experiment.feature.name, + window_start=window_start, + window_end=window_end, + specs=specs, + ) + return build_results_summary( + aggregates, + expected_shares=_expected_variant_shares(experiment), + ) + + +def _experiment_metric_specs(experiment: "Experiment") -> list[MetricSpec]: + return [ + MetricSpec( + metric_id=experiment_metric.metric_id, + event=experiment_metric.metric.definition["event"], + aggregation=experiment_metric.metric.aggregation, + lower_is_better=( + experiment_metric.metric.direction == MetricDirection.DOWN + ), + ) + for experiment_metric in experiment.experiment_metrics.select_related("metric") + ] + + +def _expected_variant_shares(experiment: "Experiment") -> dict[str, float]: + """The traffic split SRM tests against: each multivariate option's + environment allocation, with ``control`` taking the unallocated remainder. + Empty when the feature has no usable allocations, skipping the SRM check.""" + # TODO: read the split from the percentage-split segment override feature + # state once that's implemented, rather than the environment default. + feature_state = ( + FeatureState.objects.get_live_feature_states( + environment=experiment.environment, + additional_filters=Q(feature_segment__isnull=True, identity__isnull=True), + feature_id=experiment.feature_id, + ) + .prefetch_related( + "multivariate_feature_state_values__multivariate_feature_option" + ) + # Highest id is the current version, matching how Environment selects + # active feature states (Max("id")); the default ordering is ascending. + .order_by("-id") + .first() + ) + if feature_state is None: + return {} + + shares: dict[str, float] = {} + allocated = 0.0 + for mv_value in feature_state.multivariate_feature_state_values.all(): + key = mv_value.multivariate_feature_option.key + if key is None: + # An unkeyed option's traffic can't be attributed to a variant; + # counting it as control would inflate control's expected share and + # raise a false SRM alarm, so skip the check entirely. + logger.error( + "srm.unkeyed_variant", + experiment__id=experiment.id, + environment__id=experiment.environment_id, + feature__id=experiment.feature_id, + ) + return {} + shares[key] = mv_value.percentage_allocation / 100 + allocated += mv_value.percentage_allocation + if not shares: + return {} + if allocated > 100: + # A misconfigured feature whose options over-allocate; control's share + # would be negative, so there's no valid split to test against. + logger.error( + "srm.overallocated", + experiment__id=experiment.id, + environment__id=experiment.environment_id, + feature__id=experiment.feature_id, + ) + return {} + shares[CONTROL_VARIANT_KEY] = (100 - allocated) / 100 + return shares + + def _metric_inference( spec: MetricSpec, variants: dict[str, VariantStats], diff --git a/api/experimentation/tasks.py b/api/experimentation/tasks.py index 302ef3053aff..09d90a355a45 100644 --- a/api/experimentation/tasks.py +++ b/api/experimentation/tasks.py @@ -3,8 +3,8 @@ from task_processor.decorators import register_task_handler from experimentation import ingestion_sync_service -from experimentation.models import Experiment, ExperimentExposures -from experimentation.services import compute_exposures_summary +from experimentation.models import Experiment, ExperimentExposures, ExperimentResults +from experimentation.services import compute_exposures_summary, compute_results_summary logger = structlog.get_logger("experimentation") @@ -53,3 +53,38 @@ def compute_experiment_exposures(experiment_id: int) -> None: return exposures.record_refresh(summary, as_of) + + +@register_task_handler() +def compute_experiment_results(experiment_id: int) -> None: + experiment = ( + Experiment.objects.select_related("environment__project", "feature") + .filter(id=experiment_id) + .first() + ) + if experiment is None or not experiment.started_at: + return + + results, _ = ExperimentResults.objects.get_or_create(experiment=experiment) + if results.is_final: + return + + as_of = experiment.ended_at or timezone.now() + try: + summary = compute_results_summary( + experiment, + window_start=experiment.started_at, + window_end=as_of, + ) + except Exception as exc: + results.record_failure() + logger.error( + "results.compute_failed", + exc_info=exc, + experiment__id=experiment.id, + environment__id=experiment.environment_id, + organisation__id=experiment.environment.project.organisation_id, + ) + return + + results.record_refresh(summary, as_of) diff --git a/api/experimentation/views.py b/api/experimentation/views.py index bbf55f87e72c..8310289f4c75 100644 --- a/api/experimentation/views.py +++ b/api/experimentation/views.py @@ -1,5 +1,5 @@ import logging -import math +from datetime import timedelta from typing import Any from django.db import IntegrityError @@ -8,6 +8,7 @@ from django.utils import timezone from rest_framework import mixins, serializers, status from rest_framework.decorators import action +from rest_framework.exceptions import Throttled, ValidationError from rest_framework.permissions import IsAuthenticated from rest_framework.request import Request from rest_framework.response import Response @@ -16,11 +17,16 @@ from app.pagination import CustomPagination from environments.views import NestedEnvironmentViewSet -from experimentation.constants import EXPOSURES_REFRESH_MIN_INTERVAL +from experimentation.constants import ( + EXPOSURES_REFRESH_MIN_INTERVAL, + RESULTS_REFRESH_MIN_INTERVAL, +) from experimentation.models import ( Experiment, + ExperimentComputation, ExperimentExposures, ExperimentMetric, + ExperimentResults, ExperimentStatus, Metric, WarehouseConnection, @@ -35,6 +41,7 @@ ExperimentExposuresSerializer, ExperimentListSerializer, ExperimentMetricSerializer, + ExperimentResultsSerializer, ExperimentSerializer, MetricSerializer, WarehouseConnectionSerializer, @@ -48,7 +55,10 @@ refresh_warehouse_connection_status, transition_experiment_status, ) -from experimentation.tasks import compute_experiment_exposures +from experimentation.tasks import ( + compute_experiment_exposures, + compute_experiment_results, +) from users.models import FFAdminUser logger = logging.getLogger(__name__) @@ -295,29 +305,14 @@ def exposures(self, request: Request, **kwargs: object) -> Response: @action(detail=True, methods=["post"], url_path="exposures/refresh") def refresh_exposures(self, request: Request, **kwargs: object) -> Response: experiment: Experiment = self.get_object() - if experiment.started_at is None: - return Response( - {"detail": "Cannot refresh exposures before the experiment starts."}, - status=status.HTTP_400_BAD_REQUEST, - ) exposures = ExperimentExposures.objects.filter(experiment=experiment).first() - if exposures is not None and exposures.is_final: - return Response( - {"detail": "Exposures are final for this completed experiment."}, - status=status.HTTP_400_BAD_REQUEST, - ) - if exposures is not None and exposures.refresh_requested_at is not None: - retry_after = EXPOSURES_REFRESH_MIN_INTERVAL - ( - timezone.now() - exposures.refresh_requested_at - ) - if retry_after.total_seconds() > 0: - return Response( - {"detail": "A refresh was requested recently. Try again later."}, - status=status.HTTP_429_TOO_MANY_REQUESTS, - headers={ - "Retry-After": str(math.ceil(retry_after.total_seconds())) - }, - ) + self._validate_refresh_request( + experiment, + exposures, + min_interval=EXPOSURES_REFRESH_MIN_INTERVAL, + before_start_detail="Cannot refresh exposures before the experiment starts.", + final_detail="Exposures are final for this completed experiment.", + ) if exposures is None: exposures, _ = ExperimentExposures.objects.get_or_create( experiment=experiment @@ -326,6 +321,58 @@ def refresh_exposures(self, request: Request, **kwargs: object) -> Response: compute_experiment_exposures.delay(kwargs={"experiment_id": experiment.id}) return Response(status=status.HTTP_202_ACCEPTED) + @action(detail=True, methods=["get"]) + def results(self, request: Request, **kwargs: object) -> Response: + experiment: Experiment = self.get_object() + results = getattr(experiment, "results", None) + return Response( + { + "results": ( + ExperimentResultsSerializer(results).data if results else None + ), + } + ) + + @action(detail=True, methods=["post"], url_path="results/refresh") + def refresh_results(self, request: Request, **kwargs: object) -> Response: + experiment: Experiment = self.get_object() + results = ExperimentResults.objects.filter(experiment=experiment).first() + self._validate_refresh_request( + experiment, + results, + min_interval=RESULTS_REFRESH_MIN_INTERVAL, + before_start_detail="Cannot refresh results before the experiment starts.", + final_detail="Results are final for this completed experiment.", + ) + if results is None: + results, _ = ExperimentResults.objects.get_or_create(experiment=experiment) + results.record_refresh_request() + compute_experiment_results.delay(kwargs={"experiment_id": experiment.id}) + return Response(status=status.HTTP_202_ACCEPTED) + + def _validate_refresh_request( + self, + experiment: Experiment, + computation: "ExperimentComputation[Any] | None", + *, + min_interval: timedelta, + before_start_detail: str, + final_detail: str, + ) -> None: + if experiment.started_at is None: + raise ValidationError({"detail": before_start_detail}) + if computation is not None and computation.is_final: + raise ValidationError({"detail": final_detail}) + if computation is not None and computation.refresh_requested_at is not None: + retry_after = min_interval - ( + timezone.now() - computation.refresh_requested_at + ) + if retry_after.total_seconds() > 0: + raise Throttled( + wait=retry_after.total_seconds(), + detail="A refresh was requested recently. Try again later.", + ) + def _transition_status(self, target_status: str) -> Response: experiment: Experiment = self.get_object() try: diff --git a/api/tests/unit/experimentation/test_experiment_views.py b/api/tests/unit/experimentation/test_experiment_views.py index f2aed13d94f3..5015500b0d1a 100644 --- a/api/tests/unit/experimentation/test_experiment_views.py +++ b/api/tests/unit/experimentation/test_experiment_views.py @@ -19,12 +19,14 @@ from experimentation.constants import ( EXPERIMENT_FLAG, EXPOSURES_REFRESH_MIN_INTERVAL, + RESULTS_REFRESH_MIN_INTERVAL, ) from experimentation.models import ( ExpectedDirection, Experiment, ExperimentExposures, ExperimentMetric, + ExperimentResults, ExperimentStatus, Metric, ) @@ -702,6 +704,7 @@ def test_exposures__computed_row__returns_row( "last_error_at": None, "refresh_requested_at": None, "payload": payload, + "is_final": False, } } @@ -753,6 +756,7 @@ def test_exposures__failed_refresh__returns_error_marker_with_last_payload( "last_error_at": "2026-06-11T12:00:00Z", "refresh_requested_at": None, "payload": payload, + "is_final": False, } } @@ -1377,6 +1381,312 @@ def test_post__concurrent_create_race__returns_409( assert response.status_code == status.HTTP_409_CONFLICT +def test_results__computed_row__returns_row( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given a previously computed results row + enable_features(EXPERIMENT_FLAG) + payload = { + "srm_p_value": 0.42, + "metrics": [ + { + "metric_id": 7, + "variants": { + "control": {"n": 1000, "sum": 100.0, "sum_squares": 100.0} + }, + "inference": {}, + } + ], + } + ExperimentResults.objects.create( + experiment=experiment, + as_of=datetime(2026, 6, 11, 12, tzinfo=dt_timezone.utc), + payload=payload, + ) + + # When + response = admin_client_new.get(_action_url(environment, experiment, "results")) + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json() == { + "results": { + "as_of": "2026-06-11T12:00:00Z", + "last_error_at": None, + "refresh_requested_at": None, + "payload": payload, + "is_final": False, + } + } + + +def test_results__never_computed__returns_null( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features(EXPERIMENT_FLAG) + + # When + response = admin_client_new.get(_action_url(environment, experiment, "results")) + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json() == {"results": None} + + +def test_results__failed_refresh__returns_error_marker_with_last_payload( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given a row whose last refresh failed after an earlier success + enable_features(EXPERIMENT_FLAG) + payload: dict[str, object] = {"srm_p_value": None, "metrics": []} + ExperimentResults.objects.create( + experiment=experiment, + as_of=datetime(2026, 6, 11, 11, tzinfo=dt_timezone.utc), + payload=payload, + last_error_at=datetime(2026, 6, 11, 12, tzinfo=dt_timezone.utc), + ) + + # When + response = admin_client_new.get(_action_url(environment, experiment, "results")) + + # Then the stale data and the error marker are both surfaced + assert response.status_code == status.HTTP_200_OK + assert response.json() == { + "results": { + "as_of": "2026-06-11T11:00:00Z", + "last_error_at": "2026-06-11T12:00:00Z", + "refresh_requested_at": None, + "payload": payload, + "is_final": False, + } + } + + +def test_results__admin_without_flag__returns_403( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, +) -> None: + # Given — feature flag not enabled + + # When + response = admin_client_new.get(_action_url(environment, experiment, "results")) + + # Then + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_results__staff_user_with_flag__returns_403( + staff_client: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features(EXPERIMENT_FLAG) + + # When + response = staff_client.get(_action_url(environment, experiment, "results")) + + # Then + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_refresh_results__started_experiment__enqueues_compute( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + mock_compute = mocker.patch("experimentation.views.compute_experiment_results") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-results") + ) + + # Then + assert response.status_code == status.HTTP_202_ACCEPTED + mock_compute.delay.assert_called_once_with( + kwargs={"experiment_id": experiment.id}, + ) + results = ExperimentResults.objects.get(experiment=experiment) + assert results.refresh_requested_at is not None + + +@freeze_time("2026-06-11T12:00:00Z") +def test_refresh_results__requested_recently__returns_429_with_retry_after( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given a refresh was requested a minute ago + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentResults.objects.create( + experiment=experiment, + refresh_requested_at=timezone.now() - timedelta(minutes=1), + ) + mock_compute = mocker.patch("experimentation.views.compute_experiment_results") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-results") + ) + + # Then the client is told when to retry + assert response.status_code == status.HTTP_429_TOO_MANY_REQUESTS + assert response.headers["Retry-After"] == "240" + mock_compute.delay.assert_not_called() + + +def test_refresh_results__last_request_beyond_interval__enqueues_compute( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given the last refresh request is older than the minimum interval + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentResults.objects.create( + experiment=experiment, + refresh_requested_at=timezone.now() - RESULTS_REFRESH_MIN_INTERVAL, + ) + mock_compute = mocker.patch("experimentation.views.compute_experiment_results") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-results") + ) + + # Then + assert response.status_code == status.HTTP_202_ACCEPTED + mock_compute.delay.assert_called_once() + + +def test_refresh_results__completed_with_final_row__returns_400( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given a completed experiment whose row already covers the full window + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.COMPLETED + experiment.started_at = datetime(2026, 6, 1, tzinfo=dt_timezone.utc) + experiment.ended_at = datetime(2026, 6, 8, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentResults.objects.create( + experiment=experiment, + as_of=experiment.ended_at, + payload={"srm_p_value": None, "metrics": []}, + ) + mock_compute = mocker.patch("experimentation.views.compute_experiment_results") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-results") + ) + + # Then a final row is not recomputed + assert response.status_code == status.HTTP_400_BAD_REQUEST + mock_compute.delay.assert_not_called() + + +def test_refresh_results__completed_with_stale_row__enqueues_compute( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given a completed experiment last computed before it ended + enable_features(EXPERIMENT_FLAG) + experiment.status = ExperimentStatus.COMPLETED + experiment.started_at = datetime(2026, 6, 1, tzinfo=dt_timezone.utc) + experiment.ended_at = datetime(2026, 6, 8, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentResults.objects.create( + experiment=experiment, + as_of=datetime(2026, 6, 7, tzinfo=dt_timezone.utc), + payload={"srm_p_value": None, "metrics": []}, + ) + mock_compute = mocker.patch("experimentation.views.compute_experiment_results") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-results") + ) + + # Then the finalising refresh is allowed + assert response.status_code == status.HTTP_202_ACCEPTED + mock_compute.delay.assert_called_once() + + +def test_refresh_results__not_started_experiment__returns_400( + admin_client_new: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given a created experiment that has never started + enable_features(EXPERIMENT_FLAG) + mock_compute = mocker.patch("experimentation.views.compute_experiment_results") + + # When + response = admin_client_new.post( + _action_url(environment, experiment, "refresh-results") + ) + + # Then + assert response.status_code == status.HTTP_400_BAD_REQUEST + mock_compute.delay.assert_not_called() + + +def test_refresh_results__staff_user_with_flag__returns_403( + staff_client: APIClient, + environment: Environment, + experiment: Experiment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features(EXPERIMENT_FLAG) + + # When + response = staff_client.post( + _action_url(environment, experiment, "refresh-results") + ) + + # Then + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_get_detail__env_level_allocations__returns_environment_percentages( admin_client_new: APIClient, environment: Environment, diff --git a/api/tests/unit/experimentation/test_models.py b/api/tests/unit/experimentation/test_models.py index fd3e6dab1a6e..268883841564 100644 --- a/api/tests/unit/experimentation/test_models.py +++ b/api/tests/unit/experimentation/test_models.py @@ -1,5 +1,8 @@ from dataclasses import asdict +from datetime import datetime +from datetime import timezone as dt_timezone +import pytest from django.utils import timezone from pytest_mock import MockerFixture @@ -8,13 +11,17 @@ ExposuresSummary, ExposuresTimeseries, ExposuresTimeseriesPoint, + MetricResult, + ResultsSummary, ) from experimentation.models import ( Experiment, ExperimentExposures, + ExperimentResults, WarehouseConnection, WarehouseType, ) +from experimentation.stats import VariantStats def test_warehouse_connection__after_create__enqueues_ingestion_add_task( @@ -123,3 +130,114 @@ def test_experiment_exposures__record_failure__preserves_last_good_payload( assert exposures.last_error_at is not None assert exposures.payload == asdict(_summary()) assert exposures.as_of == as_of + + +def _results_summary() -> ResultsSummary: + return ResultsSummary( + srm_p_value=0.42, + metrics=[ + MetricResult( + metric_id=7, + variants={ + "control": VariantStats(n=1000, sum=100.0, sum_squares=100.0) + }, + inference={}, + ) + ], + ) + + +def test_experiment_results__record_refresh__stores_payload_and_clears_error( + experiment: Experiment, +) -> None: + # Given a row whose last refresh failed + results = ExperimentResults.objects.create( + experiment=experiment, + last_error_at=timezone.now(), + ) + as_of = timezone.now() + + # When + results.record_refresh(_results_summary(), as_of) + + # Then the summary is stored as plain JSON and the error marker is cleared + results.refresh_from_db() + assert results.payload == { + "srm_p_value": 0.42, + "metrics": [ + { + "metric_id": 7, + "variants": { + "control": {"n": 1000, "sum": 100.0, "sum_squares": 100.0} + }, + "inference": {}, + } + ], + } + assert results.as_of == as_of + assert results.last_error_at is None + + +def test_experiment_results__record_failure__preserves_last_good_payload( + experiment: Experiment, +) -> None: + # Given a row holding a previously computed payload + as_of = timezone.now() + results = ExperimentResults.objects.create( + experiment=experiment, + as_of=as_of, + payload=asdict(_results_summary()), + ) + + # When + results.record_failure() + + # Then only the error marker changes + results.refresh_from_db() + assert results.last_error_at is not None + assert results.payload == asdict(_results_summary()) + assert results.as_of == as_of + + +@pytest.mark.parametrize( + "ended_at, as_of, expected", + [ + pytest.param(None, None, False, id="running-uncomputed"), + pytest.param( + None, + datetime(2026, 6, 8, tzinfo=dt_timezone.utc), + False, + id="running-computed", + ), + pytest.param( + datetime(2026, 6, 8, tzinfo=dt_timezone.utc), + datetime(2026, 6, 7, tzinfo=dt_timezone.utc), + False, + id="completed-stale", + ), + pytest.param( + datetime(2026, 6, 8, tzinfo=dt_timezone.utc), + datetime(2026, 6, 8, tzinfo=dt_timezone.utc), + True, + id="completed-at-end", + ), + pytest.param( + datetime(2026, 6, 8, tzinfo=dt_timezone.utc), + datetime(2026, 6, 9, tzinfo=dt_timezone.utc), + True, + id="completed-past-end", + ), + ], +) +def test_experiment_results__is_final__reflects_window_coverage( + experiment: Experiment, + ended_at: datetime | None, + as_of: datetime | None, + expected: bool, +) -> None: + # Given an experiment with a given end and a row computed as of some time + experiment.ended_at = ended_at + + # When / Then the row is final only once it covers the experiment's end + results = ExperimentResults(experiment=experiment, as_of=as_of) + assert results.is_final is expected diff --git a/api/tests/unit/experimentation/test_services.py b/api/tests/unit/experimentation/test_services.py index eaaa89253cd2..aa138bc0c232 100644 --- a/api/tests/unit/experimentation/test_services.py +++ b/api/tests/unit/experimentation/test_services.py @@ -18,13 +18,23 @@ WarehouseEventStats, ) from experimentation.models import ( + ExpectedDirection, + Experiment, + ExperimentMetric, + ExperimentStatus, + Metric, MetricAggregation, + MetricDirection, WarehouseConnection, WarehouseConnectionStatus, WarehouseType, ) from experimentation.results_query import _MetricSlot from experimentation.stats import VariantStats +from features.feature_types import MULTIVARIATE +from features.models import Feature, FeatureState +from features.multivariate.models import MultivariateFeatureOption +from features.value_types import STRING def test_get_clickhouse_client__configured_url__builds_client_with_timeouts( @@ -1031,3 +1041,263 @@ def test_build_results_summary__computed__serialises_to_wire_shape() -> None: "ci_high", "chance_to_win", } + + +@pytest.mark.django_db +def test_experiment_metric_specs__attached_metrics__maps_definition_and_direction( + experiment: Experiment, + environment: Environment, +) -> None: + # Given two metrics attached to the experiment, one lower-is-better + higher = Metric.objects.create( + environment=environment, + name="Revenue", + aggregation=MetricAggregation.SUM, + direction=MetricDirection.UP, + definition={"version": 1, "event": "purchase"}, + ) + lower = Metric.objects.create( + environment=environment, + name="Errors", + aggregation=MetricAggregation.COUNT, + direction=MetricDirection.DOWN, + definition={"version": 1, "event": "error"}, + ) + ExperimentMetric.objects.create( + experiment=experiment, + metric=higher, + expected_direction=ExpectedDirection.INCREASE, + ) + ExperimentMetric.objects.create( + experiment=experiment, + metric=lower, + expected_direction=ExpectedDirection.DECREASE, + ) + + # When + specs = services._experiment_metric_specs(experiment) + + # Then each metric maps to its event, aggregation and polarity + assert specs == [ + MetricSpec( + metric_id=higher.id, + event="purchase", + aggregation=MetricAggregation.SUM, + lower_is_better=False, + ), + MetricSpec( + metric_id=lower.id, + event="error", + aggregation=MetricAggregation.COUNT, + lower_is_better=True, + ), + ] + + +def _multivariate_feature( + environment: Environment, + allocations: dict[str | None, int], +) -> Feature: + feature: Feature = Feature.objects.create( + name="results-feature", + project=environment.project, + type=MULTIVARIATE, + initial_value="control", + ) + for key, allocation in allocations.items(): + MultivariateFeatureOption.objects.create( + feature=feature, + key=key, + default_percentage_allocation=allocation, + type=STRING, + string_value=key or "unkeyed", + ) + return feature + + +@pytest.mark.django_db +def test_expected_variant_shares__keyed_options__control_takes_remainder( + environment: Environment, +) -> None: + # Given a multivariate feature whose options are allocated 30% and 20% + feature = _multivariate_feature(environment, {"variant_a": 30, "variant_b": 20}) + experiment = Experiment.objects.create( + environment=environment, + feature=feature, + name="exp", + hypothesis="h", + status=ExperimentStatus.RUNNING, + ) + + # When + shares = services._expected_variant_shares(experiment) + + # Then control takes the unallocated remainder + assert shares == pytest.approx({"variant_a": 0.3, "variant_b": 0.2, "control": 0.5}) + + +@pytest.mark.django_db +def test_expected_variant_shares__null_option_keys__returns_empty( + experiment: Experiment, +) -> None: + # Given the experiment's multivariate options carry no variant keys + + # When / Then the split can't be described, so SRM is skipped + assert services._expected_variant_shares(experiment) == {} + + +@pytest.mark.django_db +def test_expected_variant_shares__mixed_keyed_and_null_options__returns_empty( + environment: Environment, + log: StructuredLogCapture, +) -> None: + # Given a multivariate feature with one keyed and one unkeyed option + feature = _multivariate_feature(environment, {"variant_a": 30, None: 30}) + experiment = Experiment.objects.create( + environment=environment, + feature=feature, + name="exp", + hypothesis="h", + status=ExperimentStatus.RUNNING, + ) + + # When / Then the unkeyed option's share can't be attributed, so rather than + # folding it into control SRM is skipped entirely and the gap is logged + assert services._expected_variant_shares(experiment) == {} + assert log.has( + "srm.unkeyed_variant", + level="error", + experiment__id=experiment.id, + environment__id=experiment.environment_id, + feature__id=experiment.feature_id, + ) + + +@pytest.mark.django_db +def test_expected_variant_shares__overallocated_options__returns_empty( + environment: Environment, + log: StructuredLogCapture, +) -> None: + # Given a misconfigured feature whose options allocate more than 100% + feature = _multivariate_feature(environment, {"variant_a": 70, "variant_b": 60}) + experiment = Experiment.objects.create( + environment=environment, + feature=feature, + name="exp", + hypothesis="h", + status=ExperimentStatus.RUNNING, + ) + + # When / Then control's share would be negative, so SRM is skipped and logged + assert services._expected_variant_shares(experiment) == {} + assert log.has( + "srm.overallocated", + level="error", + experiment__id=experiment.id, + environment__id=experiment.environment_id, + feature__id=experiment.feature_id, + ) + + +@pytest.mark.django_db +def test_expected_variant_shares__no_multivariate_options__returns_empty( + environment: Environment, + feature: Feature, +) -> None: + # Given a standard feature with no multivariate allocations + experiment = Experiment.objects.create( + environment=environment, + feature=feature, + name="exp", + hypothesis="h", + status=ExperimentStatus.RUNNING, + ) + + # When / Then there is no split to test + assert services._expected_variant_shares(experiment) == {} + + +@pytest.mark.django_db +def test_expected_variant_shares__no_live_feature_state__returns_empty( + experiment: Experiment, +) -> None: + # Given the feature has no live state in the environment + FeatureState.objects.filter( + feature=experiment.feature, + environment=experiment.environment, + ).delete() + + # When / Then + assert services._expected_variant_shares(experiment) == {} + + +@pytest.mark.django_db +def test_compute_results_summary__experiment__queries_warehouse_and_builds( + environment: Environment, + mocker: MockerFixture, +) -> None: + # Given a running experiment with one keyed variant and one attached metric + feature = _multivariate_feature(environment, {"variant_a": 50}) + experiment = Experiment.objects.create( + environment=environment, + feature=feature, + name="exp", + hypothesis="h", + status=ExperimentStatus.RUNNING, + ) + metric = Metric.objects.create( + environment=environment, + name="Purchases", + aggregation=MetricAggregation.OCCURRENCE, + direction=MetricDirection.UP, + definition={"version": 1, "event": "purchase"}, + ) + ExperimentMetric.objects.create( + experiment=experiment, + metric=metric, + expected_direction=ExpectedDirection.INCREASE, + ) + expected_specs = [ + _spec( + metric_id=metric.id, + event="purchase", + aggregation=MetricAggregation.OCCURRENCE, + ) + ] + aggregates = _aggregates( + specs=expected_specs, + exposure_counts={"control": 1000, "variant_a": 1000}, + metric_stats={ + metric.id: { + "control": VariantStats(n=1000, sum=100.0, sum_squares=100.0), + "variant_a": VariantStats(n=1000, sum=140.0, sum_squares=140.0), + } + }, + ) + mock_stats = mocker.patch( + "experimentation.services.get_metric_variant_stats", + return_value=aggregates, + ) + window_start = datetime(2026, 6, 1, tzinfo=timezone.utc) + window_end = datetime(2026, 6, 10, tzinfo=timezone.utc) + + # When + summary = services.compute_results_summary( + experiment, + window_start=window_start, + window_end=window_end, + ) + + # Then the warehouse is queried with the experiment's metric specs + mock_stats.assert_called_once_with( + environment_key=environment.api_key, + feature_name=feature.name, + window_start=window_start, + window_end=window_end, + specs=expected_specs, + ) + # And the summary carries the metric result with an SRM verdict from the + # configured 50/50 split + assert summary.srm_p_value == pytest.approx(1.0) + assert summary.metrics[0].metric_id == metric.id + assert summary.metrics[0].inference["variant_a"] is not None diff --git a/api/tests/unit/experimentation/test_tasks.py b/api/tests/unit/experimentation/test_tasks.py index a465c24bb4b1..00934f908b1e 100644 --- a/api/tests/unit/experimentation/test_tasks.py +++ b/api/tests/unit/experimentation/test_tasks.py @@ -11,15 +11,20 @@ ExposuresSummary, ExposuresTimeseries, ExposuresTimeseriesPoint, + MetricResult, + ResultsSummary, ) from experimentation.models import ( Experiment, ExperimentExposures, + ExperimentResults, ExperimentStatus, ) +from experimentation.stats import VariantStats from experimentation.tasks import ( add_environment_key_to_ingestion, compute_experiment_exposures, + compute_experiment_results, delete_environment_key_from_ingestion, ) @@ -225,3 +230,179 @@ def test_compute_experiment_exposures__experiment_deleted_after_enqueue__skips( # Then the task exits without raising into the task processor mock_compute.assert_not_called() + + +def _results_summary() -> ResultsSummary: + return ResultsSummary( + srm_p_value=0.42, + metrics=[ + MetricResult( + metric_id=7, + variants={ + "control": VariantStats(n=1000, sum=100.0, sum_squares=100.0) + }, + inference={}, + ) + ], + ) + + +@freeze_time("2026-06-11T12:00:00Z") +def test_compute_experiment_results__running_experiment__stores_summary( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given a running experiment and a warehouse responding with a summary + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + mock_compute = mocker.patch( + "experimentation.tasks.compute_results_summary", + return_value=_results_summary(), + ) + + # When + compute_experiment_results(experiment_id=experiment.id) + + # Then the full window up to now is computed and stored on the row + mock_compute.assert_called_once_with( + experiment, + window_start=experiment.started_at, + window_end=timezone.now(), + ) + results = ExperimentResults.objects.get(experiment=experiment) + assert results.payload == asdict(_results_summary()) + assert results.as_of == timezone.now() + assert results.last_error_at is None + + +def test_compute_experiment_results__completed_experiment__window_ends_at_ended_at( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given a completed experiment + experiment.status = ExperimentStatus.COMPLETED + experiment.started_at = datetime(2026, 6, 1, tzinfo=dt_timezone.utc) + experiment.ended_at = datetime(2026, 6, 8, tzinfo=dt_timezone.utc) + experiment.save() + mock_compute = mocker.patch( + "experimentation.tasks.compute_results_summary", + return_value=_results_summary(), + ) + + # When + compute_experiment_results(experiment_id=experiment.id) + + # Then the window is frozen at the experiment's end + mock_compute.assert_called_once_with( + experiment, + window_start=experiment.started_at, + window_end=experiment.ended_at, + ) + results = ExperimentResults.objects.get(experiment=experiment) + assert results.as_of == experiment.ended_at + + +def test_compute_experiment_results__warehouse_error__records_failure( + experiment: Experiment, + mocker: MockerFixture, + log: StructuredLogCapture, +) -> None: + # Given a running experiment whose row holds a previously computed payload + experiment.status = ExperimentStatus.RUNNING + experiment.started_at = datetime(2026, 6, 10, tzinfo=dt_timezone.utc) + experiment.save() + as_of = timezone.now() + ExperimentResults.objects.create( + experiment=experiment, + as_of=as_of, + payload=asdict(_results_summary()), + ) + exc = Exception("warehouse unreachable") + mocker.patch( + "experimentation.tasks.compute_results_summary", + side_effect=exc, + ) + + # When + compute_experiment_results(experiment_id=experiment.id) + + # Then the failure is recorded and the last good payload survives + results = ExperimentResults.objects.get(experiment=experiment) + assert results.last_error_at is not None + assert results.payload == asdict(_results_summary()) + assert results.as_of == as_of + # And exactly one failure event is logged for operators, carrying the + # exception so the traceback reaches the logs + assert log.events == [ + { + "event": "results.compute_failed", + "level": "error", + "exc_info": exc, + "experiment__id": experiment.id, + "environment__id": experiment.environment_id, + "organisation__id": experiment.environment.project.organisation_id, + } + ] + + +def test_compute_experiment_results__not_started_experiment__skips( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given a created experiment that has never started + mock_compute = mocker.patch( + "experimentation.tasks.compute_results_summary", + ) + + # When + compute_experiment_results(experiment_id=experiment.id) + + # Then nothing is queried or stored + mock_compute.assert_not_called() + assert not ExperimentResults.objects.filter(experiment=experiment).exists() + + +def test_compute_experiment_results__final_row__skips_without_recompute( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given a completed experiment whose row already covers the full window + experiment.status = ExperimentStatus.COMPLETED + experiment.started_at = datetime(2026, 6, 1, tzinfo=dt_timezone.utc) + experiment.ended_at = datetime(2026, 6, 8, tzinfo=dt_timezone.utc) + experiment.save() + ExperimentResults.objects.create( + experiment=experiment, + as_of=experiment.ended_at, + payload=asdict(_results_summary()), + ) + mock_compute = mocker.patch( + "experimentation.tasks.compute_results_summary", + ) + + # When + compute_experiment_results(experiment_id=experiment.id) + + # Then the final payload is left untouched regardless of the caller + mock_compute.assert_not_called() + results = ExperimentResults.objects.get(experiment=experiment) + assert results.payload == asdict(_results_summary()) + + +def test_compute_experiment_results__experiment_deleted_after_enqueue__skips( + experiment: Experiment, + mocker: MockerFixture, +) -> None: + # Given the experiment is deleted between enqueue and execution + experiment_id = experiment.id + experiment.delete() + mock_compute = mocker.patch( + "experimentation.tasks.compute_results_summary", + ) + + # When + compute_experiment_results(experiment_id=experiment_id) + + # Then the task exits without raising into the task processor + mock_compute.assert_not_called() diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index b21521e825fc..645eb0decb76 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -90,6 +90,17 @@ Attributes: - `experiment.id` - `organisation.id` +### `experimentation.results.compute_failed` + +Logged at `error` from: + - `api/experimentation/tasks.py:81` + +Attributes: + - `environment.id` + - `exc_info` + - `experiment.id` + - `organisation.id` + ### `feature_health.feature_health_event_dismissal_not_supported` Logged at `warning` from: @@ -465,7 +476,7 @@ Attributes: ### `warehouse.connection.connected` Logged at `info` from: - - `api/experimentation/services.py:450` + - `api/experimentation/services.py:545` Attributes: - `environment.id` @@ -474,12 +485,32 @@ Attributes: ### `warehouse.connection.test_event_sent` Logged at `info` from: - - `api/experimentation/services.py:430` + - `api/experimentation/services.py:525` Attributes: - `environment.id` - `organisation.id` +### `warehouse.srm.overallocated` + +Logged at `error` from: + - `api/experimentation/services.py:381` + +Attributes: + - `environment.id` + - `experiment.id` + - `feature.id` + +### `warehouse.srm.unkeyed_variant` + +Logged at `error` from: + - `api/experimentation/services.py:367` + +Attributes: + - `environment.id` + - `experiment.id` + - `feature.id` + ### `workflows.change_request.committed` Logged at `info` from: