diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 031cfd3a62e0..06d6a512b30a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -377,7 +377,7 @@ public void stopAllPipesWithCriticalExceptionAndTrackException( ///////////////////////// Heartbeat ///////////////////////// public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TException { - if (!tryReadLockWithTimeOut( + if (!tryReadLockWithTimeOutInMs( CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { return; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 73b543592c1e..99d452987943 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -130,6 +130,10 @@ protected boolean tryReadLockWithTimeOut(final long timeOutInSeconds) { } } + protected boolean tryReadLockWithTimeOutInMs(final long timeOutInMs) { + return tryReadLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs)); + } + protected void releaseReadLock() { pipeMetaKeeper.releaseReadLock(); } @@ -148,10 +152,18 @@ protected boolean tryWriteLockWithTimeOut(final long timeOutInSeconds) { } } + protected boolean tryWriteLockWithTimeOutInMs(final long timeOutInMs) { + return tryWriteLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs)); + } + protected void releaseWriteLock() { pipeMetaKeeper.releaseWriteLock(); } + private long convertMsToCeilSeconds(final long timeOutInMs) { + return Math.max(1L, (Math.max(0L, timeOutInMs) + 999L) / 1000L); + } + ////////////////////////// Pipe Task Management Entry ////////////////////////// public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges( @@ -368,7 +380,7 @@ protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(final String public List handlePipeMetaChanges( final List pipeMetaListFromCoordinator) { - if (!tryWriteLockWithTimeOut( + if (!tryWriteLockWithTimeOutInMs( CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { return null; } @@ -1107,7 +1119,7 @@ private void stopAllPipesWithCriticalExceptionInternal(final int currentNodeId) public void collectPipeMetaList(final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException { - if (!tryReadLockWithTimeOut( + if (!tryReadLockWithTimeOutInMs( CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { return; }