Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.subscription.it.triple.regression.param;

import org.apache.iotdb.commons.utils.AuthUtils;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionMisc;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;

import org.apache.thrift.TException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2SubscriptionRegressionMisc.class})
public class IoTDBEncryptedPasswordPullConsumerIT extends AbstractSubscriptionRegressionIT {

private static final String DATABASE = "root.TestEncryptedPasswordPullConsumer";
private static final String DEVICE = DATABASE + ".d_0";
private static final String TOPIC_NAME = "TestEncryptedPasswordPullConsumerTopic";
private static final String USERNAME = "encrypted_user";
private static final String PASSWORD = "EncryptedUser@123";
private static final String ENCRYPTED_PASSWORD = AuthUtils.encryptPassword(PASSWORD);
private static final String WRONG_ENCRYPTED_PASSWORD =
AuthUtils.encryptPassword("WrongEncryptedUser@123");

private static final List<MeasurementSchema> SCHEMA_LIST = new ArrayList<>();

static {
SCHEMA_LIST.add(new MeasurementSchema("s_0", TSDataType.INT64));
SCHEMA_LIST.add(new MeasurementSchema("s_1", TSDataType.DOUBLE));
}

private SubscriptionPullConsumer consumer;

@Override
@Before
public void setUp() throws Exception {
super.setUp();
createDB(DATABASE);
createTopic_s(TOPIC_NAME, "root.**", null, null, false);
session_src.createTimeseries(
DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, CompressionType.LZ4);
session_src.createTimeseries(
DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, CompressionType.LZ4);
session_dest.createTimeseries(
DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, CompressionType.LZ4);
session_dest.createTimeseries(
DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, CompressionType.LZ4);
session_src.executeNonQueryStatement("create user " + USERNAME + " '" + PASSWORD + "'");
session_src.executeNonQueryStatement("grant read,write on root.** to user " + USERNAME);
Assert.assertTrue(subs.getTopic(TOPIC_NAME).isPresent());
}

@Override
@After
public void tearDown() throws Exception {
try {
if (consumer != null) {
consumer.close();
}
} catch (final Exception ignored) {
}
try {
subs.dropTopic(TOPIC_NAME);
} catch (final Exception ignored) {
}
try {
session_src.executeNonQueryStatement("drop user " + USERNAME);
} catch (final Exception ignored) {
}
dropDB(DATABASE);
super.tearDown();
}

@Test
public void testSubscribeWithEncryptedPassword()
throws TException,
IoTDBConnectionException,
IOException,
StatementExecutionException,
InterruptedException {
consumer = createConsumer("encrypted-password-group", ENCRYPTED_PASSWORD);

consumer.open();
consumer.subscribe(TOPIC_NAME);
Assert.assertEquals(1, subs.getSubscriptions().size());

insertData(1706659200000L);
consume_data(consumer, session_dest);
check_count(
4,
"select count(s_0) from " + DEVICE + " where time >= 1706659200000",
"encrypted password consumption");
}

@Test
public void testSubscribeFailsWithWrongEncryptedPassword()
throws IoTDBConnectionException, StatementExecutionException {
consumer = createConsumer("wrong-encrypted-password-group", WRONG_ENCRYPTED_PASSWORD);

try {
consumer.open();
consumer.subscribe(TOPIC_NAME);
Assert.fail("subscribe should fail when encrypted password mismatches");
} catch (final Exception ignored) {
Assert.assertTrue(subs.getSubscriptions().isEmpty());
}
}

private SubscriptionPullConsumer createConsumer(
final String consumerGroupId, final String encryptedPassword) {
return new SubscriptionPullConsumer.Builder()
.host(SRC_HOST)
.port(SRC_PORT)
.username(USERNAME)
.encryptedPassword(encryptedPassword)
.consumerId("consumer_" + consumerGroupId)
.consumerGroupId(consumerGroupId)
.buildPullConsumer();
}

private void insertData(long timestamp)
throws IoTDBConnectionException, StatementExecutionException {
final Tablet tablet = new Tablet(DEVICE, SCHEMA_LIST, 10);
for (int row = 0; row < 5; row++) {
final int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, timestamp);
tablet.addValue("s_0", rowIndex, row * 20L + row);
tablet.addValue("s_1", rowIndex, row + 2.45);
timestamp += row * 2000;
}
session_src.insertTablet(tablet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public String getConsumerGroupId() {
return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY);
}

public String getUsername() {
return getString(ConsumerConstant.USERNAME_KEY);
}

public String getPassword() {
return getString(ConsumerConstant.PASSWORD_KEY);
}

public String getEncryptedPassword() {
return getString(ConsumerConstant.ENCRYPTED_PASSWORD_KEY);
}

public long getHeartbeatIntervalMs() {
return getLongOrDefault(
ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class ConsumerConstant {

public static final String USERNAME_KEY = "username";
public static final String PASSWORD_KEY = "password";
public static final String ENCRYPTED_PASSWORD_KEY = "encrypted-password";

public static final String CONSUMER_ID_KEY = "consumer-id";
public static final String CONSUMER_GROUP_ID_KEY = "group-id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ abstract class SubscriptionConsumer implements AutoCloseable {

private final String username;
private final String password;
private final String encryptedPassword;

protected String consumerId;
protected String consumerGroupId;
Expand Down Expand Up @@ -177,6 +178,7 @@ protected SubscriptionConsumer(final Builder builder) {

this.username = builder.username;
this.password = builder.password;
this.encryptedPassword = builder.encryptedPassword;

this.consumerId = builder.consumerId;
this.consumerGroupId = builder.consumerGroupId;
Expand Down Expand Up @@ -206,6 +208,7 @@ protected SubscriptionConsumer(final Builder builder, final Properties propertie
(String)
properties.getOrDefault(
ConsumerConstant.PASSWORD_KEY, SessionConfig.DEFAULT_PASSWORD))
.encryptedPassword((String) properties.get(ConsumerConstant.ENCRYPTED_PASSWORD_KEY))
.consumerId((String) properties.get(ConsumerConstant.CONSUMER_ID_KEY))
.consumerGroupId((String) properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY))
.heartbeatIntervalMs(
Expand Down Expand Up @@ -386,6 +389,7 @@ SubscriptionProvider constructProviderAndHandshake(final TEndPoint endPoint)
endPoint,
this.username,
this.password,
this.encryptedPassword,
this.consumerId,
this.consumerGroupId,
this.thriftMaxFrameSize,
Expand Down Expand Up @@ -1401,6 +1405,7 @@ public abstract static class Builder {

protected String username = SessionConfig.DEFAULT_USER;
protected String password = SessionConfig.DEFAULT_PASSWORD;
protected String encryptedPassword;

protected String consumerId;
protected String consumerGroupId;
Expand Down Expand Up @@ -1437,10 +1442,27 @@ public Builder username(final String username) {
}

public Builder password(final String password) {
if (!Objects.equals(password, SessionConfig.DEFAULT_PASSWORD)
&& Objects.nonNull(this.encryptedPassword)) {
throw new IllegalStateException(
"password and encryptedPassword are mutually exclusive; encryptedPassword is already set");
}
this.password = password;
return this;
}

public Builder encryptedPassword(final String encryptedPassword) {
if (Objects.isNull(encryptedPassword)) {
return this;
}
if (!Objects.equals(this.password, SessionConfig.DEFAULT_PASSWORD)) {
throw new IllegalStateException(
"password and encryptedPassword are mutually exclusive; password is already set");
}
this.encryptedPassword = encryptedPassword;
return this;
}

public Builder consumerId(@Nullable final String consumerId) {
if (Objects.isNull(consumerId)) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,15 @@ final class SubscriptionProvider extends SubscriptionSession {
private final long heartbeatIntervalMs;
private final int connectionTimeoutInMs;
private int dataNodeId;
private final String username;
private final String password;
private final String encryptedPassword;

SubscriptionProvider(
final TEndPoint endPoint,
final String username,
final String password,
final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize,
Expand All @@ -101,6 +105,9 @@ final class SubscriptionProvider extends SubscriptionSession {
this.endPoint = endPoint;
this.consumerId = consumerId;
this.consumerGroupId = consumerGroupId;
this.username = username;
this.password = password;
this.encryptedPassword = encryptedPassword;
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.connectionTimeoutInMs = connectionTimeoutInMs;
}
Expand Down Expand Up @@ -149,6 +156,11 @@ synchronized void handshake() throws SubscriptionException, IoTDBConnectionExcep
final Map<String, String> consumerAttributes = new HashMap<>();
consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, consumerGroupId);
consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
if (encryptedPassword != null) {
consumerAttributes.put(ConsumerConstant.ENCRYPTED_PASSWORD_KEY, encryptedPassword);
}
consumerAttributes.put(
ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs));
consumerAttributes.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ public Builder password(final String password) {
return this;
}

@Override
public Builder encryptedPassword(final String encryptedPassword) {
super.encryptedPassword(encryptedPassword);
return this;
}

@Override
public Builder consumerId(final String consumerId) {
super.consumerId(consumerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ public Builder password(final String password) {
return this;
}

@Override
public Builder encryptedPassword(final String encryptedPassword) {
super.encryptedPassword(encryptedPassword);
return this;
}

@Override
public Builder consumerId(final String consumerId) {
super.consumerId(consumerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ public TPermissionInfoResp login(String username, String password) {
return authorInfo.login(username, password);
}

public TPermissionInfoResp login(
final String username, final String password, final boolean useEncryptedPassword) {
return authorInfo.login(username, password, useEncryptedPassword);
}

public String login4Pipe(final String userName, final String password) {
return authorInfo.login4Pipe(userName, password);
}

public TPermissionInfoResp checkUserPrivileges(
String username, List<PartialPath> paths, int permission) {
return authorInfo.checkUserPrivileges(username, paths, permission);
Expand Down
Loading
Loading