diff --git a/Cargo.lock b/Cargo.lock index 5c0ed65f..206bb42a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2134,6 +2134,7 @@ dependencies = [ "ethlambda-storage", "ethlambda-test-fixtures", "ethlambda-types", + "futures-core", "hex", "http-body-util", "jemalloc_pprof", @@ -2141,6 +2142,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-stream", "tokio-util", "tower", "tracing", diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 914c6c86..689a21a2 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -210,6 +210,13 @@ async fn main() -> eyre::Result<()> { // and the API server (which exposes GET/POST admin endpoints). let aggregator = AggregatorController::new(options.is_aggregator); + // Chain-event broadcast channel: the blockchain actor is the sole sender; + // each SSE client (`GET /lean/v0/events`) subscribes its own receiver. The + // initial receiver is dropped — subscribers attach on demand and a fully + // unsubscribed channel just drops events. + let (chain_events, _) = + tokio::sync::broadcast::channel(ethlambda_blockchain::CHAIN_EVENT_CHANNEL_CAPACITY); + let blockchain = BlockChain::spawn( store.clone(), validator_keys, @@ -220,6 +227,7 @@ async fn main() -> eyre::Result<()> { enable_proposer_aggregation: options.enable_proposer_aggregation, max_attestations_per_block: options.max_attestations_per_block, }, + chain_events.clone(), ); // Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the @@ -259,9 +267,15 @@ async fn main() -> eyre::Result<()> { let rpc_shutdown = shutdown_token.clone(); let rpc_handle = tokio::spawn(async move { - let _ = ethlambda_rpc::start_rpc_server(rpc_config, store, aggregator, rpc_shutdown) - .await - .inspect_err(|err| error!(%err, "RPC server failed")); + let _ = ethlambda_rpc::start_rpc_server( + rpc_config, + store, + aggregator, + chain_events, + rpc_shutdown, + ) + .await + .inspect_err(|err| error!(%err, "RPC server failed")); }); info!("Node initialized"); diff --git a/crates/blockchain/Cargo.toml b/crates/blockchain/Cargo.toml index f25234cd..a08262eb 100644 --- a/crates/blockchain/Cargo.toml +++ b/crates/blockchain/Cargo.toml @@ -29,12 +29,12 @@ tokio-util = { version = "0.7", default-features = false } rayon.workspace = true thiserror.workspace = true tracing.workspace = true +serde.workspace = true hex.workspace = true [dev-dependencies] ethlambda-test-fixtures.workspace = true -serde = { workspace = true } serde_json = { workspace = true } hex = { workspace = true } libssz.workspace = true diff --git a/crates/blockchain/src/events.rs b/crates/blockchain/src/events.rs new file mode 100644 index 00000000..0281c4d8 --- /dev/null +++ b/crates/blockchain/src/events.rs @@ -0,0 +1,56 @@ +//! Chain events emitted by the [`crate::BlockChainServer`] actor and streamed +//! to RPC clients over Server-Sent Events (`GET /lean/v0/events`). +//! +//! The flow is strictly one-directional: the actor (the sole writer) publishes +//! events on a [`broadcast`] channel, and the read-only RPC handler subscribes. +//! RPC never writes back into the actor. + +use ethlambda_types::primitives::H256; +use serde::Serialize; +use tokio::sync::broadcast; + +/// A consensus event broadcast to SSE subscribers. +/// +/// Serialized with an external `event`/`data` tag so the JSON payload mirrors +/// the SSE framing (`event: head\ndata: {...}`). +#[derive(Clone, Debug, Serialize)] +#[serde(tag = "event", content = "data", rename_all = "snake_case")] +pub enum ChainEvent { + /// Fork choice selected a new head. + Head { + slot: u64, + root: H256, + parent_root: H256, + }, + /// A block was imported into the store. + Block { slot: u64, root: H256 }, + /// The finalized checkpoint advanced. + FinalizedCheckpoint { slot: u64, root: H256 }, +} + +/// Sender half of the chain-event broadcast channel, owned by the actor. +pub type ChainEventTx = broadcast::Sender; + +/// Capacity chosen so a briefly-stalled SSE client is dropped (lagged) rather +/// than back-pressuring the actor. Lagged clients re-sync via backfill. +pub const CHAIN_EVENT_CHANNEL_CAPACITY: usize = 256; + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn channel_delivers_head_event() { + let (tx, mut rx) = broadcast::channel::(CHAIN_EVENT_CHANNEL_CAPACITY); + tx.send(ChainEvent::Head { + slot: 7, + root: H256::ZERO, + parent_root: H256::ZERO, + }) + .unwrap(); + match rx.recv().await.unwrap() { + ChainEvent::Head { slot, .. } => assert_eq!(slot, 7), + other => panic!("unexpected: {other:?}"), + } + } +} diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 8be7c89f..93c854d0 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -29,9 +29,12 @@ use tracing::{debug, error, info, trace, warn}; use crate::block_builder::ProposerConfig; use crate::store::StoreError; +pub use events::{CHAIN_EVENT_CHANNEL_CAPACITY, ChainEvent, ChainEventTx}; + pub mod aggregation; pub mod block_builder; pub(crate) mod coverage; +pub mod events; pub(crate) mod fork_choice_tree; pub mod key_manager; pub mod metrics; @@ -84,6 +87,7 @@ impl BlockChain { attestation_committee_count: u64, gate_duties: bool, proposer_config: ProposerConfig, + chain_events: ChainEventTx, ) -> BlockChain { metrics::set_is_aggregator(aggregator.is_enabled()); metrics::set_node_sync_status(metrics::SyncStatus::Idle); @@ -111,6 +115,7 @@ impl BlockChain { proposer_config, pre_merge_coverage: None, sync_status: SyncStatusTracker::new(gate_duties), + chain_events, } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -183,6 +188,11 @@ pub struct BlockChainServer { /// validator duties while syncing, unless that gating was disabled at /// startup via `--disable-duty-sync-gate` (then it is metric-only). sync_status: SyncStatusTracker, + + /// Broadcast sender for chain events streamed to SSE subscribers + /// (`GET /lean/v0/events`). The actor is the sole publisher; the RPC + /// handler only subscribes, preserving the one-directional write flow. + chain_events: ChainEventTx, } impl BlockChainServer { @@ -266,7 +276,12 @@ impl BlockChainServer { .is_some(); // Tick the store first - this accepts attestations at interval 0 if we have a proposal - store::on_tick(&mut self.store, timestamp_ms, is_proposer); + store::on_tick( + &mut self.store, + timestamp_ms, + is_proposer, + Some(&self.chain_events), + ); // Per-interval duties for this tick. Intervals 0 (block publish) and 3 // (safe-target update) are driven inside `store::on_tick` above, so they @@ -676,7 +691,7 @@ impl BlockChainServer { /// Run block import and refresh metrics. fn process_block(&mut self, signed_block: SignedBlock) -> Result<(), StoreError> { - store::on_block(&mut self.store, signed_block)?; + store::on_block(&mut self.store, signed_block, Some(&self.chain_events))?; metrics::update_head_slot(self.store.head_slot()); metrics::update_latest_justified_slot(self.store.latest_justified().slot); metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 20f7cf5c..8aa4b329 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -20,27 +20,33 @@ use crate::{ GOSSIP_DISPARITY_INTERVALS, INTERVALS_PER_SLOT, MAX_ATTESTATIONS_DATA, MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT, block_builder::{PostBlockCheckpoints, ProposerConfig, build_block}, + events::{ChainEvent, ChainEventTx}, metrics, }; const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; /// Accept new aggregated payloads, promoting them to known for fork choice. -fn accept_new_attestations(store: &mut Store, log_tree: bool) { +fn accept_new_attestations(store: &mut Store, log_tree: bool, events: Option<&ChainEventTx>) { store.promote_new_aggregated_payloads(); metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); metrics::update_latest_known_aggregated_payloads(store.known_aggregated_payloads_count()); - update_head(store, log_tree); + update_head(store, log_tree, events); } /// Update the head based on the fork choice rule. /// /// When `log_tree` is true, also computes block weights and logs an ASCII /// fork choice tree to the terminal. -pub fn update_head(store: &mut Store, log_tree: bool) { +/// +/// When `events` is `Some`, emits a [`ChainEvent::Head`] whenever the head +/// changes and a [`ChainEvent::FinalizedCheckpoint`] whenever finalization +/// advances. Send errors (no subscribers) are ignored. +pub fn update_head(store: &mut Store, log_tree: bool, events: Option<&ChainEventTx>) { let blocks = store.get_live_chain(); let attestations = store.extract_latest_known_attestations(); let old_head = store.head(); + let old_finalized = store.latest_finalized(); let (new_head, weights) = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, &blocks, @@ -62,6 +68,31 @@ pub fn update_head(store: &mut Store, log_tree: bool) { .update_checkpoints(ForkCheckpoints::new(new_head, None, finalized)) .expect("update_checkpoints should succeed"); + if let Some(events) = events { + // Emit the new head whenever fork choice moved it. Read the header once + // and reuse it for slot and parent_root so they stay consistent. + if old_head != new_head { + if let Some(new_header) = store.get_block_header(&new_head) { + let _ = events.send(ChainEvent::Head { + slot: new_header.slot, + root: new_head, + parent_root: new_header.parent_root, + }); + } else { + tracing::warn!("head header missing while emitting Head event; skipping"); + } + } + + // Emit a finalized-checkpoint event only when finalization advanced. + let new_finalized = store.latest_finalized(); + if new_finalized.slot > old_finalized.slot || new_finalized.root != old_finalized.root { + let _ = events.send(ChainEvent::FinalizedCheckpoint { + slot: new_finalized.slot, + root: new_finalized.root, + }); + } + } + if old_head != new_head { let old_slot = store .get_block_header(&old_head) @@ -258,7 +289,12 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() /// 800ms interval. Slot and interval-within-slot are derived as: /// slot = store.time() / INTERVALS_PER_SLOT /// interval = store.time() % INTERVALS_PER_SLOT -pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { +pub fn on_tick( + store: &mut Store, + timestamp_ms: u64, + has_proposal: bool, + events: Option<&ChainEventTx>, +) { // Convert UNIX timestamp (ms) to interval count since genesis let genesis_time_ms = store.config().genesis_time * 1000; let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms); @@ -295,7 +331,7 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { 0 => { // Start of slot - process attestations if proposal exists if should_signal_proposal { - accept_new_attestations(store, false); + accept_new_attestations(store, false, events); } } 1 => { @@ -310,7 +346,7 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { } 4 => { // End of slot - accept accumulated attestations and log tree - accept_new_attestations(store, true); + accept_new_attestations(store, true, events); } _ => unreachable!("slots only have 5 intervals"), } @@ -489,8 +525,12 @@ fn on_gossip_aggregated_attestation_core( /// /// This is the safe default: it always verifies cryptographic signatures /// and stores them for future block building. Use this for all production paths. -pub fn on_block(store: &mut Store, signed_block: SignedBlock) -> Result<(), StoreError> { - on_block_core(store, signed_block, true) +pub fn on_block( + store: &mut Store, + signed_block: SignedBlock, + events: Option<&ChainEventTx>, +) -> Result<(), StoreError> { + on_block_core(store, signed_block, true, events) } /// Process a new block without signature verification. @@ -501,7 +541,7 @@ pub fn on_block_without_verification( store: &mut Store, signed_block: SignedBlock, ) -> Result<(), StoreError> { - on_block_core(store, signed_block, false) + on_block_core(store, signed_block, false, None) } /// Core block processing logic. @@ -512,6 +552,7 @@ fn on_block_core( store: &mut Store, signed_block: SignedBlock, verify: bool, + events: Option<&ChainEventTx>, ) -> Result<(), StoreError> { let _timing = metrics::time_fork_choice_block_processing(); let block_start = std::time::Instant::now(); @@ -600,8 +641,17 @@ fn on_block_core( metrics::inc_attestations_valid(count); } + // Emit the imported block before fork choice runs, so subscribers see the + // `block` event ahead of any `head` move it triggers. + if let Some(events) = events { + let _ = events.send(ChainEvent::Block { + slot, + root: block_root, + }); + } + // Update forkchoice head based on new block and attestations - update_head(store, false); + update_head(store, false, events); let block_total = block_start.elapsed(); info!( @@ -772,11 +822,16 @@ fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { // Calculate time corresponding to this slot let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; - // Advance time to current slot (ticking intervals) - on_tick(store, slot_time_ms, true); + // Advance time to current slot (ticking intervals). + // + // No event sender here: this is the proposer's pre-build catch-up, and the + // block it produces is imported via `on_block` (which emits the resulting + // `Block`/`Head`). Emitting from here would surface a head move before the + // block exists. + on_tick(store, slot_time_ms, true, None); // Process any pending attestations before proposal - accept_new_attestations(store, false); + accept_new_attestations(store, false, None); store.head() } diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index 0e1f2e9b..d586bdab 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -103,7 +103,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { if step.tick_to_slot { let block_time_ms = genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; - store::on_tick(&mut store, block_time_ms, true); + store::on_tick(&mut store, block_time_ms, true, None); } let result = store::on_block_without_verification(&mut store, signed_block); let import_ok = result.is_ok(); @@ -138,7 +138,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // on_block already ran the head update before these votes // existed; recompute so the head reflects the block's own // attestations, matching the proposer-view store. - store::update_head(&mut store, false); + store::update_head(&mut store, false, None); } } "tick" => { @@ -153,7 +153,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { (None, None) => panic!("tick step missing both time and interval"), }; let has_proposal = step.has_proposal.unwrap_or(false); - store::on_tick(&mut store, timestamp_ms, has_proposal); + store::on_tick(&mut store, timestamp_ms, has_proposal, None); } "attestation" => { let att_data = step diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index a11a58cc..aba2adc4 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -61,10 +61,10 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // Advance time to the block's slot let block_time_ms = genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; - store::on_tick(&mut st, block_time_ms, true); + store::on_tick(&mut st, block_time_ms, true, None); // Process the block (this includes signature verification) - let result = store::on_block(&mut st, signed_block); + let result = store::on_block(&mut st, signed_block, None); // Step 3: Check that it succeeded or failed as expected match (result.is_ok(), test.expect_exception.as_ref()) { diff --git a/crates/net/rpc/Cargo.toml b/crates/net/rpc/Cargo.toml index bebd3218..cee21b9d 100644 --- a/crates/net/rpc/Cargo.toml +++ b/crates/net/rpc/Cargo.toml @@ -26,6 +26,8 @@ serde_json.workspace = true hex.workspace = true tracing.workspace = true jemalloc_pprof.workspace = true +tokio-stream = { version = "0.1", features = ["sync"] } +futures-core = "0.3" [dev-dependencies] ethlambda-types.workspace = true diff --git a/crates/net/rpc/src/events.rs b/crates/net/rpc/src/events.rs new file mode 100644 index 00000000..4966c0d1 --- /dev/null +++ b/crates/net/rpc/src/events.rs @@ -0,0 +1,100 @@ +//! `GET /lean/v0/events` — Server-Sent Events stream of chain events. +//! +//! The [`ethlambda_blockchain::BlockChainServer`] actor publishes +//! [`ChainEvent`]s on a broadcast channel; this read-only handler subscribes a +//! new receiver per connection and forwards each event as an SSE message. The +//! flow is strictly one-directional (actor → broadcast → SSE), so RPC never +//! writes into the actor. + +use std::convert::Infallible; + +use axum::{ + Extension, Router, + response::{Sse, sse::Event}, + routing::get, +}; +use ethlambda_blockchain::ChainEvent; +use ethlambda_storage::Store; +use futures_core::Stream; +use tokio::sync::broadcast; +use tokio_stream::{ + StreamExt, + wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}, +}; + +async fn get_events( + Extension(tx): Extension>, +) -> Sse>> { + let stream = BroadcastStream::new(tx.subscribe()).filter_map(|res| { + // A slow client falls behind and the broadcast channel overwrites + // events it never read. Surface that rather than silently dropping. + let ev = match res { + Ok(ev) => ev, + Err(BroadcastStreamRecvError::Lagged(skipped)) => { + tracing::debug!(skipped, "SSE client lagged; dropped chain events"); + return None; + } + }; + let name = match &ev { + ChainEvent::Head { .. } => "head", + ChainEvent::Block { .. } => "block", + ChainEvent::FinalizedCheckpoint { .. } => "finalized_checkpoint", + }; + Some(Ok(Event::default() + .event(name) + .json_data(ev) + .inspect_err(|err| tracing::warn!(%err, "failed to serialize SSE chain event")) + .ok()?)) + }); + Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default()) +} + +pub(crate) fn routes() -> Router { + Router::new().route("/lean/v0/events", get(get_events)) +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::{body::Body, http::Request}; + use ethlambda_storage::{Store, backend::InMemoryBackend}; + use std::sync::Arc; + use tower::ServiceExt; + + use crate::test_utils::create_test_state; + + #[tokio::test] + async fn events_streams_head() { + let (tx, _) = broadcast::channel::(16); + let store = Store::from_anchor_state(Arc::new(InMemoryBackend::new()), create_test_state()); + let app = crate::build_api_router(store).layer(Extension(tx.clone())); + + // Issue the request first so the handler subscribes its receiver before + // we publish — `broadcast::send` errors if there are no live receivers. + let resp = app + .oneshot( + Request::builder() + .uri("/lean/v0/events") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), axum::http::StatusCode::OK); + + tx.send(ChainEvent::Head { + slot: 3, + root: Default::default(), + parent_root: Default::default(), + }) + .unwrap(); + + let mut body = resp.into_body().into_data_stream(); + let chunk = tokio_stream::StreamExt::next(&mut body) + .await + .unwrap() + .unwrap(); + let text = String::from_utf8_lossy(&chunk); + assert!(text.contains("event:head") || text.contains("event: head")); + } +} diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index 99a2b44c..4195b167 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -1,6 +1,7 @@ use std::net::{IpAddr, SocketAddr}; use axum::{Extension, Router}; +use ethlambda_blockchain::ChainEventTx; use ethlambda_storage::Store; use ethlambda_types::aggregator::AggregatorController; use tokio_util::sync::CancellationToken; @@ -11,6 +12,7 @@ pub(crate) const SSZ_CONTENT_TYPE: &str = "application/octet-stream"; mod admin; mod base; mod blocks; +mod events; mod fork_choice; mod heap_profiling; pub mod metrics; @@ -51,9 +53,12 @@ pub async fn start_rpc_server( config: RpcConfig, store: Store, aggregator: AggregatorController, + chain_events: ChainEventTx, shutdown: CancellationToken, ) -> Result<(), std::io::Error> { - let api_router = build_api_router(store).layer(Extension(aggregator)); + let api_router = build_api_router(store) + .layer(Extension(aggregator)) + .layer(Extension(chain_events)); let metrics_router = metrics::start_prometheus_metrics_api(); let debug_router = build_debug_router(); @@ -98,6 +103,7 @@ fn build_api_router(store: Store) -> Router { Router::new() .merge(base::routes()) .merge(blocks::routes()) + .merge(events::routes()) .merge(fork_choice::routes()) .merge(admin::routes()) .with_state(store) diff --git a/crates/net/rpc/src/test_driver.rs b/crates/net/rpc/src/test_driver.rs index e727d756..3ef0eda2 100644 --- a/crates/net/rpc/src/test_driver.rs +++ b/crates/net/rpc/src/test_driver.rs @@ -347,7 +347,12 @@ fn apply_step(store: &mut Store, step: ForkChoiceStep) -> Result<(), String> { } (None, None) => return Err("tick step missing time and interval".to_string()), }; - store::on_tick(store, timestamp_ms, step.has_proposal.unwrap_or(false)); + store::on_tick( + store, + timestamp_ms, + step.has_proposal.unwrap_or(false), + None, + ); Ok(()) } "block" => { @@ -361,7 +366,7 @@ fn apply_step(store: &mut Store, step: ForkChoiceStep) -> Result<(), String> { if step.tick_to_slot { let block_time_ms = store.config().genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; - store::on_tick(store, block_time_ms, true); + store::on_tick(store, block_time_ms, true, None); } store::on_block_without_verification(store, signed_block).map_err(|e| e.to_string()) } diff --git a/docs/rpc.md b/docs/rpc.md index c5f94dd5..1e5f68d8 100644 --- a/docs/rpc.md +++ b/docs/rpc.md @@ -26,6 +26,7 @@ If `--api-port` and `--metrics-port` are equal, all routers are merged onto a si | `GET` | `/lean/v0/states/finalized` | SSZ | Latest finalized `State` | | `GET` | `/lean/v0/blocks/finalized` | SSZ | Latest finalized `SignedBlock` | | `GET` | `/lean/v0/checkpoints/justified` | JSON | Latest justified `Checkpoint` | +| `GET` | `/lean/v0/events` | SSE | Live stream of chain events | | `GET` | `/lean/v0/blocks/{block_id}` | JSON | Block by root or slot | | `GET` | `/lean/v0/blocks/{block_id}/header` | JSON | Block header by root or slot | | `GET` | `/lean/v0/fork_choice` | JSON | Fork-choice tree with per-block weights | @@ -55,6 +56,25 @@ SSZ-encoded `SignedBlock` at the latest finalized checkpoint. The genesis/anchor { "slot": 128, "root": "0x1a2b…" } ``` +### `GET /lean/v0/events` + +Server-Sent Events stream (`Content-Type: text/event-stream`) of live chain events published by the blockchain actor. Three event types: + +| Event | Payload | Emitted when | +|-------|---------|--------------| +| `head` | `{ "slot": 128, "root": "0x…", "parent_root": "0x…" }` | Fork choice selects a new head | +| `block` | `{ "slot": 128, "root": "0x…" }` | A block is imported into the store | +| `finalized_checkpoint` | `{ "slot": 96, "root": "0x…" }` | The finalized checkpoint advances | + +Example frame: + +``` +event: head +data: {"slot":128,"root":"0x1a2b…","parent_root":"0x3c4d…"} +``` + +Events are fanned out over a bounded broadcast channel (`CHAIN_EVENT_CHANNEL_CAPACITY`). A client that reads too slowly skips past the events it missed — they are dropped for that subscriber rather than back-pressured onto the actor — so treat the stream as best-effort and re-sync via the blocks endpoints after a gap. Keep-alive comments are sent periodically to hold idle connections open. + ### `GET /lean/v0/blocks/{block_id}` and `/header` `block_id` is either: @@ -144,6 +164,7 @@ When the binary boots with `HIVE_LEAN_TEST_DRIVER=1` (any of `1`/`true`/`yes`), | Kind | `Content-Type` | |------|----------------| | JSON | `application/json; charset=utf-8` | +| SSE | `text/event-stream` | | SSZ | `application/octet-stream` | | Prometheus metrics | `text/plain; version=0.0.4; charset=utf-8` | | HTML | `text/html; charset=utf-8` |