Skip to content

Commit 9d9431d

Browse files
authored
[ISSUE #9254] Add CombineConsumeQueue to support CQ migration (#9256)
Add CombineConsumeQueue to support CQ migration
1 parent 921664f commit 9d9431d

33 files changed

+1888
-1245
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -824,9 +824,6 @@ public boolean initializeMessageStore() {
824824
defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
825825
} else {
826826
defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
827-
if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
828-
defaultMessageStore.enableRocksdbCQWrite();
829-
}
830827
}
831828

832829
if (messageStoreConfig.isEnableDLegerCommitLog()) {

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 8 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,12 @@
6969
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
7070
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
7171
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
72-
import org.apache.rocketmq.common.BoundaryType;
7372
import org.apache.rocketmq.common.BrokerConfig;
7473
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
7574
import org.apache.rocketmq.common.KeyBuilder;
7675
import org.apache.rocketmq.common.LockCallback;
7776
import org.apache.rocketmq.common.MQVersion;
7877
import org.apache.rocketmq.common.MixAll;
79-
import org.apache.rocketmq.common.Pair;
8078
import org.apache.rocketmq.common.TopicAttributes;
8179
import org.apache.rocketmq.common.TopicConfig;
8280
import org.apache.rocketmq.common.UnlockCallback;
@@ -214,13 +212,14 @@
214212
import org.apache.rocketmq.store.MessageStore;
215213
import org.apache.rocketmq.store.PutMessageResult;
216214
import org.apache.rocketmq.store.PutMessageStatus;
217-
import org.apache.rocketmq.store.RocksDBMessageStore;
218215
import org.apache.rocketmq.store.SelectMappedBufferResult;
219216
import org.apache.rocketmq.store.StoreType;
220217
import org.apache.rocketmq.store.config.BrokerRole;
221218
import org.apache.rocketmq.store.exception.ConsumeQueueException;
222219
import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
220+
import org.apache.rocketmq.store.queue.CombineConsumeQueueStore;
223221
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
222+
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
224223
import org.apache.rocketmq.store.queue.CqUnit;
225224
import org.apache.rocketmq.store.queue.ReferredIterator;
226225
import org.apache.rocketmq.store.timer.TimerCheckpoint;
@@ -3321,129 +3320,24 @@ private boolean validateBlackListConfigExist(Properties properties) {
33213320
private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx,
33223321
RemotingCommand request) throws RemotingCommandException {
33233322
CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
3324-
String requestTopic = requestHeader.getTopic();
33253323
MessageStore messageStore = brokerController.getMessageStore();
33263324
DefaultMessageStore defaultMessageStore;
33273325
if (messageStore instanceof AbstractPluginMessageStore) {
33283326
defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext();
33293327
} else {
33303328
defaultMessageStore = (DefaultMessageStore) messageStore;
33313329
}
3332-
RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore();
3333-
CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
3330+
ConsumeQueueStoreInterface consumeQueueStore = defaultMessageStore.getQueueStore();
33343331

3335-
if (defaultMessageStore.getMessageStoreConfig().getStoreType().equals(StoreType.DEFAULT_ROCKSDB.getStoreType())) {
3336-
result.setCheckResult("storeType is DEFAULT_ROCKSDB, no need check");
3332+
if (!(consumeQueueStore instanceof CombineConsumeQueueStore)) {
3333+
CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
3334+
result.setCheckResult("It is not CombineConsumeQueueStore, no need check");
33373335
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue());
33383336
return result;
33393337
}
33403338

3341-
if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
3342-
result.setCheckResult("rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid");
3343-
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
3344-
return result;
3345-
}
3346-
3347-
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable();
3348-
StringBuilder diffResult = new StringBuilder();
3349-
try {
3350-
if (StringUtils.isNotBlank(requestTopic)) {
3351-
boolean checkResult = processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime());
3352-
result.setCheckResult(diffResult.toString());
3353-
result.setCheckStatus(checkResult ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
3354-
return result;
3355-
}
3356-
int successNum = 0;
3357-
int checkSize = 0;
3358-
for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
3359-
boolean checkResult = processConsumeQueuesForTopic(topicEntry.getValue(), topicEntry.getKey(), rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime());
3360-
successNum += checkResult ? 1 : 0;
3361-
checkSize++;
3362-
}
3363-
// check all topic finish, all topic is ready, checkSize: 100, currentQueueNum: 110 -> ready (The currentQueueNum means when we do checking, new topics are added.)
3364-
// check all topic finish, success/all : 89/100, currentQueueNum: 110 -> not ready
3365-
boolean checkReady = successNum == checkSize;
3366-
String checkResultString = checkReady ? String.format("all topic is ready, checkSize: %s, currentQueueNum: %s", checkSize, cqTable.size()) :
3367-
String.format("success/all : %s/%s, currentQueueNum: %s", successNum, checkSize, cqTable.size());
3368-
diffResult.append("check all topic finish, ").append(checkResultString);
3369-
result.setCheckResult(diffResult.toString());
3370-
result.setCheckStatus(checkReady ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
3371-
} catch (Exception e) {
3372-
LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
3373-
result.setCheckResult(e.getMessage() + Arrays.toString(e.getStackTrace()));
3374-
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_ERROR.getValue());
3375-
}
3376-
return result;
3377-
}
3378-
3379-
private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic,
3380-
RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail,
3381-
long checkpointByStoreTime) {
3382-
boolean processResult = true;
3383-
for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) {
3384-
Integer queueId = queueEntry.getKey();
3385-
ConsumeQueueInterface jsonCq = queueEntry.getValue();
3386-
ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId);
3387-
if (printDetail) {
3388-
String format = String.format("[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
3389-
topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
3390-
diffResult.append(format).append("\n");
3391-
}
3392-
3393-
long minOffsetByTime = 0L;
3394-
try {
3395-
minOffsetByTime = rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic, queueId, checkpointByStoreTime, BoundaryType.UPPER);
3396-
} catch (Exception e) {
3397-
// ignore
3398-
}
3399-
long minOffsetInQueue = kvCq.getMinOffsetInQueue();
3400-
long checkFrom = Math.max(minOffsetInQueue, minOffsetByTime);
3401-
long checkTo = jsonCq.getMaxOffsetInQueue() - 1;
3402-
/*
3403-
checkTo(maxOffsetInQueue - 1)
3404-
v
3405-
fileCq +------------------------------------------------------+
3406-
kvCq +----------------------------------------------+
3407-
^ ^
3408-
minOffsetInQueue minOffsetByTime
3409-
^
3410-
checkFrom = max(minOffsetInQueue, minOffsetByTime)
3411-
*/
3412-
// The latest message is earlier than the check time
3413-
Pair<CqUnit, Long> fileLatestCq = jsonCq.getCqUnitAndStoreTime(checkTo);
3414-
if (fileLatestCq != null) {
3415-
if (fileLatestCq.getObject2() < checkpointByStoreTime) {
3416-
continue;
3417-
}
3418-
}
3419-
for (long i = checkFrom; i <= checkTo; i++) {
3420-
Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i);
3421-
Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
3422-
if (fileCqUnit == null || kvCqUnit == null || !checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
3423-
LOGGER.error(String.format("[topic: %s, queue: %s, offset: %s] \n file : %s \n kv : %s \n",
3424-
topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null"));
3425-
processResult = false;
3426-
break;
3427-
}
3428-
}
3429-
}
3430-
return processResult;
3431-
}
3432-
3433-
private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) {
3434-
if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) {
3435-
return false;
3436-
}
3437-
if (cqUnit1.getSize() != cqUnit2.getSize()) {
3438-
return false;
3439-
}
3440-
if (cqUnit1.getPos() != cqUnit2.getPos()) {
3441-
return false;
3442-
}
3443-
if (cqUnit1.getBatchNum() != cqUnit2.getBatchNum()) {
3444-
return false;
3445-
}
3446-
return cqUnit1.getTagsCode() == cqUnit2.getTagsCode();
3339+
return ((CombineConsumeQueueStore) consumeQueueStore).
3340+
doCheckCqWriteProgress(requestHeader.getTopic(), requestHeader.getCheckStoreTime(), StoreType.DEFAULT, StoreType.DEFAULT_ROCKSDB);
34473341
}
34483342

34493343
private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, RemotingCommand request) {

broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public boolean correctDelayOffset() {
236236
try {
237237
for (int delayLevel : delayLevelTable.keySet()) {
238238
ConsumeQueueInterface cq =
239-
brokerController.getMessageStore().getQueueStore().findOrCreateConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
239+
brokerController.getMessageStore().findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
240240
delayLevel2QueueId(delayLevel));
241241
Long currentDelayOffset = offsetTable.get(delayLevel);
242242
if (currentDelayOffset == null || cq == null) {

broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,18 @@
2828
import org.apache.rocketmq.broker.BrokerController;
2929
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
3030
import org.apache.rocketmq.common.BrokerConfig;
31+
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
3132
import org.apache.rocketmq.common.Pair;
32-
import org.apache.rocketmq.common.TopicConfig;
3333
import org.apache.rocketmq.store.DefaultMessageStore;
3434
import org.apache.rocketmq.store.DispatchRequest;
35-
import org.apache.rocketmq.store.RocksDBMessageStore;
35+
import org.apache.rocketmq.store.StoreType;
3636
import org.apache.rocketmq.store.config.MessageStoreConfig;
37+
import org.apache.rocketmq.store.queue.CombineConsumeQueueStore;
3738
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
39+
import org.apache.rocketmq.store.queue.ConsumeQueueStore;
3840
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
3941
import org.apache.rocketmq.store.queue.CqUnit;
42+
import org.apache.rocketmq.store.queue.RocksDBConsumeQueueStore;
4043
import org.apache.rocketmq.store.stats.BrokerStatsManager;
4144
import org.awaitility.Awaitility;
4245
import org.junit.Assert;
@@ -81,9 +84,8 @@ public void init() throws IOException {
8184
Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
8285
Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
8386

84-
defaultMessageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("aaa", true), null,
85-
brokerConfig, new ConcurrentHashMap<String, TopicConfig>());
86-
defaultMessageStore.enableRocksdbCQWrite();
87+
defaultMessageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("for-test", true), null,
88+
brokerConfig, new ConcurrentHashMap<>());
8789
defaultMessageStore.loadCheckPoint();
8890

8991
consumerOffsetManager = new ConsumerOffsetManager(brokerController);
@@ -134,23 +136,35 @@ public void testRocksdbCqWrite() throws RocksDBException {
134136
if (notToBeExecuted()) {
135137
return;
136138
}
137-
RocksDBMessageStore kvStore = defaultMessageStore.getRocksDBMessageStore();
138-
ConsumeQueueStoreInterface store = kvStore.getConsumeQueueStore();
139-
store.start();
140-
ConsumeQueueInterface rocksdbCq = defaultMessageStore.getRocksDBMessageStore().findConsumeQueue(topic, queueId);
141-
ConsumeQueueInterface fileCq = defaultMessageStore.findConsumeQueue(topic, queueId);
139+
long startTimestamp = System.currentTimeMillis();
140+
141+
ConsumeQueueStoreInterface combineConsumeQueueStore = defaultMessageStore.getQueueStore();
142+
Assert.assertTrue(combineConsumeQueueStore instanceof CombineConsumeQueueStore);
143+
combineConsumeQueueStore.load();
144+
combineConsumeQueueStore.recover(false);
145+
combineConsumeQueueStore.start();
146+
147+
RocksDBConsumeQueueStore rocksDBConsumeQueueStore = ((CombineConsumeQueueStore) combineConsumeQueueStore).getRocksDBConsumeQueueStore();
148+
ConsumeQueueStore consumeQueueStore = ((CombineConsumeQueueStore) combineConsumeQueueStore).getConsumeQueueStore();
149+
142150
for (int i = 0; i < 200; i++) {
143151
DispatchRequest request = new DispatchRequest(topic, queueId, i, 200, 0, System.currentTimeMillis(), i, "", "", 0, 0, new HashMap<>());
144-
fileCq.putMessagePositionInfoWrapper(request);
145-
store.putMessagePositionInfoWrapper(request);
152+
combineConsumeQueueStore.putMessagePositionInfoWrapper(request);
146153
}
154+
155+
ConsumeQueueInterface rocksdbCq = rocksDBConsumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
156+
ConsumeQueueInterface fileCq = consumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
157+
147158
Awaitility.await()
148159
.pollInterval(100, TimeUnit.MILLISECONDS)
149160
.atMost(3, TimeUnit.SECONDS)
150161
.until(() -> rocksdbCq.getMaxOffsetInQueue() == 200);
151162
Pair<CqUnit, Long> unit = rocksdbCq.getCqUnitAndStoreTime(100);
152163
Pair<CqUnit, Long> unit1 = fileCq.getCqUnitAndStoreTime(100);
153164
Assert.assertEquals(unit.getObject1().getPos(), unit1.getObject1().getPos());
165+
166+
CheckRocksdbCqWriteResult result = ((CombineConsumeQueueStore) combineConsumeQueueStore).doCheckCqWriteProgress(topic, startTimestamp, StoreType.DEFAULT, StoreType.DEFAULT_ROCKSDB);
167+
Assert.assertEquals(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue(), result.getCheckStatus());
154168
}
155169

156170
/**

0 commit comments

Comments
 (0)