Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ public void stopAllPipesWithCriticalExceptionAndTrackException(
///////////////////////// Heartbeat /////////////////////////

public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TException {
if (!tryReadLockWithTimeOut(
if (!tryReadLockWithTimeOutInMs(
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ protected boolean tryReadLockWithTimeOut(final long timeOutInSeconds) {
}
}

protected boolean tryReadLockWithTimeOutInMs(final long timeOutInMs) {
return tryReadLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs));
}

protected void releaseReadLock() {
pipeMetaKeeper.releaseReadLock();
}
Expand All @@ -148,10 +152,18 @@ protected boolean tryWriteLockWithTimeOut(final long timeOutInSeconds) {
}
}

protected boolean tryWriteLockWithTimeOutInMs(final long timeOutInMs) {
return tryWriteLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs));
}

protected void releaseWriteLock() {
pipeMetaKeeper.releaseWriteLock();
}

private long convertMsToCeilSeconds(final long timeOutInMs) {
return Math.max(1L, (Math.max(0L, timeOutInMs) + 999L) / 1000L);
}

////////////////////////// Pipe Task Management Entry //////////////////////////

public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges(
Expand Down Expand Up @@ -368,7 +380,7 @@ protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(final String

public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges(
final List<PipeMeta> pipeMetaListFromCoordinator) {
if (!tryWriteLockWithTimeOut(
if (!tryWriteLockWithTimeOutInMs(
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) {
return null;
}
Expand Down Expand Up @@ -1107,7 +1119,7 @@ private void stopAllPipesWithCriticalExceptionInternal(final int currentNodeId)

public void collectPipeMetaList(final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp)
throws TException {
if (!tryReadLockWithTimeOut(
if (!tryReadLockWithTimeOutInMs(
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) {
return;
}
Expand Down
Loading