diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 7287accc4ee..690f9237303 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -48,7 +48,7 @@ use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue} use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef}; use spacetimedb_schema::reducer_name::ReducerName; use spacetimedb_schema::schema::{ - ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema, + ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema, }; use spacetimedb_schema::table_name::TableName; use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotError, SnapshotRepository}; @@ -1483,6 +1483,11 @@ impl RelationalDB { Ok(self.inner.drop_sequence_mut_tx(tx, seq_id)?) } + /// Creates a new constraint in the database instance. + pub fn create_constraint(&self, tx: &mut MutTx, constraint: ConstraintSchema) -> Result { + Ok(self.inner.create_constraint_mut_tx(tx, constraint)?) + } + ///Removes the [Constraints] from database instance pub fn drop_constraint(&self, tx: &mut MutTx, constraint_id: ConstraintId) -> Result<(), DBError> { Ok(self.inner.drop_constraint_mut_tx(tx, constraint_id)?) diff --git a/crates/core/src/db/update.rs b/crates/core/src/db/update.rs index 6c7c3bd9fc8..cd9aebbf36e 100644 --- a/crates/core/src/db/update.rs +++ b/crates/core/src/db/update.rs @@ -8,6 +8,7 @@ use spacetimedb_lib::AlgebraicValue; use spacetimedb_primitives::{ColSet, TableId}; use spacetimedb_schema::auto_migrate::{AutoMigratePlan, ManualMigratePlan, MigratePlan}; use spacetimedb_schema::def::{TableDef, ViewDef}; +use spacetimedb_schema::schema::ConstraintSchema; use spacetimedb_schema::schema::{column_schemas_from_defs, IndexSchema, Schema, SequenceSchema, TableSchema}; /// The logger used for by [`update_database`] and friends. @@ -233,6 +234,29 @@ fn auto_migrate_database( ); stdb.drop_constraint(tx, constraint_schema.constraint_id)?; } + spacetimedb_schema::auto_migrate::AutoMigrateStep::AddConstraint(constraint_name) => { + let table_def = plan + .new + .stored_in_table_def(constraint_name) + .expect("AddConstraint references a table that should exist in the new module def"); + let constraint_def = &table_def.constraints[constraint_name]; + let table_id = stdb + .table_id_from_name_mut(tx, &table_def.name)? + .expect("table should exist in the database for AddConstraint"); + let constraint_schema = ConstraintSchema::from_module_def( + plan.new, + constraint_def, + table_id, + spacetimedb_primitives::ConstraintId::SENTINEL, + ); + log!( + logger, + "Adding constraint `{}` on table `{}`", + constraint_name, + table_def.name + ); + stdb.create_constraint(tx, constraint_schema)?; + } spacetimedb_schema::auto_migrate::AutoMigrateStep::AddSequence(sequence_name) => { let table_def = plan.new.stored_in_table_def(sequence_name).unwrap(); let sequence_def = table_def.sequences.get(sequence_name).unwrap(); diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index c479be085d9..24112466c39 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -758,14 +758,38 @@ impl CommittedState { .unwrap_or_else(|e| match e {}); } // A constraint was removed. Add it back. - ConstraintRemoved(table_id, constraint_schema) => { + ConstraintRemoved(table_id, constraint_schema, index_ids) => { let table = self.tables.get_mut(&table_id)?; table.with_mut_schema(|s| s.update_constraint(constraint_schema)); + // If the constraint had unique indices, make them unique again. + for index_id in index_ids { + if let Some(idx) = table.indexes.get_mut(&index_id) { + idx.make_unique().expect("rollback: index should have no duplicates"); + } + } + // Forward `drop_constraint` calls `Table::make_index_non_unique`, which + // rebuilds the pointer map when no unique index remained. Whatever the + // forward path did, the table invariant "pointer map is present iff no + // unique index exists" (see `table.rs`) is about to be re-established + // by the `make_unique` calls above — drop any rebuilt map now. + // `take_pointer_map` is idempotent: it returns `None` when the map is + // already absent, so it is safe to call unconditionally. + table.take_pointer_map(); } // A constraint was added. Remove it. - ConstraintAdded(table_id, constraint_id) => { + ConstraintAdded(table_id, constraint_id, index_ids, pointer_map) => { let table = self.tables.get_mut(&table_id)?; table.with_mut_schema(|s| s.remove_constraint(constraint_id)); + // If the constraint made indices unique, revert them to non-unique. + for index_id in index_ids { + if let Some(idx) = table.indexes.get_mut(&index_id) { + idx.make_non_unique(); + } + } + // Restore the pointer map if it was taken. + if let Some(pm) = pointer_map { + table.restore_pointer_map(pm); + } } // A sequence was removed. Add it back. SequenceRemoved(table_id, seq, schema) => { diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index edcce91ce5e..1ee7ba296d1 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -37,7 +37,7 @@ use spacetimedb_sats::{AlgebraicValue, ProductValue}; use spacetimedb_schema::table_name::TableName; use spacetimedb_schema::{ reducer_name::ReducerName, - schema::{ColumnSchema, IndexSchema, SequenceSchema, TableSchema}, + schema::{ColumnSchema, ConstraintSchema, IndexSchema, SequenceSchema, TableSchema}, }; use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository, UnflushedSnapshot}; use spacetimedb_table::{ @@ -548,6 +548,10 @@ impl MutTxDatastore for Locking { tx.sequence_id_from_name(sequence_name) } + fn create_constraint_mut_tx(&self, tx: &mut Self::MutTx, constraint: ConstraintSchema) -> Result { + tx.create_constraint(constraint) + } + fn drop_constraint_mut_tx(&self, tx: &mut Self::MutTx, constraint_id: ConstraintId) -> Result<()> { tx.drop_constraint(constraint_id) } @@ -995,7 +999,7 @@ pub(crate) mod tests { use spacetimedb_lib::error::ResultTest; use spacetimedb_lib::st_var::StVarValue; use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration}; - use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId}; + use spacetimedb_primitives::{col_list, ArgId, ColId, ColSet, ScheduleId, ViewId}; use spacetimedb_sats::algebraic_value::ser::value_serialize; use spacetimedb_sats::bsatn::{to_vec, ToBsatn}; use spacetimedb_sats::layout::RowTypeLayout; @@ -3642,4 +3646,403 @@ pub(crate) mod tests { ); Ok(()) } + + /// Creates a table with a non-unique btree index on `cols` but no constraints. + fn table_with_non_unique_index(cols: impl Into) -> TableSchema { + let indices = vec![IndexSchema::for_test( + "Foo_idx_btree", + BTreeAlgorithm { columns: cols.into() }, + )]; + basic_table_schema_with_indices(indices, Vec::::new()) + } + + #[test] + fn test_create_constraint_makes_index_unique() -> ResultTest<()> { + let datastore = get_datastore()?; + + // TX1: create table with non-unique index on col 0. + let mut tx = begin_mut_tx(&datastore); + let schema = table_with_non_unique_index(0u16); + let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + commit(&datastore, tx)?; + + // TX2: insert unique rows and commit. + let mut tx = begin_mut_tx(&datastore); + insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Alice", 30))?; + insert(&datastore, &mut tx, table_id, &u32_str_u32(2, "Bob", 25))?; + commit(&datastore, tx)?; + + // TX3: add unique constraint — should succeed since data is unique. + let mut tx = begin_mut_tx(&datastore); + let mut constraint = ConstraintSchema::unique_for_test("Foo_id_unique", 0u16); + constraint.table_id = table_id; + datastore.create_constraint_mut_tx(&mut tx, constraint)?; + + // Inserting a duplicate should now fail (index is unique). + let dup_result = insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Charlie", 20)); + assert!( + dup_result.is_err(), + "duplicate insert should fail after adding unique constraint" + ); + commit(&datastore, tx)?; + + Ok(()) + } + + #[test] + fn test_create_constraint_rollback_restores_non_unique() -> ResultTest<()> { + let datastore = get_datastore()?; + + // TX1: create table with non-unique index on col 0. + let mut tx = begin_mut_tx(&datastore); + let schema = table_with_non_unique_index(0u16); + let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + commit(&datastore, tx)?; + + // TX2: insert unique rows and commit. + let mut tx = begin_mut_tx(&datastore); + insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Alice", 30))?; + insert(&datastore, &mut tx, table_id, &u32_str_u32(2, "Bob", 25))?; + commit(&datastore, tx)?; + + // TX3: add unique constraint, then rollback. + let mut tx = begin_mut_tx(&datastore); + let mut constraint = ConstraintSchema::unique_for_test("Foo_id_unique", 0u16); + constraint.table_id = table_id; + datastore.create_constraint_mut_tx(&mut tx, constraint)?; + let _ = datastore.rollback_mut_tx(tx); + + // TX4: after rollback, duplicates should be allowed again. + let mut tx = begin_mut_tx(&datastore); + let result = insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Charlie", 20)); + assert!( + result.is_ok(), + "duplicate insert should succeed after rollback of unique constraint" + ); + Ok(()) + } + + #[test] + fn test_create_constraint_fails_with_duplicates() -> ResultTest<()> { + let datastore = get_datastore()?; + + // TX1: create table with non-unique index on col 0. + let mut tx = begin_mut_tx(&datastore); + let schema = table_with_non_unique_index(0u16); + let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + commit(&datastore, tx)?; + + // TX2: insert duplicate rows and commit. + let mut tx = begin_mut_tx(&datastore); + insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Alice", 30))?; + insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Bob", 25))?; // duplicate id=1 + commit(&datastore, tx)?; + + // TX3: try to add unique constraint — should fail. + let mut tx = begin_mut_tx(&datastore); + let mut constraint = ConstraintSchema::unique_for_test("Foo_id_unique", 0u16); + constraint.table_id = table_id; + let result = datastore.create_constraint_mut_tx(&mut tx, constraint); + assert!(result.is_err(), "create_constraint should fail when duplicates exist"); + + Ok(()) + } + + #[test] + fn test_create_constraint_multi_col() -> ResultTest<()> { + let datastore = get_datastore()?; + + // TX1: create table with non-unique multi-column index on (col 0, col 2). + let mut tx = begin_mut_tx(&datastore); + let schema = table_with_non_unique_index(col_list![0, 2]); + let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + commit(&datastore, tx)?; + + // TX2: insert rows unique on (id, age) and commit. + let mut tx = begin_mut_tx(&datastore); + insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Alice", 30))?; + insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Bob", 25))?; // same id, different age + commit(&datastore, tx)?; + + // TX3: add unique constraint on (col 0, col 2) — should succeed. + let mut tx = begin_mut_tx(&datastore); + let mut constraint = ConstraintSchema::unique_for_test("Foo_id_age_unique", ColSet::from(col_list![0, 2])); + constraint.table_id = table_id; + datastore.create_constraint_mut_tx(&mut tx, constraint)?; + commit(&datastore, tx)?; + + Ok(()) + } + + #[test] + fn test_drop_constraint_makes_index_non_unique() -> ResultTest<()> { + let datastore = get_datastore()?; + + // TX1: create table with unique constraint. + let mut tx = begin_mut_tx(&datastore); + let schema = basic_table_schema_with_indices(basic_indices(), basic_constraints()); + let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + commit(&datastore, tx)?; + + // TX2: insert a row. + let mut tx = begin_mut_tx(&datastore); + insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Alice", 30))?; + commit(&datastore, tx)?; + + // TX3: drop the unique constraint on col 0. + let mut tx = begin_mut_tx(&datastore); + let constraint_id = tx + .constraint_id_from_name("Foo_id_key")? + .expect("constraint should exist"); + datastore.drop_constraint_mut_tx(&mut tx, constraint_id)?; + + // Inserting a duplicate on col 0 should now succeed. + let result = insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Bob", 25)); + assert!( + result.is_ok(), + "duplicate insert should succeed after dropping unique constraint" + ); + commit(&datastore, tx)?; + + Ok(()) + } + + #[test] + fn test_drop_constraint_rollback_keeps_unique() -> ResultTest<()> { + let datastore = get_datastore()?; + + // TX1: create table with unique constraint. + let mut tx = begin_mut_tx(&datastore); + let schema = basic_table_schema_with_indices(basic_indices(), basic_constraints()); + let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + commit(&datastore, tx)?; + + // TX2: insert a row. + let mut tx = begin_mut_tx(&datastore); + insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Alice", 30))?; + commit(&datastore, tx)?; + + // TX3: drop constraint, then rollback. + let mut tx = begin_mut_tx(&datastore); + let constraint_id = tx + .constraint_id_from_name("Foo_id_key")? + .expect("constraint should exist"); + datastore.drop_constraint_mut_tx(&mut tx, constraint_id)?; + let _ = datastore.rollback_mut_tx(tx); + + // TX4: after rollback, constraint should be back — duplicates should fail. + let mut tx = begin_mut_tx(&datastore); + let dup_result = insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Bob", 25)); + assert!( + dup_result.is_err(), + "duplicate insert should fail after rollback of drop constraint" + ); + Ok(()) + } + + #[test] + fn test_create_constraint_merge_error_reverts_uniqueness() -> ResultTest<()> { + let datastore = get_datastore()?; + + // TX1: create table with non-unique index on col 0. + let mut tx = begin_mut_tx(&datastore); + let schema = table_with_non_unique_index(0u16); + let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + commit(&datastore, tx)?; + + // TX2: insert a row into committed state. + let mut tx = begin_mut_tx(&datastore); + insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Alice", 30))?; + commit(&datastore, tx)?; + + // TX3: insert a conflicting row in the tx state, then try adding a unique constraint. + // The committed table has id=1 and the tx state also has id=1. + // The committed index check passes (no duplicates within committed data alone), + // but `can_merge` should fail because tx + committed have the same key. + let mut tx = begin_mut_tx(&datastore); + insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Bob", 25))?; + let mut constraint = ConstraintSchema::unique_for_test("Foo_id_unique", 0u16); + constraint.table_id = table_id; + let result = datastore.create_constraint_mut_tx(&mut tx, constraint); + assert!( + result.is_err(), + "create_constraint should fail when tx state conflicts with committed state" + ); + + // Before rolling back, verify the merge-error revert path already restored the + // index to non-unique within the still-open failing tx: a further duplicate + // insert on the same key must succeed. + let pre_rollback_dup = insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Dave", 19)); + assert!( + pre_rollback_dup.is_ok(), + "index must be non-unique inside the failing tx: the merge-error path should revert `make_unique` before returning", + ); + + // Rollback and verify the index is still non-unique. + let _ = datastore.rollback_mut_tx(tx); + let mut tx = begin_mut_tx(&datastore); + let dup_result = insert(&datastore, &mut tx, table_id, &u32_str_u32(1, "Charlie", 20)); + assert!( + dup_result.is_ok(), + "index should be non-unique after merge-error rollback: duplicates must be allowed" + ); + Ok(()) + } + + #[test] + fn test_drop_constraint_rollback_restores_pointer_map_invariant() -> ResultTest<()> { + // Invariant (table.rs): a table holds a pointer map iff it has no unique index. + // + // Forward `drop_constraint` on a table with EXACTLY ONE unique constraint makes + // the backing index non-unique and rebuilds the pointer map (because no unique + // index remains to enforce set semantics). On rollback we must (a) restore the + // index to unique AND (b) drop the rebuilt pointer map — otherwise the table + // ends up with BOTH a unique index AND a pointer map, breaking the invariant. + // + // The schema must have exactly one unique constraint, otherwise the forward + // path's `!has_unique_index()` check short-circuits the rebuild and the test + // would pass trivially on broken code. + let datastore = get_datastore()?; + + // TX1: create table with a single unique constraint on col 0. + let mut tx = begin_mut_tx(&datastore); + let indices = vec![IndexSchema::for_test("Foo_id_idx_btree", BTreeAlgorithm::from(0))]; + let constraints = vec![ConstraintSchema::unique_for_test("Foo_id_key", 0u16)]; + let schema = basic_table_schema_with_indices(indices, constraints); + let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + commit(&datastore, tx)?; + + // Baseline: with a unique index in place there should be no pointer map. + { + let committed = datastore.committed_state.read(); + let table = committed.tables.get(&table_id).expect("table should exist"); + assert!(table.has_unique_index(), "baseline: table should have a unique index"); + assert!( + !table.has_pointer_map(), + "baseline: a unique index subsumes the pointer map", + ); + } + + // TX2: drop the only unique constraint, then rollback. Forward drop rebuilds the + // pointer map; rollback must drop it. + let mut tx = begin_mut_tx(&datastore); + let constraint_id = tx + .constraint_id_from_name("Foo_id_key")? + .expect("constraint should exist"); + datastore.drop_constraint_mut_tx(&mut tx, constraint_id)?; + let _ = datastore.rollback_mut_tx(tx); + + // After rollback: the unique index must be restored AND the pointer map that the + // forward path rebuilt must be discarded. + let committed = datastore.committed_state.read(); + let table = committed.tables.get(&table_id).expect("table should exist"); + assert!( + table.has_unique_index(), + "rollback of drop_constraint must restore the unique index", + ); + assert!( + !table.has_pointer_map(), + "rollback must discard the pointer map rebuilt by forward drop_constraint — having both a unique index and a pointer map breaks the table invariant", + ); + Ok(()) + } + + #[test] + fn test_create_constraint_is_idempotent_and_does_not_clobber_pending_changes() -> ResultTest<()> { + // `create_st_constraint` short-circuits when the exact `st_constraint` row already + // exists in the tx insert table (`RowRefInsertion::Existed`), and in that case does + // NOT push a `PendingSchemaChange`. Previously `create_constraint` would then + // blindly overwrite `pending_schema_changes.last_mut()`, clobbering whichever + // unrelated change happened to be at the end of the list. + // + // This test drives that exact scenario: run `create_constraint` once on a fresh + // schema, inject an unrelated marker `PendingSchemaChange` at the end of the tx + // pending list, then call `create_constraint` again with a byte-identical + // `ConstraintSchema` (same constraint_id). The second call must hit the Existed + // path and leave the marker untouched. + use super::super::tx_state::PendingSchemaChange; + + let datastore = get_datastore()?; + + // TX1: create table with a non-unique index on col 0. + let mut tx = begin_mut_tx(&datastore); + let schema = table_with_non_unique_index(0u16); + let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + commit(&datastore, tx)?; + + // TX2: first call — adds the constraint, makes the index unique. + let mut tx = begin_mut_tx(&datastore); + let mut constraint_a = ConstraintSchema::unique_for_test("Foo_id_unique", 0u16); + constraint_a.table_id = table_id; + let constraint_id = datastore.create_constraint_mut_tx(&mut tx, constraint_a)?; + + // Inject an unrelated marker at the end of the pending-changes list. + // If `create_constraint` clobbers `last_mut()` on the Existed path, this marker + // gets overwritten with a ConstraintAdded; otherwise it stays a TableAdded. + tx.tx_state + .pending_schema_changes + .push(PendingSchemaChange::TableAdded(TableId::SENTINEL)); + let expected_len = tx.tx_state.pending_schema_changes.len(); + + // Second call — identical ConstraintSchema with the now-known constraint_id. + // The row bytes match the one just inserted, so `create_st_constraint` takes the + // `RowRefInsertion::Existed` short-circuit and pushes nothing. + let mut constraint_b = ConstraintSchema::unique_for_test("Foo_id_unique", 0u16); + constraint_b.table_id = table_id; + constraint_b.constraint_id = constraint_id; + let returned_id = datastore.create_constraint_mut_tx(&mut tx, constraint_b)?; + assert_eq!( + returned_id, constraint_id, + "idempotent re-add must return the existing constraint id", + ); + + // Length unchanged (no new push), and — critically — the marker is still the last + // element (not clobbered into a ConstraintAdded). + assert_eq!( + tx.tx_state.pending_schema_changes.len(), + expected_len, + "Existed path must not push a new pending schema change", + ); + assert_eq!( + tx.tx_state.pending_schema_changes.last(), + Some(&PendingSchemaChange::TableAdded(TableId::SENTINEL)), + "Existed path must not overwrite an unrelated trailing pending schema change", + ); + Ok(()) + } + + #[test] + fn test_create_constraint_fails_without_backing_index() -> ResultTest<()> { + let datastore = get_datastore()?; + + // TX1: create table with ZERO indices. + let mut tx = begin_mut_tx(&datastore); + let schema = basic_table_schema_with_indices(Vec::::new(), Vec::::new()); + let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + commit(&datastore, tx)?; + + // TX2: try to add a unique constraint on col 0 — no backing index exists. + let mut tx = begin_mut_tx(&datastore); + let mut constraint = ConstraintSchema::unique_for_test("Foo_id_unique", 0u16); + constraint.table_id = table_id; + let result = datastore.create_constraint_mut_tx(&mut tx, constraint); + assert!( + result.is_err(), + "create_constraint must fail when no backing index covers the constrained columns" + ); + let err_msg = format!("{:#}", result.unwrap_err()); + assert!( + err_msg.contains("requires at least one backing index"), + "error message should mention the missing backing index, got: {err_msg}", + ); + + // Verify no orphan st_constraint row was persisted: `constraint_id_from_name` + // must return `None` because pre-validation fired before `create_st_constraint`. + let orphan = tx.constraint_id_from_name("Foo_id_unique")?; + assert!( + orphan.is_none(), + "pre-validation must run before create_st_constraint; no st_constraint row may be written on failure", + ); + Ok(()) + } } diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 9008bc6da4e..750009bebe9 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -703,8 +703,10 @@ impl MutTxId { } // Insert constraints into `st_constraints`. + // The fresh table has no `st_constraint` rows yet, so every row here is newly + // inserted — the `bool` in the returned tuple is always `true` and is ignored. for constraint in constraints { - self.create_constraint(constraint)?; + let _ = self.create_st_constraint(constraint)?; } // Insert sequences into `st_sequences`. @@ -1783,19 +1785,29 @@ impl MutTxId { }) } - /// Create a constraint. + /// Inserts constraint metadata into system tables only. + /// + /// This is used during `create_table` where the index is already created + /// with the correct uniqueness. For adding constraints to existing tables, + /// use [`Self::create_constraint`] instead. /// /// Requires: /// - `constraint.constraint_name` must not be used for any other database entity. - /// - `constraint.constraint_id == ConstraintId::SENTINEL` - /// - `constraint.table_id != TableId::SENTINEL` - /// - `is_unique` must be `true` if and only if a unique constraint will exist on - /// `ColSet::from(&constraint.constraint_algorithm.columns())` after this transaction is committed. + /// - `constraint.constraint_id == ConstraintId::SENTINEL`. + /// - `constraint.table_id != TableId::SENTINEL`. + /// - The caller is responsible for ensuring that the backing indices on + /// `ColSet::from(&constraint.data.unique_columns())` already have the correct + /// uniqueness — this method does not touch the in-memory index uniqueness. + /// Use [`Self::create_constraint`] if the indices need to be converted. /// /// Ensures: /// - The constraint metadata is inserted into the system tables (and other data structures reflecting them). - /// - The returned ID is unique and is not `constraintId::SENTINEL`. - fn create_constraint(&mut self, mut constraint: ConstraintSchema) -> Result { + /// - The returned ID is unique and is not `ConstraintId::SENTINEL`. + /// - The `bool` in the return value is `true` iff a new `st_constraint` row was + /// inserted (and therefore a `PendingSchemaChange::ConstraintAdded` was pushed). + /// It is `false` if an identical row already existed (idempotent re-insertion); + /// in that case the schema and pending-changes list are untouched. + fn create_st_constraint(&mut self, mut constraint: ConstraintSchema) -> Result<(ConstraintId, bool)> { if constraint.table_id == TableId::SENTINEL { return Err(anyhow::anyhow!("`table_id` must not be `TableId::SENTINEL` in `{constraint:#?}`").into()); } @@ -1823,20 +1835,26 @@ impl MutTxId { let constraint_id = constraint_row.1.collapse().read_col(StConstraintFields::ConstraintId)?; if let RowRefInsertion::Existed(_) = constraint_row.1 { log::trace!("CONSTRAINT ALREADY EXISTS: {constraint_id}"); - return Ok(constraint_id); + return Ok((constraint_id, false)); } let ((tx_table, ..), (commit_table, ..)) = self.get_or_create_insert_table_mut(table_id)?; constraint.constraint_id = constraint_id; // This won't clone-write when creating a table but likely to otherwise. tx_table.with_mut_schema_and_clone(commit_table, |s| s.update_constraint(constraint.clone())); - self.push_schema_change(PendingSchemaChange::ConstraintAdded(table_id, constraint_id)); + self.push_schema_change(PendingSchemaChange::ConstraintAdded( + table_id, + constraint_id, + vec![], + None, + )); log::trace!("CONSTRAINT CREATED: {constraint_id}"); - Ok(constraint_id) + Ok((constraint_id, true)) } - pub fn drop_constraint(&mut self, constraint_id: ConstraintId) -> Result<()> { + /// Removes constraint metadata from system tables only. + fn drop_st_constraint(&mut self, constraint_id: ConstraintId) -> Result<(TableId, ConstraintSchema)> { // Delete row in `st_constraint`. let st_constraint_ref = self .iter_by_col_eq( @@ -1851,12 +1869,208 @@ impl MutTxId { // Remove constraint in transaction's insert table. let ((tx_table, ..), (commit_table, ..)) = self.get_or_create_insert_table_mut(table_id)?; - // This likely will do a clone-write as over time? - // The schema might have found other referents. + // This likely will do a clone-write as over time + // the schema might have found other referents. let schema = commit_table .with_mut_schema_and_clone(tx_table, |s| s.remove_constraint(constraint_id)) .expect("there should be a schema in the committed state if we reach here"); - self.push_schema_change(PendingSchemaChange::ConstraintRemoved(table_id, schema)); + + Ok((table_id, schema)) + } + + /// Creates a constraint, making the corresponding indices unique. + /// + /// This inserts constraint metadata AND converts the in-memory indices + /// from non-unique to unique. If the existing data contains duplicate + /// values in the constrained columns, an error is returned. + /// + /// Pre-validation (before any system-table mutation): + /// - the constraint must be a unique one (the only kind supported today); + /// - the target table must already have at least one index on the constrained columns. + pub fn create_constraint(&mut self, constraint: ConstraintSchema) -> Result { + let table_id = constraint.table_id; + + // (a) Only unique constraints are supported at the moment. Reject anything else + // up front, before writing to `st_constraint`. + let Some(cols) = constraint.data.unique_columns().cloned() else { + return Err(anyhow::anyhow!( + "adding non-unique constraints is not supported (constraint on table {table_id})" + ) + .into()); + }; + let col_list: ColList = cols.into(); + + // (b) A unique constraint must be backed by at least one index on the same columns. + // Check on the committed table; the tx table's index set is kept in lockstep + // with the committed one by the datastore, so agreement is an invariant. + { + let (_, (commit_table, _, _)) = self.get_or_create_insert_table_mut(table_id)?; + if commit_table.get_indexes_by_cols(&col_list).is_empty() { + return Err(anyhow::anyhow!( + "unique constraint on table {table_id} column(s) {col_list:?} \ + requires at least one backing index on those columns" + ) + .into()); + } + } + + // (c) Validation passed — insert metadata into system tables. On any failure + // beyond this point, the tx rollback unwinds both the st_constraint row and + // the pending schema change. + let (constraint_id, newly_inserted) = self.create_st_constraint(constraint)?; + + // If the constraint already existed in `st_constraint`, nothing new was pushed + // to `pending_schema_changes`, and the backing indices are already in the + // correct state. Return early — in particular, do NOT overwrite + // `pending_schema_changes.last_mut()`, which would clobber an unrelated change. + if !newly_inserted { + return Ok(constraint_id); + } + + // Re-borrow after the system-table write (self was reborrowed by create_st_constraint). + let ((tx_table, _, tx_delete_table), (commit_table, commit_blob_store, _)) = + self.get_or_create_insert_table_mut(table_id)?; + + // Find all indices matching these columns. Pre-validation guarantees non-empty. + let index_ids: Vec<_> = commit_table + .get_indexes_by_cols(&col_list) + .into_iter() + .map(|(id, _)| id) + .collect(); + debug_assert!( + !index_ids.is_empty(), + "pre-validation guaranteed at least one backing index", + ); + + // Revert previously-made-unique `commit_table` and `tx_table` indices. Used on + // both the committed-state duplicate path and the tx-state merge-conflict path + // so a failing `create_constraint` leaves the tx in the same shape it had before. + let revert = |commit_table: &mut Table, tx_table: &mut Table, up_to: usize| { + for &id in &index_ids[..up_to] { + if let Some(idx) = commit_table.indexes.get_mut(&id) { + idx.make_non_unique(); + } + if let Some(idx) = tx_table.indexes.get_mut(&id) { + idx.make_non_unique(); + } + } + }; + + // Record whether this table had a unique index before. + let had_unique = commit_table.has_unique_index(); + + // Build a human-readable error from an index's duplicate groups. Used on both the + // committed-state and tx-state `make_unique` failure paths (`source` distinguishes + // which one fired). + let dup_err = |idx: &TableIndex, source: &str| { + let duplicates = idx.iter_duplicates(); + let total = duplicates.len(); + let examples: String = duplicates + .iter() + .take(10) + .map(|(val, count)| format!(" - {val:?} appears {count} times")) + .collect::>() + .join("\n"); + anyhow::anyhow!( + "Cannot add unique constraint on table {table_id} column(s) {col_list:?} \ + ({source}):\n{total} duplicate group(s) found.\n{examples}{}", + if total > 10 { "\n ... and more" } else { "" } + ) + }; + + // Try to make each matching index unique on both tables. `make_unique` fails fast on + // the first duplicate; only if it fails do we run `iter_duplicates` to build a + // human-readable error (showing up to 10 duplicate groups). + for (i, &index_id) in index_ids.iter().enumerate() { + let commit_idx = commit_table.indexes.get_mut(&index_id).expect("index must exist"); + if commit_idx.make_unique().is_err() { + // `make_unique` restored the failing index to non-unique on error. + let err = dup_err(commit_idx, "committed state"); + revert(commit_table, tx_table, i); + return Err(err.into()); + } + + // Tx table can have duplicates too (same-tx inserts before the constraint add). + let tx_idx = tx_table.indexes.get_mut(&index_id).expect("tx index must exist"); + if tx_idx.make_unique().is_err() { + let err = dup_err(tx_idx, "current transaction"); + // Revert the just-made-unique commit index plus any earlier pair. + revert(commit_table, tx_table, i + 1); + return Err(err.into()); + } + } + + // Check that each pair of unique indices can be merged. + for &index_id in &index_ids { + let can_merge_result = { + let commit_idx = &commit_table.indexes[&index_id]; + let tx_idx = &tx_table.indexes[&index_id]; + let is_deleted = |ptr: &RowPointer| tx_delete_table.contains(*ptr); + commit_idx.can_merge(tx_idx, is_deleted) + }; + if let Err(violation) = can_merge_result { + let cols = commit_table.indexes[&index_id].indexed_columns().clone(); + let violation = commit_table + .get_row_ref(commit_blob_store, violation) + .expect("row came from scanning the table") + .project(&cols) + .expect("cols should be valid for this table"); + revert(commit_table, tx_table, index_ids.len()); + return Err(anyhow::anyhow!("Unique constraint violation during merge: {violation:?}").into()); + } + } + + // Take the pointer map if this is the first unique index. + let pointer_map = if !had_unique { + tx_table.take_pointer_map(); + commit_table.take_pointer_map() + } else { + None + }; + + // Update the pending schema change with index info. + // The last pushed change is our ConstraintAdded from create_st_constraint. + // Replace it with the enriched version. + if let Some(last) = self.tx_state.pending_schema_changes.last_mut() { + *last = PendingSchemaChange::ConstraintAdded(table_id, constraint_id, index_ids, pointer_map); + } + + Ok(constraint_id) + } + + /// Drops a constraint, making the corresponding indices non-unique. + pub fn drop_constraint(&mut self, constraint_id: ConstraintId) -> Result<()> { + let (table_id, schema) = self.drop_st_constraint(constraint_id)?; + + // If this was a unique constraint, make all matching indices non-unique. + let unique_cols = schema.data.unique_columns().cloned(); + let made_non_unique_index_ids = if let Some(cols) = unique_cols { + let col_list: ColList = cols.into(); + + let ((tx_table, tx_blob_store, _), (commit_table, commit_blob_store, _)) = + self.get_or_create_insert_table_mut(table_id)?; + + let index_ids: Vec<_> = commit_table + .get_indexes_by_cols(&col_list) + .into_iter() + .map(|(id, _)| id) + .collect(); + + for &index_id in &index_ids { + commit_table.make_index_non_unique(index_id, commit_blob_store); + tx_table.make_index_non_unique(index_id, tx_blob_store); + } + + index_ids + } else { + vec![] + }; + + self.push_schema_change(PendingSchemaChange::ConstraintRemoved( + table_id, + schema, + made_non_unique_index_ids, + )); // TODO(1.0): we should also re-initialize `table` without a unique constraint. // unless some other unique constraint on the same columns exists. // NOTE(centril): is this already handled by dropping the corresponding index? diff --git a/crates/datastore/src/locking_tx_datastore/tx_state.rs b/crates/datastore/src/locking_tx_datastore/tx_state.rs index 4e712bcf684..b46e1a9e577 100644 --- a/crates/datastore/src/locking_tx_datastore/tx_state.rs +++ b/crates/datastore/src/locking_tx_datastore/tx_state.rs @@ -126,10 +126,12 @@ pub enum PendingSchemaChange { /// The primary key of the table with [`TableId`] was changed. /// The old primary key was stored. TableAlterPrimaryKey(TableId, Option), - /// The constraint with [`ConstraintSchema`] was added to the table with [`TableId`]. - ConstraintRemoved(TableId, ConstraintSchema), + /// The constraint with [`ConstraintSchema`] was removed from the table with [`TableId`]. + /// If indices were made non-unique, their [`IndexId`]s are stored. + ConstraintRemoved(TableId, ConstraintSchema, Vec), /// The constraint with [`ConstraintId`] was added to the table with [`TableId`]. - ConstraintAdded(TableId, ConstraintId), + /// If indices were made unique, their [`IndexId`]s and the taken [`PointerMap`] are stored. + ConstraintAdded(TableId, ConstraintId, Vec, Option), /// The [`Sequence`] with [`SequenceSchema`] was added to the table with [`TableId`]. SequenceRemoved(TableId, Sequence, SequenceSchema), /// The sequence with [`SequenceId`] was added to the table with [`TableId`]. @@ -150,10 +152,12 @@ impl MemoryUsage for PendingSchemaChange { Self::TableAlterAccess(table_id, st_access) => table_id.heap_usage() + st_access.heap_usage(), Self::TableAlterRowType(table_id, column_schemas) => table_id.heap_usage() + column_schemas.heap_usage(), Self::TableAlterPrimaryKey(table_id, pk) => table_id.heap_usage() + pk.heap_usage(), - Self::ConstraintRemoved(table_id, constraint_schema) => { - table_id.heap_usage() + constraint_schema.heap_usage() + Self::ConstraintRemoved(table_id, constraint_schema, index_ids) => { + table_id.heap_usage() + constraint_schema.heap_usage() + index_ids.heap_usage() + } + Self::ConstraintAdded(table_id, constraint_id, index_ids, pointer_map) => { + table_id.heap_usage() + constraint_id.heap_usage() + index_ids.heap_usage() + pointer_map.heap_usage() } - Self::ConstraintAdded(table_id, constraint_id) => table_id.heap_usage() + constraint_id.heap_usage(), Self::SequenceRemoved(table_id, sequence, sequence_schema) => { table_id.heap_usage() + sequence.heap_usage() + sequence_schema.heap_usage() } diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index e1b99825ae2..22e12b98271 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -15,7 +15,7 @@ use spacetimedb_primitives::*; use spacetimedb_sats::hash::Hash; use spacetimedb_sats::{AlgebraicValue, ProductType, ProductValue}; use spacetimedb_schema::reducer_name::ReducerName; -use spacetimedb_schema::schema::{IndexSchema, SequenceSchema, TableSchema}; +use spacetimedb_schema::schema::{ConstraintSchema, IndexSchema, SequenceSchema, TableSchema}; use spacetimedb_schema::table_name::TableName; use spacetimedb_table::static_assert_size; use spacetimedb_table::table::RowRef; @@ -641,6 +641,11 @@ pub trait MutTxDatastore: TxDatastore + MutTx { fn sequence_id_from_name_mut_tx(&self, tx: &Self::MutTx, sequence_name: &str) -> super::Result>; // Constraints + fn create_constraint_mut_tx( + &self, + tx: &mut Self::MutTx, + constraint: ConstraintSchema, + ) -> super::Result; fn drop_constraint_mut_tx(&self, tx: &mut Self::MutTx, constraint_id: ConstraintId) -> super::Result<()>; fn constraint_id_from_name(&self, tx: &Self::MutTx, constraint_name: &str) -> super::Result>; diff --git a/crates/schema/src/auto_migrate.rs b/crates/schema/src/auto_migrate.rs index 6d169a902cc..bebaf4a1408 100644 --- a/crates/schema/src/auto_migrate.rs +++ b/crates/schema/src/auto_migrate.rs @@ -263,7 +263,6 @@ pub enum AutoMigrateStep<'def> { RemoveView(::Key<'def>), /// Remove a row-level security query. RemoveRowLevelSecurity(::Key<'def>), - /// Remove an empty table and all its sub-objects (indexes, constraints, sequences). /// Validated at execution time: fails if the table contains data. RemoveTable(::Key<'def>), @@ -289,6 +288,8 @@ pub enum AutoMigrateStep<'def> { AddTable(::Key<'def>), /// Add an index. AddIndex(::Key<'def>), + /// Add a constraint to an existing table (with data validation precheck). + AddConstraint(::Key<'def>), /// Add a sequence. AddSequence(::Key<'def>), /// Add a schedule annotation to a table. @@ -1040,11 +1041,9 @@ fn auto_migrate_constraints( // it's okay to add a constraint in a new table. Ok(()) } else { - // it's not okay to add a new constraint to an existing table. - Err(AutoMigrateError::AddUniqueConstraint { - constraint: new.name.clone(), - } - .into()) + // existing table — duplicate detection happens inside create_constraint + plan.steps.push(AutoMigrateStep::AddConstraint(new.key())); + Ok(()) } } Diff::Remove { old } => { @@ -1545,8 +1544,6 @@ mod tests { let apples = expect_identifier("Apples"); let _bananas = expect_identifier("Bananas"); - let apples_name_unique_constraint = "Apples_name_key"; - let weight = expect_identifier("weight"); let count = expect_identifier("count"); let name = expect_identifier("name"); @@ -1741,10 +1738,8 @@ mod tests { && type1.0 == prod1_ty && type2.0 == new_prod1_ty ); - expect_error_matching!( - result, - AutoMigrateError::AddUniqueConstraint { constraint } => &constraint[..] == apples_name_unique_constraint - ); + // Note: `AddUniqueConstraint` is no longer an error — adding unique constraints + // to existing tables is now allowed; duplicate detection happens inside create_constraint. expect_error_matching!( result, diff --git a/crates/schema/src/auto_migrate/formatter.rs b/crates/schema/src/auto_migrate/formatter.rs index 1f7377209bf..a258797c249 100644 --- a/crates/schema/src/auto_migrate/formatter.rs +++ b/crates/schema/src/auto_migrate/formatter.rs @@ -63,6 +63,10 @@ fn format_step( let constraint_info = extract_constraint_info(*constraint, plan.old)?; f.format_constraint(&constraint_info, Action::Removed) } + AutoMigrateStep::AddConstraint(constraint) => { + let constraint_info = extract_constraint_info(*constraint, plan.new)?; + f.format_constraint(&constraint_info, Action::Created) + } AutoMigrateStep::AddSequence(sequence) => { let sequence_info = extract_sequence_info(*sequence, plan.new)?; f.format_sequence(&sequence_info, Action::Created) diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 1d406b42eaf..e9a1089f0ec 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -1503,6 +1503,34 @@ impl Table { Some(index) } + /// Take the pointer map, if any, returning it. + /// + /// This is used when making an index unique — a unique index subsumes + /// the pointer map's role of preventing duplicate rows. + pub fn take_pointer_map(&mut self) -> Option { + self.pointer_map.take() + } + + /// Restore a previously taken pointer map. + /// + /// This is used on rollback when a unique constraint is removed + /// and no other unique indices remain. + pub fn restore_pointer_map(&mut self, pointer_map: PointerMap) { + self.pointer_map = Some(pointer_map); + } + + /// Returns whether this table has any unique index. + pub fn has_unique_index(&self) -> bool { + self.indexes.values().any(|idx| idx.is_unique()) + } + + /// Returns whether this table currently holds a pointer map. + /// + /// Verifies the invariant "pointer map is present iff no unique index exists". + pub fn has_pointer_map(&self) -> bool { + self.pointer_map.is_some() + } + /// Returns an iterator over all the rows of `self`, yielded as [`RowRef`]s. pub fn scan_rows<'a>(&'a self, blob_store: &'a dyn BlobStore) -> TableScanIter<'a> { TableScanIter { @@ -1560,6 +1588,28 @@ impl Table { .map(|(id, idx)| (*id, idx)) } + /// Returns all [`TableIndex`]es with the given [`ColList`]. + pub fn get_indexes_by_cols(&self, cols: &ColList) -> Vec<(IndexId, &TableIndex)> { + self.indexes + .iter() + .filter(|(_, index)| index.indexed_columns() == cols) + .map(|(id, idx)| (*id, idx)) + .collect() + } + + /// Makes the index at `index_id` non-unique. + /// + /// If no unique indices remain after this, rebuilds and restores the pointer map. + pub fn make_index_non_unique(&mut self, index_id: IndexId, blob_store: &dyn BlobStore) { + if let Some(idx) = self.indexes.get_mut(&index_id) { + idx.make_non_unique(); + } + if !self.has_unique_index() && self.pointer_map.is_none() { + let pm = self.rebuild_pointer_map(blob_store); + self.restore_pointer_map(pm); + } + } + /// Clones the structure of this table into a new one with /// the same schema, visitor program, and indices. /// The new table will be completely empty diff --git a/crates/table/src/table_index/btree_index.rs b/crates/table/src/table_index/btree_index.rs index dfc519459a3..b5c84542d9e 100644 --- a/crates/table/src/table_index/btree_index.rs +++ b/crates/table/src/table_index/btree_index.rs @@ -201,6 +201,53 @@ impl BTreeIndex { /// An iterator over all the values in a [`BTreeIndex`]. pub type BTreeIndexIter<'a, K> = ManySameKeyEntryIter<'a, Values<'a, K, SameKeyEntry>>; +impl BTreeIndex { + /// Returns an iterator over keys that have more than one row pointer, + /// yielding `(&key, count)` for each duplicate key. + pub(super) fn iter_duplicates(&self) -> impl Iterator { + self.map.iter().filter_map(|(k, entry)| { + let count = entry.count(); + if count > 1 { + Some((k, count)) + } else { + None + } + }) + } + + /// Check for duplicates and, if none, convert into a `BTreeMap`. + /// + /// Returns `Ok(map)` if every key maps to exactly one row. + /// Returns `Err((self, ptr))` with a witness `RowPointer` of a duplicate if any key + /// maps to more than one row. The original `BTreeIndex` is returned intact on error. + pub(super) fn check_and_into_unique(self) -> Result, (Self, RowPointer)> { + // First pass: check for duplicates (borrows self.map immutably). + let dup = self.map.values().find_map(|entry| { + if entry.count() > 1 { + Some(entry.iter().next().unwrap()) + } else { + None + } + }); + + if let Some(ptr) = dup { + return Err((self, ptr)); + } + + // No duplicates; conversion is infallible. + let result = self + .map + .into_iter() + .map(|(k, entry)| { + let ptr = entry.iter().next().unwrap(); + (k, ptr) + }) + .collect(); + + Ok(result) + } +} + /// An iterator over values in a [`BTreeIndex`] where the keys are in a certain range. pub type BTreeIndexRangeIter<'a, K> = ManySameKeyEntryIter<'a, RangeValues<'a, K, SameKeyEntry>>; diff --git a/crates/table/src/table_index/hash_index.rs b/crates/table/src/table_index/hash_index.rs index 3b2e26d18e4..02d27974d5a 100644 --- a/crates/table/src/table_index/hash_index.rs +++ b/crates/table/src/table_index/hash_index.rs @@ -123,6 +123,51 @@ impl Index for HashIndex { } impl HashIndex { + /// Returns an iterator over keys that have more than one row pointer, + /// yielding `(&key, count)` for each duplicate key. + pub(super) fn iter_duplicates(&self) -> impl Iterator { + self.map.iter().filter_map(|(k, entry)| { + let count = entry.count(); + if count > 1 { + Some((k, count)) + } else { + None + } + }) + } + + /// Check for duplicates and, if none, convert into a `HashMap`. + /// + /// Returns `Ok(map)` if every key maps to exactly one row. + /// Returns `Err((self, ptr))` with a witness `RowPointer` of a duplicate if any key + /// maps to more than one row. The original `HashIndex` is returned intact on error. + pub(super) fn check_and_into_unique(self) -> Result, (Self, RowPointer)> { + // First pass: check for duplicates. + let dup = self.map.values().find_map(|entry| { + if entry.count() > 1 { + Some(entry.iter().next().unwrap()) + } else { + None + } + }); + + if let Some(ptr) = dup { + return Err((self, ptr)); + } + + // No duplicates; conversion is infallible. + let result = self + .map + .into_iter() + .map(|(k, entry)| { + let ptr = entry.iter().next().unwrap(); + (k, ptr) + }) + .collect(); + + Ok(result) + } + /// See [`Index::delete`]. /// /// This version has relaxed bounds diff --git a/crates/table/src/table_index/mod.rs b/crates/table/src/table_index/mod.rs index 8a6bed3e74c..29e548c786f 100644 --- a/crates/table/src/table_index/mod.rs +++ b/crates/table/src/table_index/mod.rs @@ -45,6 +45,7 @@ use core::{fmt, iter}; use enum_as_inner::EnumAsInner; use spacetimedb_primitives::{ColId, ColList}; use spacetimedb_sats::algebraic_value::de::{ValueDeserializeError, ValueDeserializer}; +use spacetimedb_sats::algebraic_value::Packed; use spacetimedb_sats::bsatn::{decode, from_reader}; use spacetimedb_sats::buffer::{DecodeError, DecodeResult}; use spacetimedb_sats::de::{self, DeserializeSeed, Error, ProductVisitor}; @@ -1023,6 +1024,58 @@ macro_rules! same_for_all_types { }; } +/// Defines `try_make_unique`, `into_non_unique`, and `iter_duplicates` +/// for all non-unique ↔ unique variant pairs on `TypedIndex`. +macro_rules! define_uniqueness_conversions { + ( + btree { + $($bt_non:ident <=> $bt_uni:ident : $bt_conv:expr),* $(,)? + } + hash { + $($h_non:ident <=> $h_uni:ident : $h_conv:expr),* $(,)? + } + ) => { + /// Consuming: try to convert a non-unique index to unique. + /// Returns the original on error. + fn try_make_unique(self) -> Result { + match self { + $(Self::$bt_non(mm) => mm.check_and_into_unique() + .map(|m| Self::$bt_uni(UniqueBTreeIndex::from_non_unique(m))) + .map_err(|(mm, p)| (Self::$bt_non(mm), p)),)* + $(Self::$h_non(hi) => hi.check_and_into_unique() + .map(|m| Self::$h_uni(UniqueHashIndex::from_non_unique(m))) + .map_err(|(hi, p)| (Self::$h_non(hi), p)),)* + other => Ok(other), + } + } + + /// Consuming: convert a unique index back to non-unique. + /// Non-unique and direct indices are returned as-is. + fn into_non_unique(self) -> Self { + match self { + $(Self::$bt_uni(um) => Self::$bt_non(um.into_non_unique()),)* + $(Self::$h_uni(uh) => Self::$h_non(uh.into_non_unique()),)* + other => other, + } + } + + /// Returns all duplicate keys (count > 1) in this non-unique index, + /// with keys converted to [`AlgebraicValue`] via `key_type`. + /// Returns an empty vec for unique indices. + fn iter_duplicates(&self, key_type: &AlgebraicType) -> Vec<(AlgebraicValue, usize)> { + match self { + $(Self::$bt_non(mm) => mm.iter_duplicates() + .map(|(k, c)| ($bt_conv(k, key_type), c)) + .collect(),)* + $(Self::$h_non(hi) => hi.iter_duplicates() + .map(|(k, c)| ($h_conv(k, key_type), c)) + .collect(),)* + _ => Vec::new(), + } + } + }; +} + impl MemoryUsage for TypedIndex { fn heap_usage(&self) -> usize { same_for_all_types!(self, this => this.heap_usage()) @@ -1799,6 +1852,108 @@ impl TypedIndex { pub fn num_key_bytes(&self) -> u64 { same_for_all_types!(self, this => this.num_key_bytes()) } + + define_uniqueness_conversions! { + btree { + BTreeBool <=> UniqueBTreeBool : |k: &bool, _: &AlgebraicType| AlgebraicValue::Bool(*k), + BTreeU8 <=> UniqueBTreeU8 : |k: &u8, _: &AlgebraicType| AlgebraicValue::U8(*k), + BTreeSumTag <=> UniqueBTreeSumTag : |k: &SumTag, _: &AlgebraicType| AlgebraicValue::U8(k.0), + BTreeI8 <=> UniqueBTreeI8 : |k: &i8, _: &AlgebraicType| AlgebraicValue::I8(*k), + BTreeU16 <=> UniqueBTreeU16 : |k: &u16, _: &AlgebraicType| AlgebraicValue::U16(*k), + BTreeI16 <=> UniqueBTreeI16 : |k: &i16, _: &AlgebraicType| AlgebraicValue::I16(*k), + BTreeU32 <=> UniqueBTreeU32 : |k: &u32, _: &AlgebraicType| AlgebraicValue::U32(*k), + BTreeI32 <=> UniqueBTreeI32 : |k: &i32, _: &AlgebraicType| AlgebraicValue::I32(*k), + BTreeU64 <=> UniqueBTreeU64 : |k: &u64, _: &AlgebraicType| AlgebraicValue::U64(*k), + BTreeI64 <=> UniqueBTreeI64 : |k: &i64, _: &AlgebraicType| AlgebraicValue::I64(*k), + BTreeU128 <=> UniqueBTreeU128 : |k: &u128, _: &AlgebraicType| AlgebraicValue::U128(Packed(*k)), + BTreeI128 <=> UniqueBTreeI128 : |k: &i128, _: &AlgebraicType| AlgebraicValue::I128(Packed(*k)), + BTreeU256 <=> UniqueBTreeU256 : |k: &u256, _: &AlgebraicType| AlgebraicValue::U256(Box::new(*k)), + BTreeI256 <=> UniqueBTreeI256 : |k: &i256, _: &AlgebraicType| AlgebraicValue::I256(Box::new(*k)), + BTreeF32 <=> UniqueBTreeF32 : |k: &F32, _: &AlgebraicType| AlgebraicValue::F32(*k), + BTreeF64 <=> UniqueBTreeF64 : |k: &F64, _: &AlgebraicType| AlgebraicValue::F64(*k), + BTreeString <=> UniqueBTreeString : |k: &str, _: &AlgebraicType| AlgebraicValue::String(k.into()), + BTreeAV <=> UniqueBTreeAV : |k: &AlgebraicValue, _: &AlgebraicType| k.clone(), + BTreeBytesKey8 <=> UniqueBTreeBytesKey8 + : |k: &RangeCompatBytesKey, ty: &AlgebraicType| + TypedIndexKey::BytesKey8B(*k).into_algebraic_value(ty), + BTreeBytesKey16 <=> UniqueBTreeBytesKey16 + : |k: &RangeCompatBytesKey, ty: &AlgebraicType| + TypedIndexKey::BytesKey16(*k).into_algebraic_value(ty), + BTreeBytesKey32 <=> UniqueBTreeBytesKey32 + : |k: &RangeCompatBytesKey, ty: &AlgebraicType| + TypedIndexKey::BytesKey32(*k).into_algebraic_value(ty), + BTreeBytesKey64 <=> UniqueBTreeBytesKey64 + : |k: &RangeCompatBytesKey, ty: &AlgebraicType| + TypedIndexKey::BytesKey64(*k).into_algebraic_value(ty), + BTreeBytesKey128 <=> UniqueBTreeBytesKey128 + : |k: &RangeCompatBytesKey, ty: &AlgebraicType| + TypedIndexKey::BytesKey128(*k).into_algebraic_value(ty), + } + hash { + HashBool <=> UniqueHashBool : |k: &bool, _: &AlgebraicType| AlgebraicValue::Bool(*k), + HashU8 <=> UniqueHashU8 : |k: &u8, _: &AlgebraicType| AlgebraicValue::U8(*k), + HashSumTag <=> UniqueHashSumTag : |k: &SumTag, _: &AlgebraicType| AlgebraicValue::U8(k.0), + HashI8 <=> UniqueHashI8 : |k: &i8, _: &AlgebraicType| AlgebraicValue::I8(*k), + HashU16 <=> UniqueHashU16 : |k: &u16, _: &AlgebraicType| AlgebraicValue::U16(*k), + HashI16 <=> UniqueHashI16 : |k: &i16, _: &AlgebraicType| AlgebraicValue::I16(*k), + HashU32 <=> UniqueHashU32 : |k: &u32, _: &AlgebraicType| AlgebraicValue::U32(*k), + HashI32 <=> UniqueHashI32 : |k: &i32, _: &AlgebraicType| AlgebraicValue::I32(*k), + HashU64 <=> UniqueHashU64 : |k: &u64, _: &AlgebraicType| AlgebraicValue::U64(*k), + HashI64 <=> UniqueHashI64 : |k: &i64, _: &AlgebraicType| AlgebraicValue::I64(*k), + HashU128 <=> UniqueHashU128 : |k: &u128, _: &AlgebraicType| AlgebraicValue::U128(Packed(*k)), + HashI128 <=> UniqueHashI128 : |k: &i128, _: &AlgebraicType| AlgebraicValue::I128(Packed(*k)), + HashU256 <=> UniqueHashU256 : |k: &u256, _: &AlgebraicType| AlgebraicValue::U256(Box::new(*k)), + HashI256 <=> UniqueHashI256 : |k: &i256, _: &AlgebraicType| AlgebraicValue::I256(Box::new(*k)), + HashF32 <=> UniqueHashF32 : |k: &F32, _: &AlgebraicType| AlgebraicValue::F32(*k), + HashF64 <=> UniqueHashF64 : |k: &F64, _: &AlgebraicType| AlgebraicValue::F64(*k), + HashString <=> UniqueHashString : |k: &str, _: &AlgebraicType| AlgebraicValue::String(k.into()), + HashAV <=> UniqueHashAV : |k: &AlgebraicValue, _: &AlgebraicType| k.clone(), + HashBytesKey8 <=> UniqueHashBytesKey8 + : |k: &BytesKey, ty: &AlgebraicType| + TypedIndexKey::BytesKey8H(*k).into_algebraic_value(ty), + HashBytesKey24 <=> UniqueHashBytesKey24 + : |k: &BytesKey, ty: &AlgebraicType| + TypedIndexKey::BytesKey24(*k).into_algebraic_value(ty), + HashBytesKey56 <=> UniqueHashBytesKey56 + : |k: &BytesKey, ty: &AlgebraicType| + TypedIndexKey::BytesKey56(*k).into_algebraic_value(ty), + HashBytesKey120 <=> UniqueHashBytesKey120 + : |k: &BytesKey, ty: &AlgebraicType| + TypedIndexKey::BytesKey120(*k).into_algebraic_value(ty), + } + } + + /// Convert this non-unique index to a unique index in place. + /// + /// Returns `Ok(())` if the index was already unique or was successfully converted. + /// Returns `Err(ptr)` where `ptr` witnesses a duplicate key, leaving `self` unchanged. + fn make_unique(&mut self) -> Result<(), RowPointer> { + if self.is_unique() { + return Ok(()); + } + + let dummy = Self::BTreeBool(<_>::default()); + let old = core::mem::replace(self, dummy); + match old.try_make_unique() { + Ok(new) => { + *self = new; + Ok(()) + } + Err((restored, ptr)) => { + *self = restored; + Err(ptr) + } + } + } + + /// Convert this unique index back to a non-unique index in place. + /// + /// No-op for already non-unique or direct indices. + fn make_non_unique(&mut self) { + let dummy = Self::BTreeBool(<_>::default()); + let old = core::mem::replace(self, dummy); + *self = old.into_non_unique(); + } } /// A key into a [`TableIndex`]. @@ -2550,6 +2705,28 @@ impl TableIndex { } } + /// Convert this non-unique index to a unique index in place. + /// + /// Returns `Ok(())` if the index was already unique or was successfully converted. + /// Returns `Err(ptr)` where `ptr` witnesses a duplicate key, leaving `self` unchanged. + pub fn make_unique(&mut self) -> Result<(), RowPointer> { + self.idx.make_unique() + } + + /// Convert this unique index back to a non-unique index in place. + /// + /// No-op for already non-unique or direct indices. + pub fn make_non_unique(&mut self) { + self.idx.make_non_unique() + } + + /// Returns all duplicate keys (count > 1) in this index, + /// with keys converted to [`AlgebraicValue`] via the index's key type. + /// Returns an empty vec for unique indices. + pub fn iter_duplicates(&self) -> Vec<(AlgebraicValue, usize)> { + self.idx.iter_duplicates(&self.key_type) + } + /// Deletes all entries from the index, leaving it empty. /// /// When inserting a newly-created index into the committed state, diff --git a/crates/table/src/table_index/same_key_entry.rs b/crates/table/src/table_index/same_key_entry.rs index 3267a80002a..3d064917699 100644 --- a/crates/table/src/table_index/same_key_entry.rs +++ b/crates/table/src/table_index/same_key_entry.rs @@ -111,6 +111,14 @@ impl SameKeyEntry { } } + /// Returns the number of entries for this key. + pub(super) fn count(&self) -> usize { + match self { + Self::Small(list) => list.len(), + Self::Large(set) => set.len(), + } + } + /// Returns an iterator over all the entries for this key. pub(super) fn iter(&self) -> SameKeyEntryIter<'_> { match self { diff --git a/crates/table/src/table_index/unique_btree_index.rs b/crates/table/src/table_index/unique_btree_index.rs index e572f9e29d7..12b93b3b001 100644 --- a/crates/table/src/table_index/unique_btree_index.rs +++ b/crates/table/src/table_index/unique_btree_index.rs @@ -102,6 +102,36 @@ impl Index for UniqueBTreeIndex { } impl UniqueBTreeIndex { + /// Construct a `UniqueBTreeIndex` from a `BTreeMap`. + /// + /// Each entry is inserted via [`Index::insert`] so that `num_key_bytes` + /// is correctly maintained regardless of `K::MemoStorage`. + /// + /// # Panics + /// + /// Panics if the map contains duplicate keys (should never happen + /// since the caller verified uniqueness). + pub fn from_non_unique(map: BTreeMap) -> Self { + let mut result = Self::default(); + for (key, ptr) in map { + result.insert(key, ptr).expect("duplicate key in supposedly unique map"); + } + result + } + + /// Convert this unique index back into a non-unique `BTreeIndex`. + /// + /// This is lossless: each key maps to exactly one `RowPointer`, + /// which becomes a single-entry `SameKeyEntry` in the `BTreeIndex`. + pub fn into_non_unique(self) -> super::btree_index::BTreeIndex { + let mut mm = super::btree_index::BTreeIndex::default(); + for (key, ptr) in self.map { + // BTreeIndex::insert always succeeds. + let _ = as Index>::insert(&mut mm, key, ptr); + } + mm + } + /// See [`Index::delete`]. /// /// This version has relaxed bounds diff --git a/crates/table/src/table_index/unique_hash_index.rs b/crates/table/src/table_index/unique_hash_index.rs index 67b51bc7172..a6e774c4330 100644 --- a/crates/table/src/table_index/unique_hash_index.rs +++ b/crates/table/src/table_index/unique_hash_index.rs @@ -38,6 +38,39 @@ impl MemoryUsage for UniqueHashIndex { } } +impl UniqueHashIndex { + /// Construct a `UniqueHashIndex` from a `HashMap`. + /// + /// Each entry is inserted via [`Index::insert`] so that `num_key_bytes` + /// is correctly maintained regardless of `K::MemoStorage`. + /// + /// # Panics + /// + /// Panics if the map contains duplicate keys (should never happen + /// since the caller verified uniqueness). + pub fn from_non_unique(map: HashMap) -> Self { + let mut result = Self::default(); + for (key, ptr) in map { + result + .insert(key, ptr) + .expect("duplicate key in supposedly unique hash map"); + } + result + } + + /// Convert this unique hash index back into a non-unique `HashIndex`. + /// + /// This is lossless: each key maps to exactly one `RowPointer`. + pub fn into_non_unique(self) -> super::hash_index::HashIndex { + let mut hi = super::hash_index::HashIndex::default(); + for (key, ptr) in self.map { + // HashIndex::insert always succeeds. + let _ = as Index>::insert(&mut hi, key, ptr); + } + hi + } +} + impl Index for UniqueHashIndex { type Key = K;