diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java new file mode 100644 index 0000000000000..e93856cacd1e5 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.partition; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) +public class DataPartitionTableIntegrityCheckProcedureIT { + private static final Logger LOGGER = + LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedureIT.class); + + @Before + public void setUp() { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setConfigNodeConsensusProtocolClass(RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS) + .setDataRegionConsensusProtocolClass(RATIS_CONSENSUS) + .setDataReplicationFactor(1); + EnvFactory.getEnv().initClusterEnvironment(1, 1); + } + + @After + public void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testConcurrentSubmitDataPartitionTableIntegrityCheckProcedure() + throws InterruptedException { + final int threadCount = 10; + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch finishLatch = new CountDownLatch(threadCount); + final ExecutorService executor = Executors.newFixedThreadPool(threadCount); + + final AtomicInteger successCount = new AtomicInteger(0); + final AtomicInteger failCount = new AtomicInteger(0); + final List failureMessages = Collections.synchronizedList(new ArrayList<>()); + + // Concurrently submit the DataPartitionTableIntegrityCheckProcedure + for (int i = 0; i < threadCount; i++) { + final int threadId = i; + executor.submit( + () -> { + try { + startLatch.await(); + + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement stmt = connection.createStatement()) { + stmt.execute("REPAIR DATA PARTITION TABLE"); + successCount.incrementAndGet(); + LOGGER.info("Thread {} submitted integrity check successfully", threadId); + } + } catch (final SQLException e) { + failCount.incrementAndGet(); + failureMessages.add("Thread " + threadId + " failed: " + e.getMessage()); + LOGGER.info( + "Thread {} failed to submit integrity check: {}", threadId, e.getMessage()); + } catch (final Exception e) { + failCount.incrementAndGet(); + failureMessages.add("Thread " + threadId + " failed unexpectedly: " + e.getMessage()); + LOGGER.error("Thread {} unexpected error: {}", threadId, e.getMessage(), e); + } finally { + finishLatch.countDown(); + } + }); + } + + startLatch.countDown(); + + final boolean completed = finishLatch.await(60, TimeUnit.SECONDS); + Assert.assertTrue("Not all threads completed within timeout", completed); + + executor.shutdown(); + Assert.assertTrue( + "Executor did not terminate", executor.awaitTermination(10, TimeUnit.SECONDS)); + + LOGGER.info("Success count: {}, Fail count: {}", successCount.get(), failCount.get()); + LOGGER.info("Failure messages: {}", failureMessages); + + Assert.assertEquals( + "Only one procedure should be submitted successfully", 1, successCount.get()); + Assert.assertEquals( + "The other concurrent submissions should be rejected", threadCount - 1, failCount.get()); + } +} diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index 31d1007867a50..c6271c134cb00 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -94,6 +94,7 @@ utilityStatement | showQueries | showDiskUsage | showCurrentTimestamp | killQuery | grantWatermarkEmbedding | revokeWatermarkEmbedding | loadConfiguration | loadTimeseries | loadFile | removeFile | unloadFile | setSqlDialect | showCurrentSqlDialect | showCurrentUser + | repairDataPartitionTable ; /** @@ -1238,6 +1239,11 @@ stopRepairData : STOP REPAIR DATA (ON (LOCAL | CLUSTER))? ; +// Repair Data Partition Table +repairDataPartitionTable + : REPAIR DATA PARTITION TABLE + ; + // Explain explain : EXPLAIN (ANALYZE VERBOSE?)? selectStatement? diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 59b318a4b11e9..3abb322d08472 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -319,8 +319,6 @@ public class ConfigNodeConfig { private long forceWalPeriodForConfigNodeSimpleInMs = 100; - private long partitionTableRecoverWaitAllDnUpTimeoutInMs = 60000; - public ConfigNodeConfig() { // empty constructor } @@ -1288,13 +1286,4 @@ public long getFailureDetectorPhiAcceptablePauseInMs() { public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) { this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs; } - - public long getPartitionTableRecoverWaitAllDnUpTimeoutInMs() { - return partitionTableRecoverWaitAllDnUpTimeoutInMs; - } - - public void setPartitionTableRecoverWaitAllDnUpTimeoutInMs( - long partitionTableRecoverWaitAllDnUpTimeoutInMs) { - this.partitionTableRecoverWaitAllDnUpTimeoutInMs = partitionTableRecoverWaitAllDnUpTimeoutInMs; - } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index dd32415ebe08d..77790dae1a903 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -322,23 +322,6 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio "failure_detector_phi_acceptable_pause_in_ms", String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs())))); - long partitionTableRecoverWaitAllDnUpTimeoutInMs = - Long.parseLong( - properties.getProperty( - "partition_table_recover_wait_all_dn_up_timeout_ms", - String.valueOf(conf.getPartitionTableRecoverWaitAllDnUpTimeoutInMs()))); - if (partitionTableRecoverWaitAllDnUpTimeoutInMs <= 0) { - LOGGER.warn( - "partition_table_recover_wait_all_dn_up_timeout_ms should be greater than 0, " - + "but current value is {}, ignore that and use the default value {}", - partitionTableRecoverWaitAllDnUpTimeoutInMs, - conf.getPartitionTableRecoverWaitAllDnUpTimeoutInMs()); - partitionTableRecoverWaitAllDnUpTimeoutInMs = - conf.getPartitionTableRecoverWaitAllDnUpTimeoutInMs(); - } - conf.setPartitionTableRecoverWaitAllDnUpTimeoutInMs( - partitionTableRecoverWaitAllDnUpTimeoutInMs); - String leaderDistributionPolicy = properties.getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy()); if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 182dc2f9fb249..41c5066734014 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1155,6 +1155,16 @@ public TDataPartitionTableResp getOrCreateDataPartition( return resp; } + @Override + public TSStatus dataPartitionTableIntegrityCheck() { + TSStatus status = confirmLeader(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + + return partitionManager.dataPartitionTableIntegrityCheck(); + } + private void printNewCreatedDataPartition( GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan, TDataPartitionTableResp resp) { final String lineSeparator = System.lineSeparator(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 02c82164595df..2fbf7271d08d0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -475,6 +475,8 @@ TSchemaNodeManagementResp getNodePathsPartition( TDataPartitionTableResp getOrCreateDataPartition( GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan); + TSStatus dataPartitionTableIntegrityCheck(); + /** * Get AuditLogger. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 2c5a77303d9b9..49dc40002ff76 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -68,7 +68,6 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure; -import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure; @@ -1376,16 +1375,6 @@ public TSStatus createRegionGroups( } } - /** Used to repair the lost data partition table */ - public TSStatus dataPartitionTableIntegrityCheck() { - DataPartitionTableIntegrityCheckProcedure procedure; - synchronized (this) { - procedure = new DataPartitionTableIntegrityCheckProcedure(); - executor.submitProcedure(procedure); - } - return waitingProcedureFinished(procedure, 86400000); - } - /** * Generate {@link CreateTriggerProcedure} and wait until it finished. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 576d805c78624..fd51a5931025a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -81,6 +81,7 @@ import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType; +import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure; import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; @@ -152,6 +153,9 @@ public class PartitionManager { private final ScheduledExecutorService regionMaintainer; private Future currentRegionMaintainerFuture; + private final AtomicBoolean dataPartitionTableIntegrityCheckProcedureRunning = + new AtomicBoolean(false); + public PartitionManager(IManager configManager, PartitionInfo partitionInfo) { this.configManager = configManager; this.partitionInfo = partitionInfo; @@ -511,6 +515,29 @@ public DataPartitionResp getOrCreateDataPartition(final GetOrCreateDataPartition return resp; } + /** Used to repair the lost data partition table */ + public TSStatus dataPartitionTableIntegrityCheck() { + if (configManager + .getProcedureManager() + .isExistUnfinishedProcedure(DataPartitionTableIntegrityCheckProcedure.class) + || !dataPartitionTableIntegrityCheckProcedureRunning.compareAndSet(false, true)) { + return RpcUtils.getStatus( + TSStatusCode.OVERLAP_WITH_EXISTING_TASK, + "DataPartitionTableIntegrityCheckProcedure is already submitted."); + } + + synchronized (this) { + DataPartitionTableIntegrityCheckProcedure procedure = + new DataPartitionTableIntegrityCheckProcedure(); + getProcedureManager().getExecutor().submitProcedure(procedure); + } + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + + public void markDataPartitionTableIntegrityCheckProcedureFinished() { + dataPartitionTableIntegrityCheckProcedureRunning.set(false); + } + private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) { TSStatus status = getConsensusManager().confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java index 4f2c6933fd8fc..f3d539576d4ba 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java @@ -131,6 +131,15 @@ public DataPartitionTableIntegrityCheckProcedure() { super(); } + @Override + protected void updateMetricsOnFinish( + final ConfigNodeProcedureEnv env, final long runtime, final boolean success) { + super.updateMetricsOnFinish(env, runtime, success); + env.getConfigManager() + .getPartitionManager() + .markDataPartitionTableIntegrityCheckProcedureFinished(); + } + @Override protected Flow executeFromState( final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState state) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 4d01f3770c218..dd18802cf8525 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -624,6 +624,11 @@ public TDataPartitionTableResp getOrCreateDataPartitionTable(TDataPartitionReq r return configManager.getOrCreateDataPartition(getOrCreateDataPartitionReq); } + @Override + public TSStatus dataPartitionTableIntegrityCheck() { + return configManager.dataPartitionTableIntegrityCheck(); + } + @Override public TSStatus operatePermission(final TAuthorizerReq req) { ConfigPhysicalPlanType configPhysicalPlanType = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index e2c04caedfb20..06f60b0442538 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -700,6 +700,12 @@ public TDataPartitionTableResp getOrCreateDataPartitionTable(TDataPartitionReq r resp -> !updateConfigNodeLeader(resp.status)); } + @Override + public TSStatus dataPartitionTableIntegrityCheck() throws TException { + return executeRemoteCallWithRetry( + () -> client.dataPartitionTableIntegrityCheck(), status -> !updateConfigNodeLeader(status)); + } + @Override public TSStatus operatePermission(TAuthorizerReq req) throws TException { return executeRemoteCallWithRetry( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java index 4e33c8240b449..82928518abdff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java @@ -105,6 +105,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.sys.KillQueryTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.LoadConfigurationTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.MergeTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.sys.RepairDataPartitionTableTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.SetConfigurationTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.SetSystemStatusTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.ShowConfigurationTask; @@ -213,6 +214,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.LoadConfigurationStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.MergeStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.RepairDataPartitionTable; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetConfigurationStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetSqlDialectStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement; @@ -384,6 +386,12 @@ public IConfigTask visitStartRepairData( return new StartRepairDataTask(startRepairDataStatement); } + @Override + public IConfigTask visitRepairDataPartitionTable( + RepairDataPartitionTable repairDataPartitionTable, MPPQueryContext context) { + return new RepairDataPartitionTableTask(); + } + @Override public IConfigTask visitStopRepairData( StopRepairDataStatement stopRepairDataStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 5325560301c64..c1894f97a873d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -1457,6 +1457,27 @@ public SettableFuture stopRepairData(boolean onCluster) { return future; } + @Override + public SettableFuture repairDataPartitionTable() { + SettableFuture future = SettableFuture.create(); + TSStatus tsStatus = new TSStatus(); + + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + // Send request to ConfigNode to trigger DataPartitionTableIntegrityCheckProcedure + tsStatus = client.dataPartitionTableIntegrityCheck(); + } catch (ClientManagerException | TException e) { + future.setException(e); + } + + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else { + future.setException(new IoTDBException(tsStatus)); + } + return future; + } + @Override public SettableFuture loadConfiguration(boolean onCluster) { SettableFuture future = SettableFuture.create(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index 9e02ba6cff7f4..4434cbb38a9f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -155,6 +155,8 @@ SettableFuture showPipePlugins( SettableFuture stopRepairData(boolean onCluster); + SettableFuture repairDataPartitionTable(); + SettableFuture flush(TFlushReq tFlushReq, boolean onCluster); SettableFuture clearCache(boolean onCluster, Set options); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/RepairDataPartitionTableTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/RepairDataPartitionTableTask.java new file mode 100644 index 0000000000000..f3675e9a0d970 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/RepairDataPartitionTableTask.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.sys; + +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; + +import com.google.common.util.concurrent.ListenableFuture; + +public class RepairDataPartitionTableTask implements IConfigTask { + + public RepairDataPartitionTableTask() {} + + @Override + public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + return configTaskExecutor.repairDataPartitionTable(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index c7a4acfcd59c8..cb29b4d5f7339 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -238,6 +238,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.LoadConfigurationStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.RepairDataPartitionTable; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetConfigurationStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetSqlDialectStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement; @@ -3749,6 +3750,14 @@ public Statement visitStartRepairData(IoTDBSqlParser.StartRepairDataContext ctx) return startRepairDataStatement; } + // Repair Data Partition Table + + @Override + public Statement visitRepairDataPartitionTable( + IoTDBSqlParser.RepairDataPartitionTableContext ctx) { + return new RepairDataPartitionTable(); + } + // Stop Repair Data @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java index 95bb70872ff02..9f8e778726f52 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.security; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.audit.AuditEventType; import org.apache.iotdb.commons.audit.AuditLogOperation; import org.apache.iotdb.commons.audit.IAuditEntity; import org.apache.iotdb.commons.auth.AuthException; @@ -147,6 +148,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.LoadConfigurationStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.RepairDataPartitionTable; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetConfigurationStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetSqlDialectStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement; @@ -188,8 +190,8 @@ public class TreeAccessCheckVisitor extends StatementVisitor ""); } + @Override + public TSStatus visitRepairDataPartitionTable( + RepairDataPartitionTable repairDataPartitionTable, TreeAccessCheckContext context) { + return checkGlobalAuth( + context.setAuditLogOperation(AuditLogOperation.CONTROL), + PrivilegeType.SYSTEM, + AuditEventType.INTEGRITY_CHECK); + } + @Override public TSStatus visitStopRepairData( StopRepairDataStatement stopRepairDataStatement, TreeAccessCheckContext context) { @@ -2029,11 +2040,34 @@ protected TSStatus checkGlobalAuth( return result; } + protected TSStatus checkGlobalAuth( + IAuditEntity context, PrivilegeType requiredPrivilege, AuditEventType auditEventType) { + if (checkHasGlobalAuth(context, requiredPrivilege)) { + return SUCCEED; + } + TSStatus result = AuthorityChecker.getTSStatus(false, requiredPrivilege); + IAuditEntity auditEntity = + context.setResult(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()); + AUDIT_LOGGER.log( + auditEntity.setAuditEventType(auditEventType), + () -> + String.format( + OPERATION_AUDIT_STR, + auditEntity.getUsername(), + auditEntity.getUserId(), + auditEventType)); + return result; + } + protected boolean checkHasGlobalAuth( IAuditEntity context, PrivilegeType requiredPrivilege, Supplier auditObject) { return checkHasGlobalAuth(context, requiredPrivilege, auditObject, false); } + protected boolean checkHasGlobalAuth(IAuditEntity context, PrivilegeType requiredPrivilege) { + return checkHasGlobalAuth(context, requiredPrivilege, () -> "", false); + } + protected boolean checkHasGlobalAuth( IAuditEntity context, PrivilegeType requiredPrivilege, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java index 19e597ef03197..b40c6444816fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java @@ -181,6 +181,7 @@ public enum StatementType { PIPE_ENRICHED, START_REPAIR_DATA, STOP_REPAIR_DATA, + REPAIR_DATA_PARTITION_TABLE, CREATE_TOPIC, DROP_TOPIC, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java index 3617c4bae8d56..847e850c52172 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java @@ -138,6 +138,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.LoadConfigurationStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.MergeStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.RepairDataPartitionTable; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetConfigurationStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetSqlDialectStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement; @@ -520,6 +521,11 @@ public R visitStopRepairData(StopRepairDataStatement stopRepairDataStatement, C return visitStatement(stopRepairDataStatement, context); } + public R visitRepairDataPartitionTable( + RepairDataPartitionTable repairDataPartitionTable, C context) { + return visitStatement(repairDataPartitionTable, context); + } + public R visitLoadConfiguration( LoadConfigurationStatement loadConfigurationStatement, C context) { return visitStatement(loadConfigurationStatement, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/RepairDataPartitionTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/RepairDataPartitionTable.java new file mode 100644 index 0000000000000..b377952ac5b05 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/RepairDataPartitionTable.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.sys; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; + +import java.util.Collections; +import java.util.List; + +public class RepairDataPartitionTable extends Statement implements IConfigStatement { + + public RepairDataPartitionTable() { + this.statementType = StatementType.REPAIR_DATA_PARTITION_TABLE; + } + + @Override + public List getPaths() { + return Collections.emptyList(); + } + + @Override + public QueryType getQueryType() { + return QueryType.OTHER; + } + + @Override + public R accept(StatementVisitor visitor, C context) { + return visitor.visitRepairDataPartitionTable(this, context); + } +} diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 378a6226cbffd..87be30f4520e0 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -739,12 +739,6 @@ partition_table_recover_worker_num=10 # Datatype: Integer partition_table_recover_max_read_megabytes_per_second=10 -# Purpose: for data partition repair -# Set a timeout to wait for all datanodes complete startup, the unit is ms -# effectiveMode: restart -# Datatype: Integer -partition_table_recover_wait_all_dn_up_timeout_ms=60000 - #################### ### Memory Control Configuration #################### diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 92312ee81a307..a4a80c5395385 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -1487,6 +1487,8 @@ service IConfigNodeRPCService { */ TDataPartitionTableResp getOrCreateDataPartitionTable(TDataPartitionReq req) + common.TSStatus dataPartitionTableIntegrityCheck() + // ====================================================== // Authorize // ======================================================