From eab6cdbf76e4cbe61bf29497d622c11c5b3d1aa8 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 8 Apr 2026 21:44:41 +0000 Subject: [PATCH 01/11] feat: Implement execution history --- packages/bigframes/bigframes/__init__.py | 2 + .../bigframes/core/global_session.py | 9 + .../bigframes/bigframes/session/__init__.py | 71 +++++- .../bigframes/bigframes/session/loader.py | 17 +- .../bigframes/bigframes/session/metrics.py | 210 ++++++++++++++++-- .../system/small/test_polars_execution.py | 23 ++ .../tests/unit/session/test_metrics.py | 18 ++ packages/bigframes/tests/unit/test_pandas.py | 2 + 8 files changed, 314 insertions(+), 38 deletions(-) diff --git a/packages/bigframes/bigframes/__init__.py b/packages/bigframes/bigframes/__init__.py index 29a27e4b6f90..7061300b5cc5 100644 --- a/packages/bigframes/bigframes/__init__.py +++ b/packages/bigframes/bigframes/__init__.py @@ -45,6 +45,7 @@ from bigframes._config.bigquery_options import BigQueryOptions # noqa: E402 from bigframes.core.global_session import ( # noqa: E402 close_session, + execution_history, get_global_session, ) from bigframes.session import Session, connect # noqa: E402 @@ -69,6 +70,7 @@ def load_ipython_extension(ipython): "BigQueryOptions", "get_global_session", "close_session", + "execution_history", "enums", "exceptions", "connect", diff --git a/packages/bigframes/bigframes/core/global_session.py b/packages/bigframes/bigframes/core/global_session.py index ce3b16d041e3..f12223837fa7 100644 --- a/packages/bigframes/bigframes/core/global_session.py +++ b/packages/bigframes/bigframes/core/global_session.py @@ -26,6 +26,7 @@ import bigframes.exceptions as bfe if TYPE_CHECKING: + import pandas import bigframes.session _global_session: Optional[bigframes.session.Session] = None @@ -124,6 +125,14 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T: return func_(get_global_session(), *args, **kwargs) +def execution_history() -> "pandas.DataFrame": + import pandas # noqa: F401 + + import bigframes.session + + return with_default_session(bigframes.session.Session.execution_history) + + class _GlobalSessionContext: """ Context manager for testing that sets global session. diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index a6bb3041764c..99af2480fa0e 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -109,6 +109,39 @@ logger = logging.getLogger(__name__) +class _ExecutionHistory(pandas.DataFrame): + @property + def _constructor(self): + return _ExecutionHistory + + def _repr_html_(self) -> str | None: + try: + import bigframes.formatting_helpers as formatter + + if self.empty: + return "
No executions found.
" + + cols = ["job_id", "status", "total_bytes_processed", "job_url"] + df_display = self[cols].copy() + df_display["total_bytes_processed"] = df_display[ + "total_bytes_processed" + ].apply(formatter.get_formatted_bytes) + + def format_url(url): + return f'Open Job' if url else "" + + df_display["job_url"] = df_display["job_url"].apply(format_url) + + # Rename job_id to query_id to match user expectations + df_display = df_display.rename(columns={"job_id": "query_id"}) + + compact_html = df_display.to_html(escape=False, index=False) + + return compact_html + except Exception: + return super()._repr_html_() # type: ignore + + @log_adapter.class_logger class Session( third_party_pandas_gbq.GBQIOMixin, @@ -233,6 +266,7 @@ def __init__( ) self._metrics = metrics.ExecutionMetrics() + self._publisher.subscribe(self._metrics.on_event) self._function_session = bff_session.FunctionSession() self._anon_dataset_manager = anonymous_dataset.AnonymousDatasetManager( self._clients_provider.bqclient, @@ -371,6 +405,10 @@ def slot_millis_sum(self): """The sum of all slot time used by bigquery jobs in this session.""" return self._metrics.slot_millis + def execution_history(self) -> pandas.DataFrame: + """Returns a list of underlying BigQuery executions initiated by BigFrames in the current session.""" + return _ExecutionHistory([job.__dict__ for job in self._metrics.jobs]) + @property def _allows_ambiguity(self) -> bool: return self._allow_ambiguity @@ -432,7 +470,8 @@ def read_gbq( # type: ignore[overload-overlap] col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def read_gbq( @@ -448,7 +487,8 @@ def read_gbq( col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: ... + ) -> pandas.Series: + ... def read_gbq( self, @@ -520,7 +560,8 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def _read_gbq_colab( @@ -529,7 +570,8 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[True] = ..., - ) -> pandas.Series: ... + ) -> pandas.Series: + ... @log_adapter.log_name_override("read_gbq_colab") def _read_gbq_colab( @@ -590,7 +632,8 @@ def read_gbq_query( # type: ignore[overload-overlap] filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def read_gbq_query( @@ -606,7 +649,8 @@ def read_gbq_query( filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: ... + ) -> pandas.Series: + ... def read_gbq_query( self, @@ -753,7 +797,8 @@ def read_gbq_table( # type: ignore[overload-overlap] use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def read_gbq_table( @@ -767,7 +812,8 @@ def read_gbq_table( use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., - ) -> pandas.Series: ... + ) -> pandas.Series: + ... def read_gbq_table( self, @@ -918,7 +964,8 @@ def read_pandas( pandas_dataframe: pandas.Index, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.core.indexes.Index: ... + ) -> bigframes.core.indexes.Index: + ... @typing.overload def read_pandas( @@ -926,7 +973,8 @@ def read_pandas( pandas_dataframe: pandas.Series, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.series.Series: ... + ) -> bigframes.series.Series: + ... @typing.overload def read_pandas( @@ -934,7 +982,8 @@ def read_pandas( pandas_dataframe: pandas.DataFrame, *, write_engine: constants.WriteEngineType = "default", - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... def read_pandas( self, diff --git a/packages/bigframes/bigframes/session/loader.py b/packages/bigframes/bigframes/session/loader.py index b0a9e0a1ed31..89fb3327fcda 100644 --- a/packages/bigframes/bigframes/session/loader.py +++ b/packages/bigframes/bigframes/session/loader.py @@ -49,6 +49,8 @@ import google.cloud.bigquery import google.cloud.bigquery as bigquery import google.cloud.bigquery.table +from google.cloud.bigquery.job.load import LoadJob +from google.cloud.bigquery.job.query import QueryJob import pandas import pyarrow as pa from google.cloud import bigquery_storage_v1 @@ -605,6 +607,9 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob): else: job.result() + if self._metrics is not None and isinstance(job, (QueryJob, LoadJob)): + self._metrics.count_job_stats(query_job=job) + @overload def read_gbq_table( # type: ignore[overload-overlap] self, @@ -626,7 +631,8 @@ def read_gbq_table( # type: ignore[overload-overlap] n_rows: Optional[int] = None, index_col_in_columns: bool = False, publish_execution: bool = True, - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def read_gbq_table( @@ -649,7 +655,8 @@ def read_gbq_table( n_rows: Optional[int] = None, index_col_in_columns: bool = False, publish_execution: bool = True, - ) -> pandas.Series: ... + ) -> pandas.Series: + ... def read_gbq_table( self, @@ -1133,7 +1140,8 @@ def read_gbq_query( # type: ignore[overload-overlap] dry_run: Literal[False] = ..., force_total_order: Optional[bool] = ..., allow_large_results: bool, - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def read_gbq_query( @@ -1149,7 +1157,8 @@ def read_gbq_query( dry_run: Literal[True] = ..., force_total_order: Optional[bool] = ..., allow_large_results: bool, - ) -> pandas.Series: ... + ) -> pandas.Series: + ... def read_gbq_query( self, diff --git a/packages/bigframes/bigframes/session/metrics.py b/packages/bigframes/bigframes/session/metrics.py index 8d43a83d7309..d21b47fcf49c 100644 --- a/packages/bigframes/bigframes/session/metrics.py +++ b/packages/bigframes/bigframes/session/metrics.py @@ -15,16 +15,131 @@ from __future__ import annotations import dataclasses +import datetime import os -from typing import Optional, Tuple +from typing import Any, Mapping, Optional, Tuple, Union import google.cloud.bigquery as bigquery -import google.cloud.bigquery.job as bq_job +from google.cloud.bigquery.job.load import LoadJob +from google.cloud.bigquery.job.query import QueryJob import google.cloud.bigquery.table as bq_table LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME" +@dataclasses.dataclass +class JobMetadata: + job_id: Optional[str] = None + query_id: Optional[str] = None + location: Optional[str] = None + project: Optional[str] = None + creation_time: Optional[datetime.datetime] = None + start_time: Optional[datetime.datetime] = None + end_time: Optional[datetime.datetime] = None + duration_seconds: Optional[float] = None + status: Optional[str] = None + total_bytes_processed: Optional[int] = None + total_slot_ms: Optional[int] = None + job_type: Optional[str] = None + error_result: Optional[Mapping[str, Any]] = None + cached: Optional[bool] = None + job_url: Optional[str] = None + query: Optional[str] = None + destination_table: Optional[str] = None + source_uris: Optional[list[str]] = None + input_files: Optional[int] = None + input_bytes: Optional[int] = None + output_rows: Optional[int] = None + source_format: Optional[str] = None + + @classmethod + def from_job( + cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None + ) -> "JobMetadata": + query_text = getattr(query_job, "query", None) + if query_text and len(query_text) > 1024: + query_text = query_text[:1021] + "..." + + job_id = getattr(query_job, "job_id", None) + job_url = None + if job_id: + job_url = f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{job_id}&page=queryresults" + + metadata = cls( + job_id=query_job.job_id, + location=query_job.location, + project=query_job.project, + creation_time=query_job.created, + start_time=query_job.started, + end_time=query_job.ended, + duration_seconds=exec_seconds, + status=query_job.state, + job_type=query_job.job_type, + error_result=query_job.error_result, + query=query_text, + job_url=job_url, + ) + if isinstance(query_job, QueryJob): + metadata.cached = getattr(query_job, "cache_hit", None) + metadata.destination_table = ( + str(query_job.destination) if query_job.destination else None + ) + metadata.total_bytes_processed = getattr( + query_job, "total_bytes_processed", None + ) + metadata.total_slot_ms = getattr(query_job, "slot_millis", None) + elif isinstance(query_job, LoadJob): + metadata.output_rows = getattr(query_job, "output_rows", None) + metadata.input_files = getattr(query_job, "input_files", None) + metadata.input_bytes = getattr(query_job, "input_bytes", None) + metadata.destination_table = ( + str(query_job.destination) + if getattr(query_job, "destination", None) + else None + ) + if getattr(query_job, "source_uris", None): + metadata.source_uris = list(query_job.source_uris) + if query_job.configuration and hasattr( + query_job.configuration, "source_format" + ): + metadata.source_format = query_job.configuration.source_format + + return metadata + + @classmethod + def from_row_iterator( + cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None + ) -> "JobMetadata": + query_text = getattr(row_iterator, "query", None) + if query_text and len(query_text) > 1024: + query_text = query_text[:1021] + "..." + + job_id = getattr(row_iterator, "job_id", None) + job_url = None + if job_id: + project = getattr(row_iterator, "project", "") + location = getattr(row_iterator, "location", "") + job_url = f"https://console.cloud.google.com/bigquery?project={project}&j=bq:{location}:{job_id}&page=queryresults" + + return cls( + job_id=job_id, + query_id=getattr(row_iterator, "query_id", None), + location=getattr(row_iterator, "location", None), + project=getattr(row_iterator, "project", None), + creation_time=getattr(row_iterator, "created", None), + start_time=getattr(row_iterator, "started", None), + end_time=getattr(row_iterator, "ended", None), + duration_seconds=exec_seconds, + status="DONE", + total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None), + total_slot_ms=getattr(row_iterator, "slot_millis", None), + job_type="query", + cached=getattr(row_iterator, "cache_hit", None), + query=query_text, + job_url=job_url, + ) + + @dataclasses.dataclass class ExecutionMetrics: execution_count: int = 0 @@ -32,10 +147,11 @@ class ExecutionMetrics: bytes_processed: int = 0 execution_secs: float = 0 query_char_count: int = 0 + jobs: list[JobMetadata] = dataclasses.field(default_factory=list) def count_job_stats( self, - query_job: Optional[bq_job.QueryJob] = None, + query_job: Optional[Union[QueryJob, LoadJob]] = None, row_iterator: Optional[bq_table.RowIterator] = None, ): if query_job is None: @@ -57,21 +173,64 @@ def count_job_stats( self.slot_millis += slot_millis self.execution_secs += exec_seconds - elif query_job.configuration.dry_run: - query_char_count = len(query_job.query) + self.jobs.append( + JobMetadata.from_row_iterator(row_iterator, exec_seconds=exec_seconds) + ) + + elif isinstance(query_job, QueryJob) and query_job.configuration.dry_run: + query_char_count = len(getattr(query_job, "query", "")) # TODO(tswast): Pass None after making benchmark publishing robust to missing data. bytes_processed = 0 slot_millis = 0 exec_seconds = 0.0 - elif (stats := get_performance_stats(query_job)) is not None: - query_char_count, bytes_processed, slot_millis, exec_seconds = stats + elif isinstance(query_job, bigquery.QueryJob): + if (stats := get_performance_stats(query_job)) is not None: + query_char_count, bytes_processed, slot_millis, exec_seconds = stats + self.execution_count += 1 + self.query_char_count += query_char_count or 0 + self.bytes_processed += bytes_processed or 0 + self.slot_millis += slot_millis or 0 + self.execution_secs += exec_seconds or 0 + + metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds) + metadata.total_bytes_processed = bytes_processed + metadata.total_slot_ms = slot_millis + self.jobs.append(metadata) + + else: self.execution_count += 1 - self.query_char_count += query_char_count or 0 - self.bytes_processed += bytes_processed or 0 - self.slot_millis += slot_millis or 0 - self.execution_secs += exec_seconds or 0 + duration = ( + (query_job.ended - query_job.created).total_seconds() + if query_job.ended and query_job.created + else None + ) + self.jobs.append(JobMetadata.from_job(query_job, exec_seconds=duration)) + + # For pytest runs only, log information about the query job + # to a file in order to create a performance report. + if ( + isinstance(query_job, bigquery.QueryJob) + and not query_job.configuration.dry_run + ): + stats = get_performance_stats(query_job) + if stats: + write_stats_to_disk( + query_char_count=stats[0], + bytes_processed=stats[1], + slot_millis=stats[2], + exec_seconds=stats[3], + ) + elif row_iterator is not None: + bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0 + query_char_count = len(getattr(row_iterator, "query", "") or "") + slot_millis = getattr(row_iterator, "slot_millis", 0) or 0 + created = getattr(row_iterator, "created", None) + ended = getattr(row_iterator, "ended", None) + exec_seconds = ( + (ended - created).total_seconds() if created and ended else 0.0 + ) write_stats_to_disk( query_char_count=query_char_count, bytes_processed=bytes_processed, @@ -79,19 +238,24 @@ def count_job_stats( exec_seconds=exec_seconds, ) - else: - # TODO(tswast): Pass None after making benchmark publishing robust to missing data. - bytes_processed = 0 - query_char_count = 0 - slot_millis = 0 - exec_seconds = 0 + def on_event(self, event: Any): + try: + import bigframes.core.events + from bigframes.session.executor import LocalExecuteResult + except ImportError: + return - write_stats_to_disk( - query_char_count=query_char_count, - bytes_processed=bytes_processed, - slot_millis=slot_millis, - exec_seconds=exec_seconds, - ) + if isinstance(event, bigframes.core.events.ExecutionFinished): + if event.result and isinstance(event.result, LocalExecuteResult): + self.execution_count += 1 + bytes_processed = event.result.total_bytes_processed or 0 + + metadata = JobMetadata( + job_type="polars", + status="DONE", + total_bytes_processed=bytes_processed, + ) + self.jobs.append(metadata) def get_performance_stats( diff --git a/packages/bigframes/tests/system/small/test_polars_execution.py b/packages/bigframes/tests/system/small/test_polars_execution.py index 1b58dc9d12b1..0b986becd3e6 100644 --- a/packages/bigframes/tests/system/small/test_polars_execution.py +++ b/packages/bigframes/tests/system/small/test_polars_execution.py @@ -72,3 +72,26 @@ def test_polar_execution_unsupported_sql_fallback( # geo fns not supported by polar engine yet, so falls back to bq execution assert session_w_polars._metrics.execution_count == (execution_count_before + 1) assert math.isclose(bf_result.geo_area.sum(), 70.52332050, rel_tol=0.00001) + + +def test_polars_execution_history(session_w_polars): + import pandas as pd + + # Create a small local DataFrame + pdf = pd.DataFrame({"col_a": [1, 2, 3], "col_b": ["x", "y", "z"]}) + + # Read simple local data + df = session_w_polars.read_pandas(pdf) + + # Trigger execution + _ = df.to_pandas() + + # Verify the execution history captured the local job + history = session_w_polars.execution_history() + + # Verify we have at least one job and logged as polars + assert len(history) > 0 + last_job = history.iloc[-1] + + assert last_job["job_type"] == "polars" + assert last_job["status"] == "DONE" diff --git a/packages/bigframes/tests/unit/session/test_metrics.py b/packages/bigframes/tests/unit/session/test_metrics.py index 7c2f01c5b98f..296c6e96c5af 100644 --- a/packages/bigframes/tests/unit/session/test_metrics.py +++ b/packages/bigframes/tests/unit/session/test_metrics.py @@ -245,3 +245,21 @@ def test_write_stats_to_disk_no_env_var(tmp_path, monkeypatch): exec_seconds=1.23, ) assert len(list(tmp_path.iterdir())) == 0 + + +def test_on_event_with_local_execute_result(): + import bigframes.core.events + from bigframes.session.executor import LocalExecuteResult + + local_result = unittest.mock.create_autospec(LocalExecuteResult, instance=True) + local_result.total_bytes_processed = 1024 + + event = bigframes.core.events.ExecutionFinished(result=local_result) + execution_metrics = metrics.ExecutionMetrics() + execution_metrics.on_event(event) + + assert execution_metrics.execution_count == 1 + assert len(execution_metrics.jobs) == 1 + assert execution_metrics.jobs[0].job_type == "polars" + assert execution_metrics.jobs[0].status == "DONE" + assert execution_metrics.jobs[0].total_bytes_processed == 1024 diff --git a/packages/bigframes/tests/unit/test_pandas.py b/packages/bigframes/tests/unit/test_pandas.py index e1e713697db1..a79d7a059bb9 100644 --- a/packages/bigframes/tests/unit/test_pandas.py +++ b/packages/bigframes/tests/unit/test_pandas.py @@ -37,6 +37,8 @@ def all_session_methods(): session_attributes.remove("close") # streaming isn't in pandas session_attributes.remove("read_gbq_table_streaming") + # execution_history is in base namespace, not pandas + session_attributes.remove("execution_history") for attribute in sorted(session_attributes): session_method = getattr(bigframes.session.Session, attribute) From e5ec3e99048a4c74f78bba489611a14c915978f7 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 8 Apr 2026 22:06:11 +0000 Subject: [PATCH 02/11] style: Fix formatting and imports --- .../bigframes/core/global_session.py | 1 + .../bigframes/bigframes/session/__init__.py | 33 +++++++------------ .../bigframes/bigframes/session/loader.py | 16 ++++----- .../bigframes/bigframes/session/metrics.py | 2 +- packages/bigframes/tests/unit/test_col.py | 2 +- 5 files changed, 20 insertions(+), 34 deletions(-) diff --git a/packages/bigframes/bigframes/core/global_session.py b/packages/bigframes/bigframes/core/global_session.py index f12223837fa7..472255894534 100644 --- a/packages/bigframes/bigframes/core/global_session.py +++ b/packages/bigframes/bigframes/core/global_session.py @@ -27,6 +27,7 @@ if TYPE_CHECKING: import pandas + import bigframes.session _global_session: Optional[bigframes.session.Session] = None diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index 99af2480fa0e..9ea5ded46b22 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -470,8 +470,7 @@ def read_gbq( # type: ignore[overload-overlap] col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq( @@ -487,8 +486,7 @@ def read_gbq( col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq( self, @@ -560,8 +558,7 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def _read_gbq_colab( @@ -570,8 +567,7 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[True] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... @log_adapter.log_name_override("read_gbq_colab") def _read_gbq_colab( @@ -632,8 +628,7 @@ def read_gbq_query( # type: ignore[overload-overlap] filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq_query( @@ -649,8 +644,7 @@ def read_gbq_query( filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq_query( self, @@ -797,8 +791,7 @@ def read_gbq_table( # type: ignore[overload-overlap] use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq_table( @@ -812,8 +805,7 @@ def read_gbq_table( use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq_table( self, @@ -964,8 +956,7 @@ def read_pandas( pandas_dataframe: pandas.Index, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.core.indexes.Index: - ... + ) -> bigframes.core.indexes.Index: ... @typing.overload def read_pandas( @@ -973,8 +964,7 @@ def read_pandas( pandas_dataframe: pandas.Series, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.series.Series: - ... + ) -> bigframes.series.Series: ... @typing.overload def read_pandas( @@ -982,8 +972,7 @@ def read_pandas( pandas_dataframe: pandas.DataFrame, *, write_engine: constants.WriteEngineType = "default", - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... def read_pandas( self, diff --git a/packages/bigframes/bigframes/session/loader.py b/packages/bigframes/bigframes/session/loader.py index 89fb3327fcda..c1c8f0eda409 100644 --- a/packages/bigframes/bigframes/session/loader.py +++ b/packages/bigframes/bigframes/session/loader.py @@ -49,11 +49,11 @@ import google.cloud.bigquery import google.cloud.bigquery as bigquery import google.cloud.bigquery.table -from google.cloud.bigquery.job.load import LoadJob -from google.cloud.bigquery.job.query import QueryJob import pandas import pyarrow as pa from google.cloud import bigquery_storage_v1 +from google.cloud.bigquery.job.load import LoadJob +from google.cloud.bigquery.job.query import QueryJob from google.cloud.bigquery_storage_v1 import types as bq_storage_types import bigframes._tools @@ -631,8 +631,7 @@ def read_gbq_table( # type: ignore[overload-overlap] n_rows: Optional[int] = None, index_col_in_columns: bool = False, publish_execution: bool = True, - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq_table( @@ -655,8 +654,7 @@ def read_gbq_table( n_rows: Optional[int] = None, index_col_in_columns: bool = False, publish_execution: bool = True, - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq_table( self, @@ -1140,8 +1138,7 @@ def read_gbq_query( # type: ignore[overload-overlap] dry_run: Literal[False] = ..., force_total_order: Optional[bool] = ..., allow_large_results: bool, - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq_query( @@ -1157,8 +1154,7 @@ def read_gbq_query( dry_run: Literal[True] = ..., force_total_order: Optional[bool] = ..., allow_large_results: bool, - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq_query( self, diff --git a/packages/bigframes/bigframes/session/metrics.py b/packages/bigframes/bigframes/session/metrics.py index d21b47fcf49c..cf34a620e099 100644 --- a/packages/bigframes/bigframes/session/metrics.py +++ b/packages/bigframes/bigframes/session/metrics.py @@ -20,9 +20,9 @@ from typing import Any, Mapping, Optional, Tuple, Union import google.cloud.bigquery as bigquery +import google.cloud.bigquery.table as bq_table from google.cloud.bigquery.job.load import LoadJob from google.cloud.bigquery.job.query import QueryJob -import google.cloud.bigquery.table as bq_table LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME" diff --git a/packages/bigframes/tests/unit/test_col.py b/packages/bigframes/tests/unit/test_col.py index cf9aa5c4b86a..9f5bbca5d9bc 100644 --- a/packages/bigframes/tests/unit/test_col.py +++ b/packages/bigframes/tests/unit/test_col.py @@ -16,13 +16,13 @@ import pathlib from typing import Generator +import numpy as np import pandas as pd import pytest import bigframes import bigframes.pandas as bpd from bigframes.testing.utils import assert_frame_equal, convert_pandas_dtypes -import numpy as np pytest.importorskip("polars") pytest.importorskip("pandas", minversion="3.0.0") From 6055998d984c9a99265ff8d915fbd49f3e57c3b0 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 10 Apr 2026 19:55:50 +0000 Subject: [PATCH 03/11] refactor: Address review comments --- .../bigframes/bigframes/session/__init__.py | 55 +++++++++++-------- .../bigframes/bigframes/session/metrics.py | 3 +- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index 9ea5ded46b22..1f82cb3ab1c6 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -115,29 +115,27 @@ def _constructor(self): return _ExecutionHistory def _repr_html_(self) -> str | None: - try: - import bigframes.formatting_helpers as formatter + import bigframes.formatting_helpers as formatter + + if self.empty: + return "
No executions found.
" + + cols = ["job_id", "status", "total_bytes_processed", "job_url"] - if self.empty: - return "
No executions found.
" + def format_url(url): + return f'Open Job' if url else "" - cols = ["job_id", "status", "total_bytes_processed", "job_url"] + try: df_display = self[cols].copy() df_display["total_bytes_processed"] = df_display[ "total_bytes_processed" ].apply(formatter.get_formatted_bytes) - - def format_url(url): - return f'Open Job' if url else "" - df_display["job_url"] = df_display["job_url"].apply(format_url) # Rename job_id to query_id to match user expectations df_display = df_display.rename(columns={"job_id": "query_id"}) - compact_html = df_display.to_html(escape=False, index=False) - - return compact_html + return df_display.to_html(escape=False, index=False) except Exception: return super()._repr_html_() # type: ignore @@ -470,7 +468,8 @@ def read_gbq( # type: ignore[overload-overlap] col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def read_gbq( @@ -486,7 +485,8 @@ def read_gbq( col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: ... + ) -> pandas.Series: + ... def read_gbq( self, @@ -558,7 +558,8 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def _read_gbq_colab( @@ -567,7 +568,8 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[True] = ..., - ) -> pandas.Series: ... + ) -> pandas.Series: + ... @log_adapter.log_name_override("read_gbq_colab") def _read_gbq_colab( @@ -628,7 +630,8 @@ def read_gbq_query( # type: ignore[overload-overlap] filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def read_gbq_query( @@ -644,7 +647,8 @@ def read_gbq_query( filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: ... + ) -> pandas.Series: + ... def read_gbq_query( self, @@ -791,7 +795,8 @@ def read_gbq_table( # type: ignore[overload-overlap] use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def read_gbq_table( @@ -805,7 +810,8 @@ def read_gbq_table( use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., - ) -> pandas.Series: ... + ) -> pandas.Series: + ... def read_gbq_table( self, @@ -956,7 +962,8 @@ def read_pandas( pandas_dataframe: pandas.Index, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.core.indexes.Index: ... + ) -> bigframes.core.indexes.Index: + ... @typing.overload def read_pandas( @@ -964,7 +971,8 @@ def read_pandas( pandas_dataframe: pandas.Series, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.series.Series: ... + ) -> bigframes.series.Series: + ... @typing.overload def read_pandas( @@ -972,7 +980,8 @@ def read_pandas( pandas_dataframe: pandas.DataFrame, *, write_engine: constants.WriteEngineType = "default", - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... def read_pandas( self, diff --git a/packages/bigframes/bigframes/session/metrics.py b/packages/bigframes/bigframes/session/metrics.py index cf34a620e099..d2682bbcaf7f 100644 --- a/packages/bigframes/bigframes/session/metrics.py +++ b/packages/bigframes/bigframes/session/metrics.py @@ -195,8 +195,6 @@ def count_job_stats( self.execution_secs += exec_seconds or 0 metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds) - metadata.total_bytes_processed = bytes_processed - metadata.total_slot_ms = slot_millis self.jobs.append(metadata) else: @@ -249,6 +247,7 @@ def on_event(self, event: Any): if event.result and isinstance(event.result, LocalExecuteResult): self.execution_count += 1 bytes_processed = event.result.total_bytes_processed or 0 + self.bytes_processed += bytes_processed metadata = JobMetadata( job_type="polars", From 30f0a2bf6b4eab48ebe3f8a50b823c5698d9da3b Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 10 Apr 2026 20:25:10 +0000 Subject: [PATCH 04/11] refactor: Address review comments from Trevor --- .../bigframes/bigframes/session/__init__.py | 75 ++++++++++--------- .../system/small/test_polars_execution.py | 8 +- 2 files changed, 42 insertions(+), 41 deletions(-) diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index 1f82cb3ab1c6..7a3b4da83527 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -109,35 +109,44 @@ logger = logging.getLogger(__name__) -class _ExecutionHistory(pandas.DataFrame): - @property - def _constructor(self): - return _ExecutionHistory +class _ExecutionHistory: + def __init__(self, jobs: list[dict]): + self._df = pandas.DataFrame(jobs) + + def to_dataframe(self) -> pandas.DataFrame: + """Returns the execution history as a pandas DataFrame.""" + return self._df def _repr_html_(self) -> str | None: import bigframes.formatting_helpers as formatter - if self.empty: + if self._df.empty: return "
No executions found.
" - cols = ["job_id", "status", "total_bytes_processed", "job_url"] + cols = ["job_type", "job_id", "status", "total_bytes_processed", "job_url"] + + # Filter columns to only those that exist in the dataframe + available_cols = [c for c in cols if c in self._df.columns] def format_url(url): return f'Open Job' if url else "" try: - df_display = self[cols].copy() - df_display["total_bytes_processed"] = df_display[ - "total_bytes_processed" - ].apply(formatter.get_formatted_bytes) - df_display["job_url"] = df_display["job_url"].apply(format_url) + df_display = self._df[available_cols].copy() + if "total_bytes_processed" in df_display.columns: + df_display["total_bytes_processed"] = df_display[ + "total_bytes_processed" + ].apply(formatter.get_formatted_bytes) + if "job_url" in df_display.columns: + df_display["job_url"] = df_display["job_url"].apply(format_url) # Rename job_id to query_id to match user expectations - df_display = df_display.rename(columns={"job_id": "query_id"}) + if "job_id" in df_display.columns: + df_display = df_display.rename(columns={"job_id": "query_id"}) return df_display.to_html(escape=False, index=False) except Exception: - return super()._repr_html_() # type: ignore + return self._df._repr_html_() @log_adapter.class_logger @@ -403,8 +412,11 @@ def slot_millis_sum(self): """The sum of all slot time used by bigquery jobs in this session.""" return self._metrics.slot_millis - def execution_history(self) -> pandas.DataFrame: - """Returns a list of underlying BigQuery executions initiated by BigFrames in the current session.""" + def execution_history(self) -> _ExecutionHistory: + """Returns the history of executions initiated by BigFrames in the current session. + + Use `.to_dataframe()` on the result to get a pandas DataFrame. + """ return _ExecutionHistory([job.__dict__ for job in self._metrics.jobs]) @property @@ -468,8 +480,7 @@ def read_gbq( # type: ignore[overload-overlap] col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq( @@ -485,8 +496,7 @@ def read_gbq( col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq( self, @@ -558,8 +568,7 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def _read_gbq_colab( @@ -568,8 +577,7 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[True] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... @log_adapter.log_name_override("read_gbq_colab") def _read_gbq_colab( @@ -630,8 +638,7 @@ def read_gbq_query( # type: ignore[overload-overlap] filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq_query( @@ -647,8 +654,7 @@ def read_gbq_query( filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq_query( self, @@ -795,8 +801,7 @@ def read_gbq_table( # type: ignore[overload-overlap] use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq_table( @@ -810,8 +815,7 @@ def read_gbq_table( use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq_table( self, @@ -962,8 +966,7 @@ def read_pandas( pandas_dataframe: pandas.Index, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.core.indexes.Index: - ... + ) -> bigframes.core.indexes.Index: ... @typing.overload def read_pandas( @@ -971,8 +974,7 @@ def read_pandas( pandas_dataframe: pandas.Series, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.series.Series: - ... + ) -> bigframes.series.Series: ... @typing.overload def read_pandas( @@ -980,8 +982,7 @@ def read_pandas( pandas_dataframe: pandas.DataFrame, *, write_engine: constants.WriteEngineType = "default", - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... def read_pandas( self, diff --git a/packages/bigframes/tests/system/small/test_polars_execution.py b/packages/bigframes/tests/system/small/test_polars_execution.py index 0b986becd3e6..fad8d9dba2f1 100644 --- a/packages/bigframes/tests/system/small/test_polars_execution.py +++ b/packages/bigframes/tests/system/small/test_polars_execution.py @@ -39,7 +39,7 @@ def test_polar_execution_sorted(session_w_polars, scalars_pandas_df_index): ] bf_result = bf_df.sort_index(ascending=False)[["int64_too", "bool_col"]].to_pandas() - assert session_w_polars._metrics.execution_count == execution_count_before + assert session_w_polars._metrics.execution_count == execution_count_before + 1 assert_frame_equal(bf_result, pd_result) @@ -56,7 +56,7 @@ def test_polar_execution_sorted_filtered(session_w_polars, scalars_pandas_df_ind .to_pandas() ) - assert session_w_polars._metrics.execution_count == execution_count_before + assert session_w_polars._metrics.execution_count == execution_count_before + 1 assert_frame_equal(bf_result, pd_result) @@ -70,7 +70,7 @@ def test_polar_execution_unsupported_sql_fallback( bf_result = bf_df.to_pandas() # geo fns not supported by polar engine yet, so falls back to bq execution - assert session_w_polars._metrics.execution_count == (execution_count_before + 1) + assert session_w_polars._metrics.execution_count == (execution_count_before + 2) assert math.isclose(bf_result.geo_area.sum(), 70.52332050, rel_tol=0.00001) @@ -87,7 +87,7 @@ def test_polars_execution_history(session_w_polars): _ = df.to_pandas() # Verify the execution history captured the local job - history = session_w_polars.execution_history() + history = session_w_polars.execution_history().to_dataframe() # Verify we have at least one job and logged as polars assert len(history) > 0 From a46ce69a9e5e2c9ea2943dfde3bab03740891960 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 15 Apr 2026 14:23:29 -0700 Subject: [PATCH 05/11] Update packages/bigframes/bigframes/session/metrics.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- packages/bigframes/bigframes/session/metrics.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/bigframes/bigframes/session/metrics.py b/packages/bigframes/bigframes/session/metrics.py index d2682bbcaf7f..d3c5ae998aba 100644 --- a/packages/bigframes/bigframes/session/metrics.py +++ b/packages/bigframes/bigframes/session/metrics.py @@ -248,6 +248,7 @@ def on_event(self, event: Any): self.execution_count += 1 bytes_processed = event.result.total_bytes_processed or 0 self.bytes_processed += bytes_processed + self.bytes_processed += bytes_processed metadata = JobMetadata( job_type="polars", From c09b9468d34d45f5c0165b6cb92092cc235d4b52 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 15 Apr 2026 21:24:49 +0000 Subject: [PATCH 06/11] Revert "Update packages/bigframes/bigframes/session/metrics.py" This reverts commit a46ce69a9e5e2c9ea2943dfde3bab03740891960. --- packages/bigframes/bigframes/session/metrics.py | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/bigframes/bigframes/session/metrics.py b/packages/bigframes/bigframes/session/metrics.py index d3c5ae998aba..d2682bbcaf7f 100644 --- a/packages/bigframes/bigframes/session/metrics.py +++ b/packages/bigframes/bigframes/session/metrics.py @@ -248,7 +248,6 @@ def on_event(self, event: Any): self.execution_count += 1 bytes_processed = event.result.total_bytes_processed or 0 self.bytes_processed += bytes_processed - self.bytes_processed += bytes_processed metadata = JobMetadata( job_type="polars", From 60a19aeb46c18f55a658d26b5b8eb50cc7a25920 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 15 Apr 2026 21:35:52 +0000 Subject: [PATCH 07/11] test: isolate vscode user agent test --- .../bigframes/tests/unit/session/test_clients.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/packages/bigframes/tests/unit/session/test_clients.py b/packages/bigframes/tests/unit/session/test_clients.py index 0a9b3edffefa..0de6c75e01b0 100644 --- a/packages/bigframes/tests/unit/session/test_clients.py +++ b/packages/bigframes/tests/unit/session/test_clients.py @@ -182,12 +182,18 @@ def test_user_agent_not_in_vscode(monkeypatch): @mock.patch.dict(os.environ, {"VSCODE_PID": "12345"}, clear=True) def test_user_agent_in_vscode(monkeypatch): monkeypatch_client_constructors(monkeypatch) - provider = create_clients_provider() - assert_clients_w_user_agent(provider, "vscode") - assert_clients_wo_user_agent(provider, "googlecloudtools.cloudcode") - # We still need to include attribution to bigframes - assert_clients_w_user_agent(provider, f"bigframes/{bigframes.version.__version__}") + with tempfile.TemporaryDirectory() as tmpdir: + user_home = pathlib.Path(tmpdir) + with mock.patch("pathlib.Path.home", return_value=user_home): + provider = create_clients_provider() + assert_clients_w_user_agent(provider, "vscode") + assert_clients_wo_user_agent(provider, "googlecloudtools.cloudcode") + + # We still need to include attribution to bigframes + assert_clients_w_user_agent( + provider, f"bigframes/{bigframes.version.__version__}" + ) @mock.patch.dict(os.environ, {"VSCODE_PID": "12345"}, clear=True) From d4283706dfc6ecf34f1418f8c11ceeb330cc021b Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 15 Apr 2026 21:58:30 +0000 Subject: [PATCH 08/11] chore: format with ruff --- .../bigframes/bigframes/session/__init__.py | 33 +++++++------------ .../bigframes/bigframes/session/loader.py | 12 +++---- 2 files changed, 15 insertions(+), 30 deletions(-) diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index 36bfa26f1507..afa3b63e4d43 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -480,8 +480,7 @@ def read_gbq( # type: ignore[overload-overlap] col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq( @@ -497,8 +496,7 @@ def read_gbq( col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq( self, @@ -570,8 +568,7 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def _read_gbq_colab( @@ -580,8 +577,7 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[True] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... @log_adapter.log_name_override("read_gbq_colab") def _read_gbq_colab( @@ -642,8 +638,7 @@ def read_gbq_query( # type: ignore[overload-overlap] filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq_query( @@ -659,8 +654,7 @@ def read_gbq_query( filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq_query( self, @@ -807,8 +801,7 @@ def read_gbq_table( # type: ignore[overload-overlap] use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq_table( @@ -822,8 +815,7 @@ def read_gbq_table( use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq_table( self, @@ -974,8 +966,7 @@ def read_pandas( pandas_dataframe: pandas.Index, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.core.indexes.Index: - ... + ) -> bigframes.core.indexes.Index: ... @typing.overload def read_pandas( @@ -983,8 +974,7 @@ def read_pandas( pandas_dataframe: pandas.Series, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.series.Series: - ... + ) -> bigframes.series.Series: ... @typing.overload def read_pandas( @@ -992,8 +982,7 @@ def read_pandas( pandas_dataframe: pandas.DataFrame, *, write_engine: constants.WriteEngineType = "default", - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... def read_pandas( self, diff --git a/packages/bigframes/bigframes/session/loader.py b/packages/bigframes/bigframes/session/loader.py index 11fdf76dec26..d3be3d09f235 100644 --- a/packages/bigframes/bigframes/session/loader.py +++ b/packages/bigframes/bigframes/session/loader.py @@ -647,8 +647,7 @@ def read_gbq_table( # type: ignore[overload-overlap] n_rows: Optional[int] = None, index_col_in_columns: bool = False, publish_execution: bool = True, - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq_table( @@ -671,8 +670,7 @@ def read_gbq_table( n_rows: Optional[int] = None, index_col_in_columns: bool = False, publish_execution: bool = True, - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq_table( self, @@ -1156,8 +1154,7 @@ def read_gbq_query( # type: ignore[overload-overlap] dry_run: Literal[False] = ..., force_total_order: Optional[bool] = ..., allow_large_results: bool, - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq_query( @@ -1173,8 +1170,7 @@ def read_gbq_query( dry_run: Literal[True] = ..., force_total_order: Optional[bool] = ..., allow_large_results: bool, - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq_query( self, From 39f4c2a3dc5ef79e24dc678260250f8dcef12cbc Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 15 Apr 2026 22:14:09 +0000 Subject: [PATCH 09/11] fix: mypy failures --- .../bigframes/core/global_session.py | 2 +- .../bigframes/bigframes/session/__init__.py | 35 ++++++++++++------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/packages/bigframes/bigframes/core/global_session.py b/packages/bigframes/bigframes/core/global_session.py index 472255894534..9e9211fd0b1d 100644 --- a/packages/bigframes/bigframes/core/global_session.py +++ b/packages/bigframes/bigframes/core/global_session.py @@ -126,7 +126,7 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T: return func_(get_global_session(), *args, **kwargs) -def execution_history() -> "pandas.DataFrame": +def execution_history() -> "bigframes.session._ExecutionHistory": import pandas # noqa: F401 import bigframes.session diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index afa3b63e4d43..e1d44b71fdb1 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -146,7 +146,7 @@ def format_url(url): return df_display.to_html(escape=False, index=False) except Exception: - return self._df._repr_html_() + return self._df.to_html() @log_adapter.class_logger @@ -480,7 +480,8 @@ def read_gbq( # type: ignore[overload-overlap] col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def read_gbq( @@ -496,7 +497,8 @@ def read_gbq( col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: ... + ) -> pandas.Series: + ... def read_gbq( self, @@ -568,7 +570,8 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def _read_gbq_colab( @@ -577,7 +580,8 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[True] = ..., - ) -> pandas.Series: ... + ) -> pandas.Series: + ... @log_adapter.log_name_override("read_gbq_colab") def _read_gbq_colab( @@ -638,7 +642,8 @@ def read_gbq_query( # type: ignore[overload-overlap] filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def read_gbq_query( @@ -654,7 +659,8 @@ def read_gbq_query( filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: ... + ) -> pandas.Series: + ... def read_gbq_query( self, @@ -801,7 +807,8 @@ def read_gbq_table( # type: ignore[overload-overlap] use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... @overload def read_gbq_table( @@ -815,7 +822,8 @@ def read_gbq_table( use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., - ) -> pandas.Series: ... + ) -> pandas.Series: + ... def read_gbq_table( self, @@ -966,7 +974,8 @@ def read_pandas( pandas_dataframe: pandas.Index, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.core.indexes.Index: ... + ) -> bigframes.core.indexes.Index: + ... @typing.overload def read_pandas( @@ -974,7 +983,8 @@ def read_pandas( pandas_dataframe: pandas.Series, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.series.Series: ... + ) -> bigframes.series.Series: + ... @typing.overload def read_pandas( @@ -982,7 +992,8 @@ def read_pandas( pandas_dataframe: pandas.DataFrame, *, write_engine: constants.WriteEngineType = "default", - ) -> dataframe.DataFrame: ... + ) -> dataframe.DataFrame: + ... def read_pandas( self, From b66947337a91f5f5c11ffb4b460eb51455b3a0e7 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 15 Apr 2026 22:17:32 +0000 Subject: [PATCH 10/11] fix: lint and format --- .../bigframes/core/global_session.py | 2 -- .../bigframes/bigframes/core/sql/__init__.py | 3 +- .../bigframes/bigframes/session/__init__.py | 33 +++++++------------ 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/packages/bigframes/bigframes/core/global_session.py b/packages/bigframes/bigframes/core/global_session.py index 9e9211fd0b1d..6ffb37ac5acf 100644 --- a/packages/bigframes/bigframes/core/global_session.py +++ b/packages/bigframes/bigframes/core/global_session.py @@ -26,8 +26,6 @@ import bigframes.exceptions as bfe if TYPE_CHECKING: - import pandas - import bigframes.session _global_session: Optional[bigframes.session.Session] = None diff --git a/packages/bigframes/bigframes/core/sql/__init__.py b/packages/bigframes/bigframes/core/sql/__init__.py index 051cb045223d..b28d59216950 100644 --- a/packages/bigframes/bigframes/core/sql/__init__.py +++ b/packages/bigframes/bigframes/core/sql/__init__.py @@ -11,12 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import annotations """ Utility functions for SQL construction. """ +from __future__ import annotations + import json from typing import ( TYPE_CHECKING, diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index e1d44b71fdb1..afca1ed0554f 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -480,8 +480,7 @@ def read_gbq( # type: ignore[overload-overlap] col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq( @@ -497,8 +496,7 @@ def read_gbq( col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq( self, @@ -570,8 +568,7 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def _read_gbq_colab( @@ -580,8 +577,7 @@ def _read_gbq_colab( *, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[True] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... @log_adapter.log_name_override("read_gbq_colab") def _read_gbq_colab( @@ -642,8 +638,7 @@ def read_gbq_query( # type: ignore[overload-overlap] filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[False] = ..., allow_large_results: Optional[bool] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq_query( @@ -659,8 +654,7 @@ def read_gbq_query( filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[True] = ..., allow_large_results: Optional[bool] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq_query( self, @@ -807,8 +801,7 @@ def read_gbq_table( # type: ignore[overload-overlap] use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... @overload def read_gbq_table( @@ -822,8 +815,7 @@ def read_gbq_table( use_cache: bool = ..., col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., - ) -> pandas.Series: - ... + ) -> pandas.Series: ... def read_gbq_table( self, @@ -974,8 +966,7 @@ def read_pandas( pandas_dataframe: pandas.Index, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.core.indexes.Index: - ... + ) -> bigframes.core.indexes.Index: ... @typing.overload def read_pandas( @@ -983,8 +974,7 @@ def read_pandas( pandas_dataframe: pandas.Series, *, write_engine: constants.WriteEngineType = "default", - ) -> bigframes.series.Series: - ... + ) -> bigframes.series.Series: ... @typing.overload def read_pandas( @@ -992,8 +982,7 @@ def read_pandas( pandas_dataframe: pandas.DataFrame, *, write_engine: constants.WriteEngineType = "default", - ) -> dataframe.DataFrame: - ... + ) -> dataframe.DataFrame: ... def read_pandas( self, From 2398a6768ae61efddf7720d25262edb9a3358987 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Thu, 16 Apr 2026 04:47:31 +0000 Subject: [PATCH 11/11] test: fix execution count assertions --- .../small/session/test_read_gbq_colab.py | 54 ++++++++++--- .../tests/system/small/test_dataframe.py | 80 +++++++++++++------ .../tests/system/small/test_series_io.py | 14 +++- 3 files changed, 112 insertions(+), 36 deletions(-) diff --git a/packages/bigframes/tests/system/small/session/test_read_gbq_colab.py b/packages/bigframes/tests/system/small/session/test_read_gbq_colab.py index 65f47fe4e31d..6ba9c760847a 100644 --- a/packages/bigframes/tests/system/small/session/test_read_gbq_colab.py +++ b/packages/bigframes/tests/system/small/session/test_read_gbq_colab.py @@ -89,11 +89,20 @@ def test_read_gbq_colab_fresh_session_is_hybrid(): assert len(result) == 100 assert session._executor._enable_polars_execution is True # type: ignore - assert executions_after == executions_before_python == 1 + assert executions_before_python == 1 + assert executions_after == 2 + history = session.execution_history().to_dataframe() + assert history.iloc[-1]["job_type"] == "polars" def test_read_gbq_colab_peek_avoids_requery(maybe_ordered_session): - executions_before_sql = maybe_ordered_session._metrics.execution_count + history_before = maybe_ordered_session.execution_history().to_dataframe() + queries_before = ( + len(history_before[history_before["job_type"] == "query"]) + if "job_type" in history_before.columns + else 0 + ) + df = maybe_ordered_session._read_gbq_colab( """ SELECT @@ -107,20 +116,36 @@ def test_read_gbq_colab_peek_avoids_requery(maybe_ordered_session): LIMIT 300 """ ) - executions_before_python = maybe_ordered_session._metrics.execution_count + + history_after_read = maybe_ordered_session.execution_history().to_dataframe() + queries_after_read = len( + history_after_read[history_after_read["job_type"] == "query"] + ) + result = df.peek(100) - executions_after = maybe_ordered_session._metrics.execution_count + + history_after_peek = maybe_ordered_session.execution_history().to_dataframe() + queries_after_peek = len( + history_after_peek[history_after_peek["job_type"] == "query"] + ) # Ok, this isn't guaranteed by peek, but should happen with read api based impl # if starts failing, maybe stopped using read api? assert result["total"].is_monotonic_decreasing assert len(result) == 100 - assert executions_after == executions_before_python == executions_before_sql + 1 + assert queries_after_read == queries_before + 1 + assert queries_after_peek == queries_after_read def test_read_gbq_colab_repr_avoids_requery(maybe_ordered_session): - executions_before_sql = maybe_ordered_session._metrics.execution_count + history_before = maybe_ordered_session.execution_history().to_dataframe() + queries_before = ( + len(history_before[history_before["job_type"] == "query"]) + if "job_type" in history_before.columns + else 0 + ) + df = maybe_ordered_session._read_gbq_colab( """ SELECT @@ -134,10 +159,21 @@ def test_read_gbq_colab_repr_avoids_requery(maybe_ordered_session): LIMIT 300 """ ) - executions_before_python = maybe_ordered_session._metrics.execution_count + + history_after_read = maybe_ordered_session.execution_history().to_dataframe() + queries_after_read = len( + history_after_read[history_after_read["job_type"] == "query"] + ) + _ = repr(df) - executions_after = maybe_ordered_session._metrics.execution_count - assert executions_after == executions_before_python == executions_before_sql + 1 + + history_after_repr = maybe_ordered_session.execution_history().to_dataframe() + queries_after_repr = len( + history_after_repr[history_after_repr["job_type"] == "query"] + ) + + assert queries_after_read == queries_before + 1 + assert queries_after_repr == queries_after_read def test_read_gbq_colab_includes_formatted_scalars(session): diff --git a/packages/bigframes/tests/system/small/test_dataframe.py b/packages/bigframes/tests/system/small/test_dataframe.py index 8df13a5bcbda..ce18c6456767 100644 --- a/packages/bigframes/tests/system/small/test_dataframe.py +++ b/packages/bigframes/tests/system/small/test_dataframe.py @@ -945,41 +945,55 @@ def test_join_repr(scalars_dfs_maybe_ordered): def test_repr_w_display_options(scalars_dfs, session): - metrics = session._metrics scalars_df, _ = scalars_dfs # get a pandas df of the expected format df, _ = scalars_df._block.to_pandas() pandas_df = df.set_axis(scalars_df._block.column_labels, axis=1) pandas_df.index.name = scalars_df.index.name - executions_pre = metrics.execution_count + history_pre = session.execution_history().to_dataframe() + queries_pre = ( + len(history_pre[history_pre["job_type"] == "query"]) + if "job_type" in history_pre.columns + else 0 + ) + with bigframes.option_context( "display.max_rows", 10, "display.max_columns", 5, "display.max_colwidth", 10 ): # When there are 10 or fewer rows, the outputs should be identical except for the extra note. actual = scalars_df.head(10).__repr__() - executions_post = metrics.execution_count + + history_post = session.execution_history().to_dataframe() + queries_post = len(history_post[history_post["job_type"] == "query"]) with display_options.pandas_repr(bigframes.options.display): pandas_repr = pandas_df.head(10).__repr__() assert actual == pandas_repr - assert (executions_post - executions_pre) <= 3 + assert (queries_post - queries_pre) <= 2 def test_mimebundle_html_repr_w_all_rows(scalars_dfs, session): - metrics = session._metrics scalars_df, _ = scalars_dfs # get a pandas df of the expected format df, _ = scalars_df._block.to_pandas() pandas_df = df.set_axis(scalars_df._block.column_labels, axis=1) pandas_df.index.name = scalars_df.index.name - executions_pre = metrics.execution_count + history_pre = session.execution_history().to_dataframe() + queries_pre = ( + len(history_pre[history_pre["job_type"] == "query"]) + if "job_type" in history_pre.columns + else 0 + ) + # When there are 10 or fewer rows, the outputs should be identical except for the extra note. bundle = scalars_df.head(10)._repr_mimebundle_() actual = bundle["text/html"] - executions_post = metrics.execution_count + + history_post = session.execution_history().to_dataframe() + queries_post = len(history_post[history_post["job_type"] == "query"]) with display_options.pandas_repr(bigframes.options.display): pandas_repr = pandas_df.head(10)._repr_html_() @@ -989,7 +1003,7 @@ def test_mimebundle_html_repr_w_all_rows(scalars_dfs, session): + f"[{len(pandas_df.index)} rows x {len(pandas_df.columns)} columns in total]" ) assert actual == expected - assert (executions_post - executions_pre) <= 3 + assert (queries_post - queries_pre) <= 2 def test_df_column_name_with_space(scalars_dfs): @@ -3094,18 +3108,23 @@ def test_binop_with_self_aggregate(scalars_dfs_maybe_ordered): df_columns = ["int64_col", "float64_col", "int64_too"] - # Ensure that this takes the optimized single-query path by counting executions - execution_count_before = scalars_df._session._metrics.execution_count + history_before = scalars_df._session.execution_history().to_dataframe() + queries_before = ( + len(history_before[history_before["job_type"] == "query"]) + if "job_type" in history_before.columns + else 0 + ) + bf_df = scalars_df[df_columns] bf_result = (bf_df - bf_df.mean()).to_pandas() - execution_count_after = scalars_df._session._metrics.execution_count + + history_after = scalars_df._session.execution_history().to_dataframe() + queries_after = len(history_after[history_after["job_type"] == "query"]) pd_df = scalars_pandas_df[df_columns] pd_result = pd_df - pd_df.mean() - executions = execution_count_after - execution_count_before - - assert executions == 1 + assert (queries_after - queries_before) == 1 assert_frame_equal(bf_result, pd_result, check_dtype=False) @@ -3114,18 +3133,23 @@ def test_binop_with_self_aggregate_w_index_reset(scalars_dfs_maybe_ordered): df_columns = ["int64_col", "float64_col", "int64_too"] - # Ensure that this takes the optimized single-query path by counting executions - execution_count_before = scalars_df._session._metrics.execution_count + history_before = scalars_df._session.execution_history().to_dataframe() + queries_before = ( + len(history_before[history_before["job_type"] == "query"]) + if "job_type" in history_before.columns + else 0 + ) + bf_df = scalars_df[df_columns].reset_index(drop=True) bf_result = (bf_df - bf_df.mean()).to_pandas() - execution_count_after = scalars_df._session._metrics.execution_count + + history_after = scalars_df._session.execution_history().to_dataframe() + queries_after = len(history_after[history_after["job_type"] == "query"]) pd_df = scalars_pandas_df[df_columns].reset_index(drop=True) pd_result = pd_df - pd_df.mean() - executions = execution_count_after - execution_count_before - - assert executions == 1 + assert (queries_after - queries_before) == 1 pd_result.index = pd_result.index.astype("Int64") assert_frame_equal(bf_result, pd_result, check_dtype=False, check_index_type=False) @@ -5948,16 +5972,22 @@ def test_dataframe_explode(col_names, ignore_index, session): "C": [["a", "b", "c"], np.nan, ["d", "e"]], } - metrics = session._metrics df = bpd.DataFrame(data, session=session) pd_df = df.to_pandas() pd_result = pd_df.explode(col_names, ignore_index=ignore_index) bf_result = df.explode(col_names, ignore_index=ignore_index) - # Check that to_pandas() results in at most a single query execution - execs_pre = metrics.execution_count + history_pre = session.execution_history().to_dataframe() + queries_pre = ( + len(history_pre[history_pre["job_type"] == "query"]) + if "job_type" in history_pre.columns + else 0 + ) + bf_materialized = bf_result.to_pandas() - execs_post = metrics.execution_count + + history_post = session.execution_history().to_dataframe() + queries_post = len(history_post[history_post["job_type"] == "query"]) bigframes.testing.utils.assert_frame_equal( bf_materialized, @@ -5967,7 +5997,7 @@ def test_dataframe_explode(col_names, ignore_index, session): ) # we test this property on this method in particular as compilation # is non-deterministic and won't use the query cache as implemented - assert execs_post - execs_pre <= 1 + assert (queries_post - queries_pre) <= 1 @pytest.mark.parametrize( diff --git a/packages/bigframes/tests/system/small/test_series_io.py b/packages/bigframes/tests/system/small/test_series_io.py index 2f1780812ae8..83c2de70cae6 100644 --- a/packages/bigframes/tests/system/small/test_series_io.py +++ b/packages/bigframes/tests/system/small/test_series_io.py @@ -30,13 +30,23 @@ def test_to_pandas_override_global_option(scalars_df_index): assert table_id is not None session = bf_series._block.session - execution_count = session._metrics.execution_count + + history_before = session.execution_history().to_dataframe() + queries_before = ( + len(history_before[history_before["job_type"] == "query"]) + if "job_type" in history_before.columns + else 0 + ) # When allow_large_results=False, a query_job object should not be created. # Therefore, the table_id should remain unchanged. bf_series.to_pandas(allow_large_results=False) assert bf_series._query_job.destination.table_id == table_id - assert session._metrics.execution_count - execution_count == 1 + + history_after = session.execution_history().to_dataframe() + queries_after = len(history_after[history_after["job_type"] == "query"]) + + assert (queries_after - queries_before) == 1 @pytest.mark.parametrize(