欢迎您的访问
专注架构,Java,数据结构算法,Python技术分享

十:RocketMQ源码分析之消费队列、Index索引文件存储结构与存储机制-下篇

上篇主要是讲解 RocketMQ 运行过程中消息发送者发送一条消息,进入到 commitlog 文件,然后是如何被转发到 consumequeueindex 索引文件中的,本节主要剖析一下,在 RocketMQ 启动过程中,是如何根据 commitlog 重构 consumeque,index 的,因为毕竟 commitlog 文件中的消息与 consumequeue 中的文件内容并不能确保是一致的。

入口:DefaultMessageStore#load

/**
     * @throws IOException
     */
    public boolean load() {
        boolean result = true;

        try {
            boolean lastExitOK = !this.isTempFileExist();  // @1
            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

            if (null != scheduleMessageService) {
                result = result && this.scheduleMessageService.load();    // @2
            }

            // load Commit Log
            result = result && this.commitLog.load();                             // @3

            // load Consume Queue
            result = result && this.loadConsumeQueue();                    // @4

            if (result) {
                this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));   // @5

                this.indexService.load(lastExitOK);    // @6

                this.recover(lastExitOK);      // @7

                log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
            }
        } catch (Exception e) {
            log.error("load exception", e);
            result = false;
        }

        if (!result) {
            this.allocateMappedFileService.shutdown();
        }

        return result;

代码@1:判断 $\{ROCKET\_HOME\}/storepath/abort 文件是否存在,如果文件存在,则返回 true,否则返回 false,这个文件的作用是什么呢?原来,在 DefaultMessageStore 启动时创建,在 shutdown 时删除,也就是如果该文件存在,说明不是正常的关闭。

private boolean isTempFileExist() {
        String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        File file = new File(fileName);
        return file.exists();

代码@2:延迟消息启动。

代码@3:commitlog 文件加载。

代码@4:加载 consumerqueue 文件。

代码@5:文件存储检测点。

代码@6:索引文件加载。

代码@7:文件检测恢复。

接下来,本文重点分析步骤3-6都是基于物理磁盘上的文件,构建成内存映射文件(MappedFile)。

代码@7:验证 commitlogconsumequeue、索引文件直接的一致性检测,也是本文重点分析内容。

1、文件恢复

DefaultMessageStore#recover

private void recover(final boolean lastExitOK) {
        this.recoverConsumeQueue();   // @1

        if (lastExitOK) {   // @2
            this.commitLog.recoverNormally();  // @21
        } else {
            this.commitLog.recoverAbnormally(); // @22
        }

        this.recoverTopicQueueTable();   // @3

代码@1:恢复消息队列。

代码@2:如果是正常退出,则按照正常修复;如果是异常退出,则走异常修复逻辑。

代码@3,修复主题队列。

1.1 消息队列恢复 DefaultMessageStore#recoverConsumeQueue

img_0914_01_1.png

ConsumeQueue\#recover

public void recover() {
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();    // @1
        if (!mappedFiles.isEmpty()) {

            int index = mappedFiles.size() - 3;        
            if (index < 0)                                                 // @2
                index = 0;

            int mappedFileSizeLogics = this.mappedFileSize;                // @3 start
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();       // @3 end
            long mappedFileOffset = 0;
            long maxExtAddr = 1;
            while (true) {  // 
                for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {  // 4 start
                    long offset = byteBuffer.getLong();                      // @5 start
                    int size = byteBuffer.getInt();
                    long tagsCode = byteBuffer.getLong();               // @5 end

                    if (offset >= 0 && size > 0) {
                        mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                        this.maxPhysicOffset = offset;
                        if (isExtAddr(tagsCode)) {
                            maxExtAddr = tagsCode;
                        }
                    } else {
                        log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                            + offset + " " + size + " " + tagsCode);
                        break;
                    }
                } // @4 end

                if (mappedFileOffset == mappedFileSizeLogics) { // @6
                    index++;
                    if (index >= mappedFiles.size()) {

                        log.info("recover last consume queue file over, last maped file "
                            + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next consume queue file, " + mappedFile.getFileName());
                    }
                } else {
                    log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                        + (processOffset + mappedFileOffset));
                    break;
                }
            }

            processOffset += mappedFileOffset;       // @7
            this.mappedFileQueue.setFlushedWhere(processOffset);  // @8
            this.mappedFileQueue.setCommittedWhere(processOffset);  // @9
            this.mappedFileQueue.truncateDirtyFiles(processOffset);   // @10

            if (isExtReadEnable()) {
                this.consumeQueueExt.recover();
                log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
            }
        } 

代码@1:获取该消息队列的所有内存映射文件。

代码@2:只从倒数第3个文件开始,这应该是一个经验值。

代码@3 :首先介绍几个局部变量。

 *  mappedFileSizeLogics  
    consumequeue 逻辑大小。
 *  mappedFile  
    queue对应的内存映射文件。
 *  byteBuffer  
    内存映射文件对应的ByteBuffer
 *  processOffset :  
    处理的 offset,默认从 consumequeue 中存放的第一个条目开始。

代码@4:循环验证 consumeque 包含条目的有效性(如果 offset 大于0并且 size 大于0,则表示是一个有效的条目)

代码@5:读取一个条目的内容。

  • offset :commitlog中的物理偏移量
  • size : 该条消息的消息总长度
  • tagsCode :tag hashcode

如果 offset 大于0并且 size 大于0,则表示是一个有效的条目,设置 consumequeue 中有效的 mappedFileOffset ,继续下一个条目的验证,如果发现不正常的条目,则跳出循环。

代码@6:如果该 consumeque 文件中所有条目全部有效,则继续验证下一个文件,(index++),如果发现条目不合法,后面的文件不需要再检测。

代码@7,:processOffset 代表了当前 consuemque 有效的偏移量。

代码8,@9:设置 flushedWherecommittedWhere 为当前有效的偏移量。

代码@10:截断无效的consumeque文件。

public void truncateDirtyFiles(long offset) {
        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();

        for (MappedFile file : this.mappedFiles) {
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
            if (fileTailOffset > offset) {  // @1
                if (offset >= file.getFileFromOffset()) {
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {
                    file.destroy(1000);    // @2
                    willRemoveFiles.add(file); 
                }
            }
        }

        this.deleteExpiredFile(willRemoveFiles);  // @3

该方法主要就是再次遍历所有的 MappedFile,如果无效的 offset 大于 该 consumeque,则无需处理。

如果无效的 offset 小于该文件最大的偏移量,如果 consumequeue 的 offset 大于失效的 offset,则该文件整个删除,如果否,则设置 wrotePosition,commitedPosition,flushedPoisition 的值即可。

由此可见,DefaultMessageStore\#recoverConsumeQueue 主要要做的就是先移除非法的offset。

下面代码摘录自:DefaultMessageStore\#recover

if (lastExitOK) {
            this.commitLog.recoverNormally();
        } else {
            this.commitLog.recoverAbnormally();

lastExitOk 为 true,表示 abort 文件不存在,表示是正常退出,如果 abort 文件存在,则表示异常退出,

1.2 commitlog正常恢复

CommitLog\#recoverNormally commitlog 正常恢复与 ConsumeQueue 的恢复差不多的逻辑,就不重复跟踪。

1.3 commitlog异常恢复

CommitLog\#recoverAbnormally

public void recoverAbnormally() {
        // recover by the minimum time stamp
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Looking beginning to recover from which file
            int index = mappedFiles.size() - 1;
            MappedFile mappedFile = null;
            for (; index >= 0; index--) {                                                                                   // @1
                mappedFile = mappedFiles.get(index);
                if (this.isMappedFileMatchedRecover(mappedFile)) {
                    log.info("recover from this maped file " + mappedFile.getFileName());
                    break;
                }
            }

            if (index < 0) {
                index = 0;
                mappedFile = mappedFiles.get(index);
            }

            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);  // @2
                int size = dispatchRequest.getMsgSize();

                // Normal data
                if (size > 0) {
                    mappedFileOffset += size;

                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
                // Intermediate file read error
                else if (size == -1) {
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                }
                // Come the end of the file, switch to the next file
                // Since the return 0 representatives met last hole, this can
                // not be included in truncate offset
                else if (size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        // The current branch under normal circumstances should
                        // not happen
                        log.info("recover physics file over, last maped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            }

            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            // Clear ConsumeQueue redundant data
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
        // Commitlog case files are deleted
        else {
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }

代码@1:从最后一个文件开始检测,先找到第一个正常的 commitlog 文件,然后从该文件开始去恢复。查找文件的逻辑:

img_0914_01_2.png

判断一个文件是否正常,主要是如下几个条件:

1、魔数正确

2、消息存储时间不为0

3、存储时间小于等于检测点(checkpoint)

代码@2:创建转发对象,该方法在消费队列,Index 索引文件存储上篇已分析过。

代码@3:转发给对象,会同步更新 consumequeue,index 文件

代码@4:检测到非法的 commitlog 时,停止恢复,然后设置 commitlog 的检测点,然后删除多于的不符合格式的文件。

1.4 recoverTopicQueueTable

HashMap<String/\* topic-queueid \*/, Long/\* offset \*/> topicQueueTable 恢复 Commitlog 中队列的最大偏移量。

2、总结

rocketmq 关于 commitlog 文件、消息消费 ConsumeQueue 队列、索引文件 Index 的恢复机制就介绍到这,下面我再来总结一下 RocketMQ recover 机制。

1、首先先加载相关文件到内存(内存映射文件)

包含 Commitlog 文件、ConsumeQueue 文件、存储检测点(CheckPoint)文件、索引文件。

2、执行文件恢复

引入临时文件 abort 来区分是否是异常启动。在存储管理启动时(DefaultMessageStore)创建 abort 文件,结束时(shutdown)会删除 abort 文件,也就是如果在启动的时候,如果发现存储在该临时文件,则认为是异常。

恢复顺序:

1、先恢复 consumeque 文件,把不符合的 consueme 文件删除,一个 consume 条目正确的标准(commitlog偏移量 >0 size > 0)[从倒数第三个文件开始恢复]。

2、如果 abort 文件存在,此时找到第一个正常的 commitlog 文件,然后对该文件重新进行转发,依次更新 consumeque,index 文件。


备注:本文是《RocketMQ 技术内幕》的前期素材,建议关注笔者的书籍:《RocketMQ 技术内幕》。

作者:唯有坚持不懈 | 出处:http://39sd.cn/9E281

赞(0) 打赏
版权归原创作者所有,任何形式转载请联系作者;码农code之路 博客站点 » 十:RocketMQ源码分析之消费队列、Index索引文件存储结构与存储机制-下篇

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏