diff --git a/backend/backend/core/scheduler/celery_tasks.py b/backend/backend/core/scheduler/celery_tasks.py index 1ce72d8..4175a10 100644 --- a/backend/backend/core/scheduler/celery_tasks.py +++ b/backend/backend/core/scheduler/celery_tasks.py @@ -30,6 +30,15 @@ logger = logging.getLogger(__name__) + +def _sum_or_none(results, attr): + """Sum an attribute across results. Return None only if ALL values are None, + return 0 if the sum is legitimately zero (not all missing).""" + values = [getattr(r, attr, None) for r in results] + if all(v is None for v in values): + return None + return sum(v or 0 for v in values) + # Default max duration before a job is considered stuck (1 hour) DEFAULT_STUCK_JOB_THRESHOLD_SECONDS = 3600 @@ -361,12 +370,22 @@ def _clean_name(raw): "status": r.status, "end_status": r.end_status, "sequence": r.sequence_num, + "rows_affected": getattr(r, "rows_affected", None), + "rows_inserted": getattr(r, "rows_inserted", None), + "rows_updated": getattr(r, "rows_updated", None), + "rows_deleted": getattr(r, "rows_deleted", None), + "type": getattr(r, "materialization", "") or "", + "duration_ms": getattr(r, "duration_ms", None), } for r in user_results ], "total": len(user_results), "passed": sum(1 for r in user_results if r.end_status == "OK"), "failed": sum(1 for r in user_results if r.end_status == "FAIL"), + "rows_processed": _sum_or_none(user_results, "rows_affected"), + "rows_added": _sum_or_none(user_results, "rows_inserted"), + "rows_modified": _sum_or_none(user_results, "rows_updated"), + "rows_deleted": _sum_or_none(user_results, "rows_deleted"), } except Exception: _clear_base_result() diff --git a/backend/backend/core/scheduler/serializer.py b/backend/backend/core/scheduler/serializer.py index 7a45ac5..0a3c375 100644 --- a/backend/backend/core/scheduler/serializer.py +++ b/backend/backend/core/scheduler/serializer.py @@ -1,17 +1,105 @@ +from django.contrib.auth import get_user_model from rest_framework import serializers from backend.core.scheduler.models import TaskRunHistory +User = get_user_model() + class TaskRunHistorySerializer(serializers.ModelSerializer): duration = serializers.SerializerMethodField() + duration_ms = serializers.SerializerMethodField() + run_number = serializers.SerializerMethodField() + triggered_by = serializers.SerializerMethodField() + model_count = serializers.SerializerMethodField() + failed_models = serializers.SerializerMethodField() + skipped_count = serializers.SerializerMethodField() class Meta: model = TaskRunHistory - fields = "__all__" # Include all fields or specify fields like ['id', 'start_time', 'end_time', 'status'] + fields = [ + "id", "task_id", "status", "start_time", "end_time", + "trigger", "scope", "error_message", "result", "retry_num", + "user_task_detail", + "duration", "duration_ms", "run_number", "triggered_by", + "model_count", "failed_models", "skipped_count", + ] + + def _get_user_cache(self): + """Batch-load users for all runs in one query, cached per serializer instance.""" + if not hasattr(self, "_user_cache"): + user_ids = set() + for obj in self.instance if hasattr(self.instance, '__iter__') else [self.instance]: + if obj and isinstance(obj.kwargs, dict) and obj.kwargs.get("user_id"): + user_ids.add(obj.kwargs["user_id"]) + if user_ids: + self._user_cache = { + str(u.id): u for u in User.objects.filter(id__in=user_ids) + } + else: + self._user_cache = {} + return self._user_cache def get_duration(self, obj): - """Calculate duration (end_time - start_time)""" + """Human-readable duration string.""" if obj.start_time and obj.end_time: - return str(obj.end_time - obj.start_time) # Convert timedelta to string - return None # If end_time is missing, return None + delta = obj.end_time - obj.start_time + total_ms = int(delta.total_seconds() * 1000) + if total_ms < 1000: + return f"{total_ms}ms" + elif total_ms < 60000: + return f"{total_ms / 1000:.1f}s" + else: + minutes = total_ms // 60000 + seconds = (total_ms % 60000) / 1000 + return f"{minutes}m {seconds:.0f}s" + return None + + def get_duration_ms(self, obj): + """Duration in milliseconds for sorting/comparison.""" + if obj.start_time and obj.end_time: + return int((obj.end_time - obj.start_time).total_seconds() * 1000) + return None + + def get_run_number(self, obj): + """Sequential run number from view context (total - offset - idx).""" + run_numbers = self.context.get("run_numbers", {}) + return run_numbers.get(obj.id, 0) + + def get_triggered_by(self, obj): + """Resolve user_id from kwargs to username using batch-loaded cache.""" + if not isinstance(obj.kwargs, dict): + return None + user_id = obj.kwargs.get("user_id") + if not user_id: + return None + cache = self._get_user_cache() + user = cache.get(str(user_id)) + if user: + return { + "id": str(user.id), + "username": user.get_full_name() or user.username or user.email, + } + return {"id": str(user_id), "username": "Unknown user"} + + def get_model_count(self, obj): + """Total model count from result.""" + if obj.result and isinstance(obj.result, dict): + return obj.result.get("total", 0) + return 0 + + def get_failed_models(self, obj): + """List of failed model names.""" + if obj.result and isinstance(obj.result, dict): + models = obj.result.get("models", []) + return [m["name"] for m in models if m.get("end_status") == "FAIL" or m.get("status") == "failure"] + return [] + + def get_skipped_count(self, obj): + """Count of skipped models (total - passed - failed).""" + if obj.result and isinstance(obj.result, dict): + total = obj.result.get("total", 0) + passed = obj.result.get("passed", 0) + failed = obj.result.get("failed", 0) + return max(0, total - passed - failed) + return 0 diff --git a/backend/backend/core/scheduler/urls.py b/backend/backend/core/scheduler/urls.py index c9e32a4..1f0d8fe 100644 --- a/backend/backend/core/scheduler/urls.py +++ b/backend/backend/core/scheduler/urls.py @@ -7,6 +7,7 @@ delete_periodic_task, update_periodic_task, task_run_history, + run_stats, trigger_task_once, trigger_task_once_for_model, list_deploy_candidates, @@ -30,6 +31,7 @@ name="get_periodic_task", ), path("/run-history/", task_run_history, name="task_run_history"), + path("/run-stats/", run_stats, name="run_stats"), path( "/trigger-periodic-task/", trigger_task_once, diff --git a/backend/backend/core/scheduler/views.py b/backend/backend/core/scheduler/views.py index 195fea5..f7cd272 100644 --- a/backend/backend/core/scheduler/views.py +++ b/backend/backend/core/scheduler/views.py @@ -4,6 +4,7 @@ from datetime import timedelta from django.utils import timezone +from django.utils.dateparse import parse_datetime from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask from rest_framework import status from rest_framework.decorators import api_view, permission_classes @@ -583,6 +584,112 @@ def delete_periodic_task(request, project_id, task_id): ) +@api_view(["GET"]) +@permission_classes([IsAuthenticated]) +def run_stats(request, project_id, user_task_id): + """Get aggregated run statistics for a job — stats cards data.""" + try: + query = {"id": user_task_id} + if _is_valid_project_id(project_id): + query["project__project_uuid"] = project_id + task = UserTaskDetails.objects.get(**query) + runs = TaskRunHistory.objects.filter(user_task_detail=task) + + now = timezone.now() + last_7d = now - timedelta(days=7) + last_24h = now - timedelta(hours=24) + prev_24h_start = now - timedelta(hours=48) + + # Success rate (7 days) — only count completed runs in denominator + runs_7d = runs.filter(start_time__gte=last_7d) + completed_7d = runs_7d.filter(status__in=["SUCCESS", "FAILURE"]) + total_7d = completed_7d.count() + success_7d = completed_7d.filter(status="SUCCESS").count() + success_rate = round((success_7d / total_7d * 100), 1) if total_7d > 0 else None + + # Average duration (successful runs, 7 days) + successful_runs_7d = runs_7d.filter(status="SUCCESS", start_time__isnull=False, end_time__isnull=False) + avg_duration_ms = None + if successful_runs_7d.exists(): + durations = [(r.end_time - r.start_time).total_seconds() * 1000 for r in successful_runs_7d] + avg_duration_ms = int(sum(durations) / len(durations)) + + # Failures (24h) + comparison with previous 24h + failures_24h = runs.filter(start_time__gte=last_24h, status="FAILURE").count() + failures_prev_24h = runs.filter( + start_time__gte=prev_24h_start, start_time__lt=last_24h, status="FAILURE" + ).count() + + # Last successful run + last_success = runs.filter(status="SUCCESS").order_by("-end_time").first() + last_success_time = last_success.end_time if last_success else None + + # Expected duration (avg of last 5 successful runs) + recent_successes = runs.filter( + status="SUCCESS", start_time__isnull=False, end_time__isnull=False + ).order_by("-end_time")[:5] + expected_duration_ms = None + if recent_successes.exists(): + durations = [(r.end_time - r.start_time).total_seconds() * 1000 for r in recent_successes] + expected_duration_ms = int(sum(durations) / len(durations)) + + # Duration trend (last 10 completed runs for sparkline) + recent_runs = list(runs.filter( + start_time__isnull=False, end_time__isnull=False + ).order_by("-end_time")[:10]) + recent_runs.reverse() # chronological order for sparkline + duration_trend = [ + int((r.end_time - r.start_time).total_seconds() * 1000) for r in recent_runs + ] + + # Schedule info + schedule_type = None + schedule_label = None + periodic = None + try: + periodic = task.periodic_task + if periodic: + if periodic.crontab: + schedule_type = "cron" + c = periodic.crontab + schedule_label = f"{c.minute} {c.hour} {c.day_of_month} {c.month_of_year} {c.day_of_week}" + elif periodic.interval: + schedule_type = "interval" + schedule_label = f"Every {periodic.interval.every} {periodic.interval.period}" + except Exception: + periodic = None + + return Response({ + "success": True, + "data": { + "success_rate_7d": success_rate, + "success_count_7d": success_7d, + "total_count_7d": total_7d, + "avg_duration_ms": avg_duration_ms, + "failures_24h": failures_24h, + "failures_prev_24h": failures_prev_24h, + "failures_change": failures_24h - failures_prev_24h, + "last_successful_run": last_success_time, + "expected_duration_ms": expected_duration_ms, + "duration_trend": duration_trend, + "total_runs": runs.count(), + "job_name": task.task_name, + "environment": { + "name": task.environment.environment_name if task.environment else None, + "type": task.environment.deployment_type if task.environment else None, + }, + "schedule_type": schedule_type, + "schedule_label": schedule_label, + "schedule_enabled": periodic.enabled if periodic else False, + }, + }, status=status.HTTP_200_OK) + except UserTaskDetails.DoesNotExist: + return Response({"error": "Task not found"}, status=status.HTTP_404_NOT_FOUND) + except Exception as e: + logger.error(f"Error getting run stats: {e}", exc_info=True) + return Response({"error": "Internal server error"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + @api_view(["GET"]) @permission_classes([IsAuthenticated]) def task_run_history(request, project_id, user_task_id): @@ -600,18 +707,40 @@ def task_run_history(request, project_id, user_task_id): trigger_filter = request.GET.get("trigger") scope_filter = request.GET.get("scope") status_filter = request.GET.get("status") + date_from = request.GET.get("date_from") + date_to = request.GET.get("date_to") + search = request.GET.get("search") + if trigger_filter: runs = runs.filter(trigger=trigger_filter) if scope_filter: runs = runs.filter(scope=scope_filter) if status_filter: runs = runs.filter(status=status_filter) + if date_from: + dt = parse_datetime(date_from) + if dt: + runs = runs.filter(start_time__gte=dt) + if date_to: + dt = parse_datetime(date_to) + if dt: + runs = runs.filter(start_time__lte=dt) + if search: + runs = runs.filter(error_message__icontains=search) runs = runs.order_by("-start_time") total = runs.count() offset = (page - 1) * limit - serializer = TaskRunHistorySerializer(runs[offset : offset + limit], many=True) + page_qs = runs[offset : offset + limit] + # Compute run numbers from total and offset — no extra query needed + run_numbers = { + run.id: total - offset - idx + for idx, run in enumerate(page_qs) + } + serializer = TaskRunHistorySerializer( + page_qs, many=True, context={"run_numbers": run_numbers} + ) return Response( { @@ -620,6 +749,7 @@ def task_run_history(request, project_id, user_task_id): "page_items": { "id": task.id, "job_name": task.task_name, + "project_id": str(task.project.project_uuid) if task.project else None, "env_type": task.environment.deployment_type if task.environment else None, @@ -705,9 +835,10 @@ def trigger_task_once(request, project_id, user_task_id): synchronous (in-process) execution so local dev works without Redis. """ try: - task = UserTaskDetails.objects.get( - id=user_task_id, project__project_uuid=project_id - ) + query = {"id": user_task_id} + if _is_valid_project_id(project_id): + query["project__project_uuid"] = project_id + task = UserTaskDetails.objects.get(**query) except UserTaskDetails.DoesNotExist: return Response( {"error": "Task not found"}, status=status.HTTP_404_NOT_FOUND diff --git a/backend/visitran/adapters/adapter.py b/backend/visitran/adapters/adapter.py index 295361c..111a1fe 100644 --- a/backend/visitran/adapters/adapter.py +++ b/backend/visitran/adapters/adapter.py @@ -67,10 +67,10 @@ def db_scd(self) -> BaseSCD: def db_reader(self) -> BaseDBReader: return self._db_reader - def run_model(self, visitran_model: VisitranModel) -> None: + def run_model(self, visitran_model: VisitranModel): self.load_model(model=visitran_model) fire_event(MaterializationType(materialization=str(visitran_model.materialization))) - self.db_model.execute() + return self.db_model.execute() def run_seeds(self, schema: str, abs_path: str) -> None: seed_obj = self.load_seed(schema, abs_path) diff --git a/backend/visitran/adapters/bigquery/connection.py b/backend/visitran/adapters/bigquery/connection.py index 6b45997..ac903af 100644 --- a/backend/visitran/adapters/bigquery/connection.py +++ b/backend/visitran/adapters/bigquery/connection.py @@ -269,16 +269,9 @@ def merge_into_table( target_table_name: str, select_statement: Table, primary_key: Union[str, list[str]] = None, - ) -> None: + ) -> dict: """Efficient upsert using DELETE + INSERT for BigQuery. - - This approach is more efficient than MERGE for BigQuery because: - 1. BigQuery is optimized for bulk operations - 2. DELETE + INSERT performs better than UPDATE operations - 3. Works better with BigQuery's partitioning strategy - - Args: - primary_key: Can be a single column name (str) or list of column names for composite keys + Returns dict with rows_affected. """ try: fire_event( @@ -378,6 +371,7 @@ def merge_into_table( raise Exception( f"BigQuery incremental upsert failed for {schema_name}.{target_table_name}: {str(e)}" ) from e + return {"rows_affected": None} # BigQuery: fallback to get_table_row_count in BaseModel diff --git a/backend/visitran/adapters/bigquery/model.py b/backend/visitran/adapters/bigquery/model.py index c9d3f4e..471fba2 100644 --- a/backend/visitran/adapters/bigquery/model.py +++ b/backend/visitran/adapters/bigquery/model.py @@ -92,12 +92,14 @@ def execute_incremental(self) -> None: # Get primary key from model if available primary_key = getattr(self.model, 'primary_key', None) - self.db_connection.merge_into_table( + result = self.db_connection.merge_into_table( schema_name=self.model.destination_schema_name, target_table_name=self.model.destination_table_name, select_statement=self.model.select_statement, primary_key=primary_key, ) + if result and isinstance(result, dict): + self._upsert_metrics = result else: fire_event( ExecuteIncrementalCreate( diff --git a/backend/visitran/adapters/databricks/connection.py b/backend/visitran/adapters/databricks/connection.py index 3ca70f8..7fd14cc 100644 --- a/backend/visitran/adapters/databricks/connection.py +++ b/backend/visitran/adapters/databricks/connection.py @@ -224,21 +224,12 @@ def upsert_into_table( table_name: str, select_statement: "Table", primary_key: Union[str, list[str]], - ) -> None: + ) -> dict: """Efficient upsert using Databricks Delta Lake's MERGE INTO statement. - - This approach is optimal for Databricks because: - 1. Delta Lake natively supports MERGE INTO with ACID guarantees - 2. Atomic operation with automatic conflict resolution - 3. Handles both single and composite keys efficiently - 4. Leverages Delta Lake's optimized merge implementation - - Args: - schema_name: Target schema/database name - table_name: Target table name - select_statement: Ibis table expression with incremental data - primary_key: Column(s) to match for upsert (single string or list) + Returns dict with rows_affected from cursor.rowcount. """ + rowcount = None + # Handle both single column and composite keys if isinstance(primary_key, str): key_columns = [primary_key] @@ -286,8 +277,13 @@ def upsert_into_table( VALUES ({insert_values}) """ - cursor = self.connection.raw_sql(merge_query) - cursor.close() + merge_cursor = self.connection.raw_sql(merge_query) + _rc = merge_cursor.rowcount if hasattr(merge_cursor, "rowcount") else None + rowcount = _rc if (_rc is not None and _rc >= 0) else None + try: + merge_cursor.close() + except Exception: + pass logging.info("Databricks MERGE completed for %s.%s", schema_name, table_name) except Exception as e: @@ -298,7 +294,8 @@ def upsert_into_table( ) from e finally: try: - cursor = self.connection.raw_sql(f"DROP TABLE IF EXISTS {temp_table_fq}") - cursor.close() + cleanup_cursor = self.connection.raw_sql(f"DROP TABLE IF EXISTS {temp_table_fq}") + cleanup_cursor.close() except Exception: pass + return {"rows_affected": rowcount} diff --git a/backend/visitran/adapters/databricks/model.py b/backend/visitran/adapters/databricks/model.py index 24e7371..bc3ba41 100644 --- a/backend/visitran/adapters/databricks/model.py +++ b/backend/visitran/adapters/databricks/model.py @@ -96,12 +96,14 @@ def execute_incremental(self) -> None: if primary_key: # MERGE mode: Upsert with primary key (updates existing, inserts new) logging.info(f"Incremental MERGE mode: upserting with primary_key={primary_key}") - self.db_connection.upsert_into_table( + result = self.db_connection.upsert_into_table( schema_name=self.model.destination_schema_name, table_name=self.model.destination_table_name, select_statement=self.model.select_statement, primary_key=primary_key, ) + if result and isinstance(result, dict): + self._upsert_metrics = result else: # APPEND mode: Insert-only, no deduplication (for event logs, time-series) logging.info("Incremental APPEND mode: inserting without deduplication") diff --git a/backend/visitran/adapters/model.py b/backend/visitran/adapters/model.py index 2a7b46a..085f494 100644 --- a/backend/visitran/adapters/model.py +++ b/backend/visitran/adapters/model.py @@ -2,13 +2,24 @@ import logging from abc import ABC, abstractmethod -from typing import Any +from dataclasses import dataclass +from typing import Any, Optional from visitran.adapters.connection import BaseConnection from visitran.materialization import Materialization from visitran.templates.model import VisitranModel +@dataclass +class ExecutionMetrics: + """Metrics returned from model execution.""" + rows_affected: Optional[int] = None + rows_inserted: Optional[int] = None + rows_updated: Optional[int] = None + rows_deleted: Optional[int] = None + materialization: str = "" + + class BaseModel(ABC): def __init__(self, db_connection: BaseConnection, model: VisitranModel) -> None: super().__init__() @@ -17,6 +28,7 @@ def __init__(self, db_connection: BaseConnection, model: VisitranModel) -> None: self._model: VisitranModel = model self._statements: list[Any] = [] + self._upsert_metrics: Optional[dict] = None # Populated by adapter's execute_incremental @property def model(self) -> VisitranModel: @@ -26,20 +38,57 @@ def model(self) -> VisitranModel: def materialization(self) -> Materialization: return self.model.materialization - def execute(self) -> None: + def execute(self) -> ExecutionMetrics: + mat_name = self.materialization.value if hasattr(self.materialization, "value") else str(self.materialization) + if self.materialization == Materialization.EPHEMERAL: self.execute_ephemeral() + return ExecutionMetrics(rows_affected=None, materialization="ephemeral") self.model.select_statement = self.model.select() if self.materialization == Materialization.TABLE: self.execute_table() + # Get row count after table creation — all rows are "inserted" (DROP + CREATE) + rows = self._get_row_count_safe() + return ExecutionMetrics( + rows_affected=rows, + rows_inserted=rows, + rows_updated=0, + rows_deleted=0, + materialization="table", + ) elif self.materialization == Materialization.VIEW: self.execute_view() + return ExecutionMetrics(rows_affected=None, materialization="view") elif self.materialization == Materialization.INCREMENTAL: self.execute_incremental() + # Use upsert metrics if available (adapter captured cursor.rowcount) + upsert = self._upsert_metrics or {} + upsert_rows = upsert.get("rows_affected") + rows = upsert_rows if upsert_rows is not None else self._get_row_count_safe() + return ExecutionMetrics( + rows_affected=rows, + rows_inserted=upsert.get("rows_inserted"), + rows_updated=upsert.get("rows_updated"), + rows_deleted=upsert.get("rows_deleted"), + materialization="incremental", + ) + + return ExecutionMetrics(materialization=mat_name) + + def _get_row_count_safe(self) -> Optional[int]: + """Get row count after execution, return None on failure.""" + try: + return self._db_connection.get_table_row_count( + schema_name=self.model.destination_schema_name, + table_name=self.model.destination_table_name, + ) + except Exception as e: + logging.info(f"Could not get row count for {self.model.destination_table_name}: {e}") + return None @abstractmethod def execute_ephemeral(self) -> None: diff --git a/backend/visitran/adapters/postgres/connection.py b/backend/visitran/adapters/postgres/connection.py index beebff6..ae56739 100644 --- a/backend/visitran/adapters/postgres/connection.py +++ b/backend/visitran/adapters/postgres/connection.py @@ -204,7 +204,7 @@ def upsert_into_table( table_name: str, select_statement: "Table", primary_key: Union[str, list[str]], - ) -> None: + ) -> dict: """Efficient upsert using PostgreSQL's INSERT ... ON CONFLICT. This approach is optimal for PostgreSQL because: @@ -212,6 +212,8 @@ def upsert_into_table( 2. No temporary tables needed 3. Atomic operation 4. Better performance than MERGE for PostgreSQL + + Returns dict with rows_affected from cursor.rowcount. """ # Handle both single column and composite keys @@ -232,7 +234,7 @@ def upsert_into_table( pass else: self._fallback_upsert(schema_name, table_name, select_statement, key_columns) - return + return {"rows_affected": None} qi = self.quote_identifier @@ -255,8 +257,15 @@ def upsert_into_table( DO UPDATE SET {update_set_clause} """ - # Execute the upsert - self.connection.raw_sql(upsert_query) + # Execute the upsert and capture rowcount + cursor = self.connection.raw_sql(upsert_query) + _rc = cursor.rowcount if hasattr(cursor, "rowcount") else None + rowcount = _rc if (_rc is not None and _rc >= 0) else None + try: + cursor.close() + except Exception: + pass + return {"rows_affected": rowcount} diff --git a/backend/visitran/adapters/postgres/model.py b/backend/visitran/adapters/postgres/model.py index d0abc94..ab68190 100644 --- a/backend/visitran/adapters/postgres/model.py +++ b/backend/visitran/adapters/postgres/model.py @@ -85,12 +85,14 @@ def execute_incremental(self) -> None: if primary_key: # MERGE mode: Upsert with primary key (updates existing, inserts new) logging.info(f"Incremental MERGE mode: upserting with primary_key={primary_key}") - self.db_connection.upsert_into_table( + result = self.db_connection.upsert_into_table( schema_name=self.model.destination_schema_name, table_name=self.model.destination_table_name, select_statement=self.model.select_statement, primary_key=primary_key, ) + if result and isinstance(result, dict): + self._upsert_metrics = result else: # APPEND mode: Insert-only, no deduplication (for event logs, time-series) logging.info("Incremental APPEND mode: inserting without deduplication") diff --git a/backend/visitran/adapters/snowflake/connection.py b/backend/visitran/adapters/snowflake/connection.py index d9df64b..924a838 100644 --- a/backend/visitran/adapters/snowflake/connection.py +++ b/backend/visitran/adapters/snowflake/connection.py @@ -279,15 +279,12 @@ def upsert_into_table( table_name: str, select_statement: "Table", primary_key: Union[str, list[str]], - ) -> None: + ) -> dict: """Efficient upsert using Snowflake's MERGE INTO statement. - - This approach is optimal for Snowflake because: - 1. MERGE INTO is natively supported and optimized - 2. Atomic operation with ACID properties - 3. Handles both single and composite keys efficiently - 4. Better performance than separate DELETE + INSERT + Returns dict with rows_affected from cursor.rowcount. """ + rowcount = None + # Handle both single column and composite keys if isinstance(primary_key, str): key_columns = [primary_key] @@ -330,7 +327,13 @@ def upsert_into_table( VALUES ({insert_values}) """ - self.connection.raw_sql(merge_query) + cursor = self.connection.raw_sql(merge_query) + _rc = cursor.rowcount if hasattr(cursor, "rowcount") else None + rowcount = _rc if (_rc is not None and _rc >= 0) else None + try: + cursor.close() + except Exception: + pass except Exception as e: logging.error(f"Snowflake upsert failed for {schema_name}.{table_name}: {str(e)}") @@ -341,3 +344,4 @@ def upsert_into_table( self.connection.raw_sql(f"DROP TABLE IF EXISTS {qi(schema_name)}.{qi(temp_table_name)}") except Exception: pass # Ignore cleanup errors + return {"rows_affected": rowcount} diff --git a/backend/visitran/adapters/snowflake/model.py b/backend/visitran/adapters/snowflake/model.py index 3018f14..f98c51a 100644 --- a/backend/visitran/adapters/snowflake/model.py +++ b/backend/visitran/adapters/snowflake/model.py @@ -88,12 +88,14 @@ def execute_incremental(self) -> None: if primary_key: # MERGE mode: Upsert with primary key (updates existing, inserts new) logging.info(f"Incremental MERGE mode: upserting with primary_key={primary_key}") - self.db_connection.upsert_into_table( + result = self.db_connection.upsert_into_table( schema_name=self.model.destination_schema_name, table_name=self.model.destination_table_name, select_statement=self.model.select_statement, primary_key=primary_key, ) + if result and isinstance(result, dict): + self._upsert_metrics = result else: # APPEND mode: Insert-only, no deduplication (for event logs, time-series) logging.info("Incremental APPEND mode: inserting without deduplication") diff --git a/backend/visitran/adapters/trino/connection.py b/backend/visitran/adapters/trino/connection.py index 0ad9f9e..b7aaa4d 100644 --- a/backend/visitran/adapters/trino/connection.py +++ b/backend/visitran/adapters/trino/connection.py @@ -147,16 +147,13 @@ def upsert_into_table( table_name: str, select_statement: "Table", primary_key: Union[str, list[str]], - ) -> None: + ) -> dict: """Efficient upsert using DELETE + INSERT strategy for Trino. - - Notes: - - MERGE can be expensive in Trino due to full joins. We avoid it. - - When a primary_key is provided (single or composite), we perform: - 1) DELETE matching rows in target - 2) INSERT all rows from the temp incremental table - - Without a primary_key we fall back to simple INSERT (may create duplicates). + Returns dict with rows_deleted and rows_inserted from cursors. """ + inserted = None + deleted = None + # Normalize primary key(s) if isinstance(primary_key, str): key_columns = [primary_key] @@ -187,7 +184,13 @@ def upsert_into_table( WHERE {where_clause} ) """ - self.connection.raw_sql(delete_sql) + del_cursor = self.connection.raw_sql(delete_sql) + _rc = del_cursor.rowcount if hasattr(del_cursor, "rowcount") else None + deleted = _rc if (_rc is not None and _rc >= 0) else None + try: + del_cursor.close() + except Exception: + pass # 2b. INSERT all rows from temp (includes new/updated) insert_cols = ", ".join([qi(c) for c in target_columns]) @@ -196,7 +199,13 @@ def upsert_into_table( SELECT {insert_cols} FROM {qi(schema_name)}.{qi(temp_table_name)} """ - self.connection.raw_sql(insert_sql) + ins_cursor = self.connection.raw_sql(insert_sql) + _rc = ins_cursor.rowcount if hasattr(ins_cursor, "rowcount") else None + inserted = _rc if (_rc is not None and _rc >= 0) else None + try: + ins_cursor.close() + except Exception: + pass except Exception as e: logging.error(f"Trino upsert (DELETE+INSERT) failed for {schema_name}.{table_name}: {str(e)}") @@ -209,3 +218,8 @@ def upsert_into_table( self.connection.raw_sql(f"DROP TABLE IF EXISTS {qi(schema_name)}.{qi(temp_table_name)}") except Exception: pass + return { + "rows_affected": inserted, + "rows_inserted": inserted, + "rows_deleted": deleted, + } diff --git a/backend/visitran/adapters/trino/model.py b/backend/visitran/adapters/trino/model.py index c7b045e..ffb55ed 100644 --- a/backend/visitran/adapters/trino/model.py +++ b/backend/visitran/adapters/trino/model.py @@ -84,12 +84,14 @@ def execute_incremental(self) -> None: if primary_key: # MERGE mode: Upsert with primary key (updates existing, inserts new) logging.info(f"Incremental MERGE mode: upserting with primary_key={primary_key}") - self.db_connection.upsert_into_table( + result = self.db_connection.upsert_into_table( schema_name=self.model.destination_schema_name, table_name=self.model.destination_table_name, select_statement=self.model.select_statement, primary_key=primary_key, ) + if result and isinstance(result, dict): + self._upsert_metrics = result else: # APPEND mode: Insert-only, no deduplication (for event logs, time-series) logging.info("Incremental APPEND mode: inserting without deduplication") diff --git a/backend/visitran/events/printer.py b/backend/visitran/events/printer.py index 19f1a19..79000fa 100644 --- a/backend/visitran/events/printer.py +++ b/backend/visitran/events/printer.py @@ -27,6 +27,12 @@ class BaseResult: ending_time: datetime.datetime sequence_num: int end_status: str + rows_affected: int | None = None + rows_inserted: int | None = None + rows_updated: int | None = None + rows_deleted: int | None = None + materialization: str = "" + duration_ms: int | None = None @dataclass diff --git a/backend/visitran/visitran.py b/backend/visitran/visitran.py index 4ccee4c..57908b4 100644 --- a/backend/visitran/visitran.py +++ b/backend/visitran/visitran.py @@ -345,9 +345,10 @@ def execute_graph(self) -> None: ) ) self.db_adapter.db_connection.create_schema(node.destination_schema_name) # create if not exists - self.db_adapter.run_model(visitran_model=node) + exec_metrics = self.db_adapter.run_model(visitran_model=node) _elapsed = time.monotonic() - start_time + _elapsed_ms = int(_elapsed * 1000) fire_event( ModelRunSucceeded( model_name=_model_display, @@ -360,6 +361,16 @@ def execute_graph(self) -> None: run_duration=_elapsed, ) + # Extract row count from execution metrics + _rows = _rows_ins = _rows_upd = _rows_del = None + _mat = "" + if exec_metrics is not None: + _rows = getattr(exec_metrics, "rows_affected", None) + _rows_ins = getattr(exec_metrics, "rows_inserted", None) + _rows_upd = getattr(exec_metrics, "rows_updated", None) + _rows_del = getattr(exec_metrics, "rows_deleted", None) + _mat = getattr(exec_metrics, "materialization", "") + base_result = BaseResult( node_name=str(node_name), sequence_num=sequence_number, @@ -368,6 +379,12 @@ def execute_graph(self) -> None: info_message=f"Running {node_name}", status=ExecStatus.Success.value, end_status=ExecStatus.OK.value, + rows_affected=_rows, + rows_inserted=_rows_ins, + rows_updated=_rows_upd, + rows_deleted=_rows_del, + materialization=_mat, + duration_ms=_elapsed_ms, ) sequence_number += 1 BASE_RESULT.append(base_result) diff --git a/frontend/src/ide/run-history/RunHistory.css b/frontend/src/ide/run-history/RunHistory.css index 7e7e895..c4b60f4 100644 --- a/frontend/src/ide/run-history/RunHistory.css +++ b/frontend/src/ide/run-history/RunHistory.css @@ -1,56 +1,159 @@ -/* RunHistory.css */ +/* RunHistory.css — Matches design HTML exactly */ .runhistory-container { height: 100%; display: flex; flex-direction: column; + overflow: auto; } -.runhistory-title { - font-weight: bold; - padding: 12px 20px 0; +/* ── Job Switcher ── */ +.rh-job-switcher { + padding: 12px 14px; + background: var(--bg-color-3, rgba(59, 130, 246, 0.04)); + border: 1px solid var(--border-color-1, rgba(59, 130, 246, 0.15)); + border-radius: 10px; } -.runhistory-filters { +.rh-job-option { display: flex; - justify-content: space-between; align-items: center; - padding: 10px 20px; + gap: 8px; + padding: 2px 0; } -.runhistory-job-select { - width: 240px; +.rh-job-dot { + width: 8px; + height: 8px; + border-radius: 50%; + flex-shrink: 0; } -.runhistory-status-select { - width: 160px; +.rh-job-dot.success { + background: var(--success-color, #52c41a); +} +.rh-job-dot.failed { + background: var(--error-color, #ff4d4f); +} +.rh-job-dot.paused { + background: var(--font-color-3, #8c8c8c); } -.runhistory-job-info { - display: flex; - justify-content: space-between; - align-items: center; - padding: 0 20px 10px; +.rh-job-option-name { + font-weight: 500; +} +.rh-job-option-meta { + font-size: 11px; + color: var(--font-color-3, #8c8c8c); +} + +/* ── Header ── */ +.runhistory-header { + padding: 20px 24px 8px; } +/* ── Table ── */ .runhistory-table-container { flex: 1; - padding: 0 20px 20px; + padding: 0 24px 24px; overflow: auto; } -/* Visually bind an expanded error row to its parent run row so the - * error panel reads as a continuation of that row, not a sibling. */ .runhistory-table-container .runhistory-row-expanded > td { border-bottom-color: transparent !important; } .runhistory-table-container .ant-table-expanded-row > td { border-top: 0 !important; - padding: 0 !important; + padding: 0 8px !important; background: transparent !important; } .runhistory-table-container .ant-table-expanded-row:hover > td { background: transparent !important; } + +/* ── Trigger icon circle ── */ +.rh-trigger-icon { + width: 22px; + height: 22px; + border-radius: 50%; + background: var(--bg-color-3, rgba(0, 0, 0, 0.05)); + display: inline-flex; + align-items: center; + justify-content: center; + color: var(--font-color-3, #8c8c8c); +} + +/* ── Duration bar ── */ +.rh-dur-bar { + width: 60px; + height: 3px; + background: var(--border-color-1, rgba(0, 0, 0, 0.08)); + border-radius: 2px; + overflow: hidden; + margin-top: 3px; +} + +.rh-dur-bar-fill { + height: 100%; + border-radius: 2px; +} + +.rh-dur-bar-fill.fail { + background: var(--error-color, #ff4d4f); +} + +.rh-dur-bar-fill.ok { + background: var(--success-color, #52c41a); +} + +/* ── Error box (expanded detail) ── */ +.rh-error-box { + border-left: 3px solid var(--error-color, #ff4d4f); + padding: 12px 14px; + background: var(--error-bg, rgba(255, 77, 79, 0.06)); + border-radius: 6px; + margin-bottom: 14px; +} + +.rh-error-box-title { + font-weight: 600; + color: var(--error-color, #ff4d4f); + font-size: 13px; + margin-bottom: 8px; + display: flex; + justify-content: space-between; + align-items: center; +} + +.rh-error-msg { + font-family: var(--font-family-code, "SF Mono", Consolas, monospace); + font-size: 12px; + background: var(--bg-color-3, rgba(0, 0, 0, 0.04)); + padding: 8px 10px; + border-radius: 4px; + margin-bottom: 8px; +} + +.rh-error-stack { + font-family: var(--font-family-code, "SF Mono", Consolas, monospace); + font-size: 11px; + padding: 8px 10px; + background: var(--bg-color-3, rgba(0, 0, 0, 0.04)); + border-radius: 4px; + color: var(--font-color-3, #8c8c8c); + line-height: 1.6; + max-height: 100px; + overflow: auto; + white-space: pre-wrap; + word-break: break-word; +} + +/* ── Responsive ── */ +@media (max-width: 1000px) { + .runhistory-container .ant-row > .ant-col[class*="ant-col-6"] { + flex: 0 0 50%; + max-width: 50%; + } +} diff --git a/frontend/src/ide/run-history/Runhistory.jsx b/frontend/src/ide/run-history/Runhistory.jsx index 2d4cf0a..769188e 100644 --- a/frontend/src/ide/run-history/Runhistory.jsx +++ b/frontend/src/ide/run-history/Runhistory.jsx @@ -1,6 +1,6 @@ -import { useEffect, useState, useMemo, useCallback, useRef } from "react"; +/* eslint-disable eqeqeq, no-mixed-operators, react/prop-types */ +import { useEffect, useState, useMemo, useCallback } from "react"; import { - Alert, Select, Table, Tag, @@ -10,55 +10,163 @@ import { Button, Space, Tooltip, + Input, + Card, + Badge, + Avatar, + Row, + Col, + Timeline, + DatePicker, } from "antd"; import { ReloadOutlined, CalendarOutlined, - DatabaseOutlined, CheckCircleFilled, CloseCircleFilled, ClockCircleOutlined, SyncOutlined, + SearchOutlined, + CopyOutlined, + EyeOutlined, + RedoOutlined, + LoadingOutlined, + UpOutlined, + DownOutlined, + MinusOutlined, + LeftOutlined, + RightOutlined, + SwapOutlined, } from "@ant-design/icons"; -import { useSearchParams } from "react-router-dom"; +import { useSearchParams, useNavigate } from "react-router-dom"; import { useAxiosPrivate } from "../../service/axios-service"; import { orgStore } from "../../store/org-store"; import { useNotificationService } from "../../service/notification-service"; -import { runHistoryTagColor } from "../../common/constants"; import { usePagination } from "../../widgets/hooks/usePagination"; -import { - getTooltipText, - getRelativeTime, - formatDateTime, -} from "../../common/helpers"; +import { getRelativeTime, formatDateTime } from "../../common/helpers"; import "./RunHistory.css"; -/* ─── Parse duration string to milliseconds for sorting ─── */ -const parseDurationMs = (duration) => { - if (!duration) return 0; - const parts = duration.split(":"); - if (parts.length !== 3) return 0; - const hours = parseInt(parts[0], 10); - const mins = parseInt(parts[1], 10); - const secs = parseFloat(parts[2]); - const h = hours * 3600; - const m = mins * 60; - return (h + m + secs) * 1000; +const { Text, Title } = Typography; +const { RangePicker } = DatePicker; + +/* ── Duration helpers ── */ +const formatDurationMs = (ms) => { + if (!ms && ms !== 0) return "—"; + if (ms < 1000) return `${ms}ms`; + if (ms < 60000) return `${(ms / 1000).toFixed(1)}s`; + const m = Math.floor(ms / 60000); + const s = ((ms % 60000) / 1000).toFixed(0); + return `${m}m ${s}s`; +}; + +const parseDurationMs = (d) => { + if (!d) return 0; + if (typeof d === "number") return d; + // Handle serializer format: "1m 30s", "45.0s", "800ms" + const str = String(d); + let ms = 0; + const minMatch = str.match(/([\d.]+)\s*m(?!s)/); + const secMatch = str.match(/([\d.]+)\s*s/); + const msMatch = str.match(/([\d.]+)\s*ms/); + if (msMatch) ms += parseFloat(msMatch[1]); + if (secMatch) ms += parseFloat(secMatch[1]) * 1000; + if (minMatch) ms += parseFloat(minMatch[1]) * 60000; + if (ms > 0) return ms; + // Fallback: HH:MM:SS format + const p = str.split(":"); + if (p.length === 3) { + return ( + (parseInt(p[0], 10) * 3600 + parseInt(p[1], 10) * 60 + parseFloat(p[2])) * + 1000 + ); + } + return 0; +}; + +/* ── Sparkline SVG — plots real duration data points ── */ +const Sparkline = ({ color = "#3b82f6", data = [] }) => { + if (!data || data.length < 2) { + // Fallback — flat line + return ( + + + + ); + } + const max = Math.max(...data); + const min = Math.min(...data); + const range = max - min || 1; + const points = data + .map((v, i) => { + const x = (i / (data.length - 1)) * 100; + const y = 26 - ((v - min) / range) * 24; // 2px top padding, 26px range + return `${x},${y}`; + }) + .join(" "); + return ( + + + + ); }; -/* ─── Duration formatter: "HH:MM:SS.sss" → human-readable ─── */ -const formatDuration = (duration) => { - if (!duration) return "—"; - const parts = duration.split(":"); - if (parts.length !== 3) return duration; - const hours = parseInt(parts[0], 10); - const mins = parseInt(parts[1], 10); - const secs = parseFloat(parts[2]); - if (hours > 0) return `${hours}h ${mins}m ${Math.round(secs)}s`; - if (mins > 0) return `${mins}m ${secs.toFixed(1)}s`; - if (secs >= 1) return `${secs.toFixed(1)}s`; - return `${Math.round(secs * 1000)}ms`; +/* ── StatCard ── */ +const StatCard = ({ label, icon, value, valueColor, subtext, spark }) => { + return ( + + + + {icon} + {icon ? " " : ""} + {label} + +
+ {value} +
+ {spark} + {subtext && ( + {subtext} + )} +
+
+ ); }; const STATUS_OPTIONS = [ @@ -69,21 +177,13 @@ const STATUS_OPTIONS = [ { label: "Revoked", value: "REVOKED" }, ]; -const getRunTriggerScope = (row) => { - const kw = row?.kwargs || {}; - const legacyQuick = kw.source === "quick_deploy"; - const models = kw.models_override || []; - const trigger = - row?.trigger || kw.trigger || (legacyQuick ? "manual" : "scheduled"); - const scope = - row?.scope || - kw.scope || - (models.length > 0 || legacyQuick ? "model" : "job"); - return { trigger, scope, models }; -}; - const Runhistory = () => { const axios = useAxiosPrivate(); + const navigate = useNavigate(); + const { token } = theme.useToken(); + const { notify } = useNotificationService(); + const { selectedOrgId } = orgStore(); + const [searchParams, setSearchParams] = useSearchParams(); const { currentPage, pageSize, @@ -92,144 +192,162 @@ const Runhistory = () => { setCurrentPage, setPageSize, } = usePagination(); + const [jobListItems, setJobListItems] = useState([]); - const [backUpData, setBackUpData] = useState([]); - const [JobHistoryData, setJobHistoryData] = useState([]); - const [jobSchedule, setJobSchedule] = useState({}); + const [jobListFull, setJobListFull] = useState([]); + const [jobHistoryData, setJobHistoryData] = useState([]); const [expandedRowKeys, setExpandedRowKeys] = useState([]); - const [filterQueries, setFilterQuery] = useState({ - status: "", + const [loading, setLoading] = useState(false); + const [stats, setStats] = useState(null); + const [statsLoading, setStatsLoading] = useState(false); + const [filters, setFilters] = useState({ job: "", + status: "", trigger: "", - scope: "", + search: "", }); - + const [searchText, setSearchText] = useState(""); + const [datePreset, setDatePreset] = useState("24h"); + const [customDateRange, setCustomDateRange] = useState(null); + const [showCustomDate, setShowCustomDate] = useState(false); const [envInfo, setEnvInfo] = useState({ env_type: "", job_name: "", id: "", + project_id: "", }); - const [loading, setLoading] = useState(false); - const { selectedOrgId } = orgStore(); - const { token } = theme.useToken(); - const { notify } = useNotificationService(); - const [searchParams, setSearchParams] = useSearchParams(); + const orgId = selectedOrgId || "default_org"; + const [retryLoading, setRetryLoading] = useState(false); + + /* ── APIs ── */ + const fetchStats = useCallback( + async (taskId) => { + setStatsLoading(true); + try { + const res = await axios.get( + `/api/v1/visitran/${orgId}/project/_all/jobs/run-stats/${taskId}` + ); + setStats(res.data.data); + } catch { + setStats(null); + } finally { + setStatsLoading(false); + } + }, + [axios, orgId] + ); - /* ─── API calls ─── */ - const getRunHistoryList = useCallback( - async (Id, page = currentPage, limit = pageSize, filters = {}) => { + const fetchHistory = useCallback( + async (taskId, page = 1, limit = pageSize, f = filters) => { setLoading(true); try { const params = { page, limit }; - if (filters.status) params.status = filters.status; - if (filters.trigger) params.trigger = filters.trigger; - if (filters.scope) params.scope = filters.scope; - const res = await axios({ - method: "GET", - url: `/api/v1/visitran/${ - selectedOrgId || "default_org" - }/project/_all/jobs/run-history/${Id}`, - params, - }); + if (f.status) params.status = f.status; + if (f.trigger) params.trigger = f.trigger; + if (f.search) params.search = f.search; + // Date filter — preset or custom range + if (datePreset === "custom" && customDateRange?.[0]) { + params.date_from = customDateRange[0].toISOString(); + if (customDateRange[1]) + params.date_to = customDateRange[1].toISOString(); + } else if (datePreset && datePreset !== "all") { + const now = new Date(); + const presetMs = { + "24h": 86400000, + "7d": 604800000, + "30d": 2592000000, + }; + if (presetMs[datePreset]) + params.date_from = new Date( + now - presetMs[datePreset] + ).toISOString(); + } + const res = await axios.get( + `/api/v1/visitran/${orgId}/project/_all/jobs/run-history/${taskId}`, + { params } + ); const { page_items, total_items, current_page } = res.data.data; setTotalCount(total_items); setCurrentPage(current_page); - const { env_type, job_name, run_history, id } = page_items; - setEnvInfo({ env_type, job_name, id }); - setJobHistoryData(run_history); - setBackUpData(run_history); + setEnvInfo({ + env_type: page_items.env_type, + job_name: page_items.job_name, + id: page_items.id, + project_id: page_items.project_id, + }); + setJobHistoryData(page_items.run_history || []); } catch (error) { notify({ error }); } finally { setLoading(false); } }, - [axios, selectedOrgId, notify] + [axios, orgId, pageSize, notify, datePreset, customDateRange] ); - const getJobList = async () => { - setLoading(true); + const fetchJobs = async () => { try { - const res = await axios({ - method: "GET", - url: `/api/v1/visitran/${ - selectedOrgId || "default_org" - }/project/_all/jobs/list-periodic-tasks`, - }); + const res = await axios.get( + `/api/v1/visitran/${orgId}/project/_all/jobs/list-periodic-tasks` + ); const { page_items } = res.data.data; - const scheduledObj = {}; - const jobIds = page_items.map((el) => { - const taskDetails = el.periodic_task_details?.[el.task_type]; - if (taskDetails) { - scheduledObj[el.user_task_id] = getTooltipText( - taskDetails, - el.task_type - ); - } - return { label: el.task_name, value: el.user_task_id }; - }); - setJobSchedule(scheduledObj); - setJobListItems(jobIds); - if (jobIds.length) { - const taskFromUrl = searchParams.get("task"); - const taskFromUrlNum = taskFromUrl ? Number(taskFromUrl) : NaN; - const matchedFromUrl = !Number.isNaN(taskFromUrlNum) - ? jobIds.find((j) => j.value === taskFromUrlNum) + const jobs = page_items.map((el) => ({ + label: el.task_name, + value: el.user_task_id, + })); + setJobListItems(jobs); + setJobListFull(page_items); + if (jobs.length) { + const fromUrl = searchParams.get("task"); + const matched = fromUrl + ? jobs.find((j) => j.value === Number(fromUrl)) : null; - const initial = matchedFromUrl?.value ?? jobIds[0].value; - setFilterQuery((prev) => ({ ...prev, job: initial })); + setFilters((p) => ({ ...p, job: matched?.value ?? jobs[0].value })); } } catch (error) { console.error("Failed to load jobs", error); - } finally { - setLoading(false); } }; useEffect(() => { - getJobList(); + fetchJobs(); }, []); - - const deepLinkConsumed = useRef(false); - - /* ─── server-side filtering: refetch when filters change ─── */ useEffect(() => { - if (!filterQueries.job) return; - getRunHistoryList(filterQueries.job, 1, pageSize, { - status: filterQueries.status, - trigger: filterQueries.trigger, - scope: filterQueries.scope, - }); + if (!filters.job) return; + fetchHistory(filters.job, 1, pageSize, filters); + fetchStats(filters.job); }, [ - filterQueries.status, - filterQueries.trigger, - filterQueries.scope, - filterQueries.job, + filters.job, + filters.status, + filters.trigger, + filters.search, + pageSize, + fetchHistory, + fetchStats, + datePreset, + customDateRange, ]); - /* ─── auto-expand on fresh data load ─── */ + // Don't auto-expand — keep collapsed on load + useEffect(() => { + setExpandedRowKeys([]); + }, [jobHistoryData]); + + // Debounce search input — wait 400ms before updating filters useEffect(() => { - const ids = []; - if ( - !deepLinkConsumed.current && - searchParams.has("task") && - backUpData.length > 0 - ) { - ids.push(backUpData[0].id); - deepLinkConsumed.current = true; + const timer = setTimeout(() => { + setFilters((p) => ({ ...p, search: searchText })); + }, 400); + return () => clearTimeout(timer); + }, [searchText]); + + const handleFilterChange = (key, value) => { + if (key === "search") { + setSearchText(value || ""); + return; } - (backUpData || []) - .filter((r) => r.status === "FAILURE" && r.error_message) - .forEach((r) => { - if (!ids.includes(r.id)) ids.push(r.id); - }); - setExpandedRowKeys(ids); - }, [backUpData]); - - /* ─── handlers ─── */ - const handleJobChange = useCallback( - (value) => { - setFilterQuery({ status: "", job: value, trigger: "", scope: "" }); + setFilters((p) => ({ ...p, [key]: value || "" })); + if (key === "job") setSearchParams( (prev) => { const next = new URLSearchParams(prev); @@ -239,441 +357,1452 @@ const Runhistory = () => { }, { replace: true } ); - }, - [setSearchParams] - ); - - const handleTriggerChange = useCallback((value) => { - setFilterQuery((prev) => ({ ...prev, trigger: value || "" })); - }, []); - - const handleScopeChange = useCallback((value) => { - setFilterQuery((prev) => ({ ...prev, scope: value || "" })); - }, []); - - const handleStatusChange = useCallback((value) => { - setFilterQuery((prev) => ({ ...prev, status: value || "" })); - }, []); - - const handleRefresh = useCallback(() => { - if (filterQueries.job) { - getRunHistoryList(filterQueries.job, currentPage, pageSize, { - status: filterQueries.status, - trigger: filterQueries.trigger, - scope: filterQueries.scope, - }); + }; + const handleRefresh = () => { + if (filters.job) { + fetchHistory(filters.job, currentPage, pageSize, filters); + fetchStats(filters.job); } - }, [filterQueries, currentPage, pageSize, getRunHistoryList]); - - const handlePagination = useCallback( - (newPage, newPageSize) => { - if (currentPage !== newPage || pageSize !== newPageSize) { - setCurrentPage(newPage); - setPageSize(newPageSize); - getRunHistoryList(filterQueries.job, newPage, newPageSize, { - status: filterQueries.status, - trigger: filterQueries.trigger, - scope: filterQueries.scope, - }); - } - }, - [currentPage, pageSize, filterQueries, getRunHistoryList] - ); + }; + const handleRetry = async (taskId) => { + if (retryLoading) return; + setRetryLoading(true); + try { + await axios.post( + `/api/v1/visitran/${orgId}/project/_all/jobs/trigger-periodic-task/${taskId}`, + {}, + { + headers: { + "X-CSRFToken": document.cookie.match(/csrftoken=([^;]+)/)?.[1], + }, + } + ); + notify({ type: "success", message: "Job triggered successfully" }); + setTimeout(handleRefresh, 2000); + } catch (error) { + notify({ error }); + } finally { + setRetryLoading(false); + } + }; + const handleCopyError = (text) => { + navigator.clipboard.writeText(text); + notify({ type: "success", message: "Copied to clipboard" }); + }; - const handleExpand = useCallback((expanded, record) => { - setExpandedRowKeys((prev) => - expanded ? [...prev, record.id] : prev.filter((k) => k !== record.id) - ); - }, []); + const activeFilterCount = [ + filters.status, + filters.trigger, + filters.search, + datePreset !== "24h" ? datePreset : null, + ].filter(Boolean).length; - /* ─── table columns ─── */ + /* ── Table columns ── */ const columns = useMemo( () => [ + { + title: "Run", + dataIndex: "run_number", + key: "run_number", + width: 70, + render: (n) => ( + + #{n || "—"} + + ), + }, { title: "Status", dataIndex: "status", key: "status", - width: 120, - render: (status) => ( - - {status === "FAILURE" ? "FAILED" : status} - - ), + width: 110, + render: (s) => { + if (s === "FAILURE") + return ( + } color="error"> + Failed + + ); + if (s === "SUCCESS") + return ( + } color="success"> + Success + + ); + if (s === "STARTED" || s === "RUNNING") + return ( + } color="processing"> + Running + + ); + return {s}; + }, }, { title: "Trigger", key: "trigger", - width: 120, - render: (_, record) => { - const { trigger } = getRunTriggerScope(record); - return trigger === "manual" ? ( - Manual - ) : ( - Scheduled + width: 170, + render: (_, r) => { + const trigger = r.trigger || r.kwargs?.trigger || "scheduled"; + const user = r.triggered_by; + const initials = user?.username + ? user.username.slice(0, 2).toUpperCase() + : trigger === "manual" + ? "M" + : "S"; + return ( + + + {initials} + + + {trigger === "manual" ? "Manual" : "Schedule"} + {user ? ` · ${user.username}` : ""} + + ); }, }, { title: "Scope", key: "scope", - width: 220, - render: (_, record) => { - const { scope, models } = getRunTriggerScope(record); - if (scope === "model") { - return ( - - Single model - {models.length > 0 && ( - - {models.join(", ")} - - )} - - ); - } - return Full job; + width: 150, + render: (_, r) => { + const scope = r.scope || "job"; + const count = + (r.result?.models || []).filter((m) => m.type !== "ephemeral") + .length || + r.model_count || + 0; + const models = r.kwargs?.models_override || []; + return ( + + + {scope === "model" ? models.join(", ") : "Full job"} + + + {count} models + + + ); + }, + }, + { + title: "Changes", + key: "changes", + width: 200, + render: (_, r) => { + if (r.status !== "SUCCESS" || !r.result) + return ; + const nonEph = (r.result?.models || []).filter( + (m) => m.type !== "ephemeral" + ); + const added = nonEph.some((m) => m.rows_inserted != null) + ? nonEph.reduce((s, m) => s + (m.rows_inserted || 0), 0) + : null; + const modified = nonEph.some((m) => m.rows_updated != null) + ? nonEph.reduce((s, m) => s + (m.rows_updated || 0), 0) + : null; + const deleted = nonEph.some((m) => m.rows_deleted != null) + ? nonEph.reduce((s, m) => s + (m.rows_deleted || 0), 0) + : null; + if (added === null && modified === null && deleted === null) + return ; + return ( + + {added !== null && ( + + + +{added.toLocaleString()} + + )} + {modified !== null && ( + + ✎ ~{modified.toLocaleString()} + + )} + {deleted !== null && ( + + ⊟ −{deleted.toLocaleString()} + + )} + + ); }, }, { title: "Triggered", dataIndex: "start_time", key: "start_time", - sorter: (a, b) => { - if (!a.start_time) return -1; - if (!b.start_time) return 1; - return new Date(a.start_time) - new Date(b.start_time); - }, + sorter: (a, b) => + new Date(a.start_time || 0) - new Date(b.start_time || 0), defaultSortOrder: "descend", - render: (text) => { - if (!text) { - return ( - - Not started yet - - ); - } - return ( - - - {formatDateTime(text)} - - {getRelativeTime(text)} - - - - ); - }, + render: (t) => + t ? ( + + + {formatDateTime(t)} + + + {getRelativeTime(t)} + + + ) : ( + Not started + ), }, { title: "Duration", - dataIndex: "duration", key: "duration", - width: 140, - sorter: (a, b) => { - if (!a.duration) return -1; - if (!b.duration) return 1; - return parseDurationMs(a.duration) - parseDurationMs(b.duration); + width: 130, + sorter: (a, b) => (a.duration_ms || 0) - (b.duration_ms || 0), + render: (_, r) => { + const ms = r.duration_ms || parseDurationMs(r.duration); + const isFail = r.status === "FAILURE"; + const pct = stats?.expected_duration_ms + ? Math.min((ms / stats.expected_duration_ms) * 100, 100) + : isFail + ? 70 + : 35; + return ( +
+ + {r.duration || formatDurationMs(ms)} + +
+
+
+
+ ); }, - render: (duration) => ( - {formatDuration(duration)} + }, + { + title: "", + key: "actions", + width: 50, + render: () => ( + +
+ return ( + + {/* Header */} + + + + Run #{run.run_number} details + + {isFailure + ? `failed after ${passed} of ${total} models` + : `${total} models built successfully`} + {dur !== "—" && ` · ${dur} total runtime`} + + + + + + {isSuccess && envInfo.project_id && ( + + )} + + + + - {/* ─── Job Info Banner ─── */} - {filterQueries.job && envInfo.job_name && ( -
- - {envInfo.job_name} #{envInfo.id} - - - {envInfo.env_type && ( - }> - {envInfo.env_type} - - )} - {jobSchedule[envInfo.id] && ( - - }> - SCHEDULED - - + {/* Success banner */} + {isSuccess && ( +
+ + +
+ + All {total} models built successfully + +
+ + {totalRowsProcessed != null + ? `${totalRowsProcessed.toLocaleString()} rows processed · ` + : ""} + {dur} total runtime + +
+
+ {(totalAdded != null || + totalModified != null || + totalDeleted != null) && ( + + {totalAdded != null && ( + + + +{totalAdded.toLocaleString()} + + )} + {totalModified != null && ( + + ✎ ~{totalModified.toLocaleString()} + + )} + {totalDeleted != null && ( + + ⊟ −{totalDeleted.toLocaleString()} + + )} + )} - -
- )} +
+ )} - {/* ─── History Table ─── */} -
- - expandedRowKeys.includes(record.id) ? "runhistory-row-expanded" : "" - } - onRow={(record) => - expandedRowKeys.includes(record.id) - ? { - style: { - boxShadow: `inset 3px 0 0 0 ${token.colorError}`, - }, - } - : {} - } - expandable={{ - expandedRowRender: (record) => { - const meta = STATUS_META[record.status] || STATUS_META.PENDING; - const { scope, models } = getRunTriggerScope(record); - const isFailure = record.status === "FAILURE"; - return ( -
+ {/* Stats grid — different for success vs failure */} + {isSuccess ? ( + <> + {/* Row stats cards for success */} + +
+
- {meta.icon} - - {meta.label} - - {record.start_time && ( - - - · {formatDateTime(record.start_time)} ( - {getRelativeTime(record.start_time)}) - - - )} - {record.duration && ( - - · {formatDuration(record.duration)} - - )} + ROWS PROCESSED +
+
+ {totalRowsProcessed != null + ? totalRowsProcessed.toLocaleString() + : "—"}
- + +
+ +
- - {scope === "model" ? "Single model" : "Full job"} - - - {models.length > 0 - ? `Models attempted: ${models.join(", ")}` - : "No model configuration recorded for this run."} - + + ADDED +
+
+ {totalAdded != null + ? `+${totalAdded.toLocaleString()}` + : "—"} +
+
+ + + +
+ ✎ MODIFIED +
+
+ {totalModified != null + ? `~${totalModified.toLocaleString()}` + : "—"} +
+
+ + + +
+ ⊟ DELETED +
+
+ {totalDeleted != null + ? `−${totalDeleted.toLocaleString()}` + : "—"} +
+
+ + + + {/* Per-model changes table */} + {models.length > 0 && ( + <> + + Per-model changes + +
m.type !== "ephemeral") + .map((m, i) => ({ ...m, key: i }))} + columns={modelColumns} + pagination={false} + showHeader + /> + + )} + + ) : ( + <> + {/* Failure stats grid */} + + + + +
+ {passed} +
+
+
+ Succeeded +
+ + of {total} total + +
- {record.result?.total > 0 && ( +
+ + + +
0 ? token.colorError : undefined, }} > - - {record.result.total || 0} models - attempted - - - {record.result.passed || 0} passed - - - {record.result.failed || 0} failed - - {record.result.models?.length > 0 && ( - - {record.result.models - .map((m) => `${m.name} (${m.end_status})`) - .join(", ")} - + {failed} +
+
+
+ Failed +
+ {failedModels.length > 0 && ( + + {failedModels.join(", ")} + + )} +
+
+
+ + + + +
+ {skipped} +
+
+
+ Skipped +
+ {skipped > 0 && ( + + downstream of fail + + )} +
+
+
+ + + + +
{dur}
+
+
+ Runtime +
+ {expected && ( + + expected {expected} + )}
+
+
+ + + + {/* Error box */} + {run.error_message && ( +
+
+ + + Error in {errorModelName || "execution"} + + + + + +
+
+ {errorModelName && ( + <> + + {errorModelName} + + {" · "} + )} - {isFailure && record.error_message && ( - + {errorStack && ( +
{errorStack}
+ )} +
+ )} + + {/* Execution Timeline for failures */} + + Execution timeline + + + ), + children: ( + +
+ Setup + + + - {record.error_message} - - } + — + + + + ), + }, + ...models + .filter((m) => m.type !== "ephemeral") + .map((m, i) => { + const isOk = + m.end_status === "OK" || m.status === "success"; + const isFail = + m.end_status === "FAIL" || m.status === "failure"; + return { + key: i, + dot: isOk ? ( + + ) : isFail ? ( + + ) : ( + + ), + color: isFail ? "red" : isOk ? "green" : "gray", + children: ( + + + + {m.name} + + + + + {m.duration_ms + ? formatDurationMs(m.duration_ms) + : "—"} + + + + ), + }; + }), + ...(skipped > 0 + ? [ + { + dot: , + color: "gray", + children: ( + + + + {skipped} downstream models + + + + + skipped + + + + ), + }, + ] + : []), + ]} + /> + + )} + + ); + }; + + /* ═══════════════ RENDER ═══════════════ */ + return ( +
+ {/* Header */} +
+ + Run History + + Pick any job below to see its runs. +
+ + {/* Job Switcher */} +
+
+ +
+ + Viewing runs for + + + + } + value={searchText} + onChange={(e) => setSearchText(e.target.value || "")} + allowClear + /> + + + handleFilterChange("status", v)} + options={STATUS_OPTIONS} + /> + + + + + + + {activeFilterCount > 0 && ( + <> + 1 ? "s" : "" + }`} + style={{ backgroundColor: token.colorPrimary }} /> - )} - - ); - }, - rowExpandable: (record) => + + + )} + +
setExpandedRowKeys([...keys]), + expandedRowRender: (record) => , + expandRowByClick: false, + expandIcon: ({ expanded, onExpand, record }) => ["SUCCESS", "FAILURE", "RETRY", "REVOKED"].includes( record.status - ), - expandedRowKeys, - onExpand: handleExpand, + ) ? ( +