ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

【Elasticsearch源码】CCR源码分析(二)

2020-01-21 20:03:58  阅读:319  来源: 互联网

标签:final Translog long CCR 源码 Elasticsearch location Operation response


接上一篇:【Elasticsearch源码】CCR源码分析(一)

sendShardChangesRequest方法最终进入到ShardChangesAction.TransportAction#shardOperation,跟据上面的read request,从Translog中获取该shard的seq_no范围内的所有Operation,返回最新的shard需要的Operation。

        protected Response shardOperation(Request request, ShardId shardId) throws IOException {
            .......
            // 获取Operation
            final Translog.Operation[] operations = getOperations(
                    indexShard,
                    seqNoStats.getGlobalCheckpoint(),
                    request.getFromSeqNo(),
                    request.getMaxOperationCount(),
                    request.getExpectedHistoryUUID(),
                    request.getMaxBatchSize());
            // 在快照操作完成之后,确保maxSeqNoOfUpdatesOrDeletes,索引元数据,mapping和setting是最新的
            final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
            final IndexMetaData indexMetaData = indexService.getMetaData();
            final long mappingVersion = indexMetaData.getMappingVersion();
            final long settingsVersion = indexMetaData.getSettingsVersion();
            return getResponse(......);
        }

获取Operation的操作如下:先进行参数检验,然后创建Translog快照,遍历快照里面的Operation并添加直至超过最大的批次限制。

    static Translog.Operation[] getOperations(....) throws IOException {
        .....// 参数检验
        int seenBytes = 0;
        long toSeqNo = Math.min(globalCheckpoint, (fromSeqNo + maxOperationCount) - 1);
        final List<Translog.Operation> operations = new ArrayList<>();
        // 创建Translog快照,根据translog快照读取Operation
        try (Translog.Snapshot snapshot = indexShard.newChangesSnapshot("ccr", fromSeqNo, toSeqNo, true)) {
            Translog.Operation op;
            while ((op = snapshot.next()) != null) {
                operations.add(op);
                seenBytes += op.estimateSize();
                if (seenBytes > maxBatchSize.getBytes()) {
                    break;
                }
            }
        } catch (MissingHistoryOperationsException e) {
            ......
        }
        return operations.toArray(EMPTY_OPERATIONS_ARRAY);
    }

sendShardChangesRequest方法中通过handleReadResponse方法监听处理Response。

void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
    	// 处理read response
        Runnable handleResponseTask = () -> innerHandleReadResponse(from, maxRequiredSeqNo, response);
        // 更新follow index mapping
        Runnable updateMappingsTask = () -> maybeUpdateMapping(response.getMappingVersion(),handleResponseTask);
        // 更新follow index settings
        maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask);
    }

调用innerHandleReadResponse方法对read response进行处理,如果response没有Operation,会重新发送sendShardChangesRequest请求,否则将response里面的所有Operation添加到buffer里面,然后进入write流程。

    synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
        .......
        if (response.getOperations().length == 0) { 
            newFromSeqNo = from;
        } else {
            List<Translog.Operation> operations = Arrays.asList(response.getOperations());
            long operationsSize = operations.stream().mapToLong(Translog.Operation::estimateSize).sum();
            buffer.addAll(operations);
            bufferSizeInBytes += operationsSize;
            final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo();
            newFromSeqNo = maxSeqNo + 1;
            lastRequestedSeqNo = Math.max(lastRequestedSeqNo, maxSeqNo);
            coordinateWrites();//进入write
        }
        if (newFromSeqNo <= maxRequiredSeqNo && isStopped() == false) {
            int newSize = Math.toIntExact(maxRequiredSeqNo - newFromSeqNo + 1);
            sendShardChangesRequest(newFromSeqNo, newSize, maxRequiredSeqNo); //重新发送请求
        } else {
            numOutstandingReads--;
            coordinateReads(); //重新进入read
        }
    }

write流程同样会先判断write容量是否满了,然后从buffer队列里面遍历所有的Operation添加到ops的ArrayList里面,并通过sendBulkShardOperationsRequest发生请求。

    private synchronized void coordinateWrites() {
        ......
        while (hasWriteBudget() && buffer.isEmpty() == false) {
            long sumEstimatedSize = 0L;
            int length = Math.min(params.getMaxWriteRequestOperationCount(), buffer.size());
            List<Translog.Operation> ops = new ArrayList<>(length);
            for (int i = 0; i < length; i++) {
                Translog.Operation op = buffer.remove();
                ops.add(op);
                sumEstimatedSize += op.estimateSize();
                if (sumEstimatedSize > params.getMaxWriteRequestSize().getBytes()) {
                    break;
                }
            }
            bufferSizeInBytes -= sumEstimatedSize;
            numOutstandingWrites++;
            // 发送bulk写请求
            sendBulkShardOperationsRequest(ops, leaderMaxSeqNoOfUpdatesOrDeletes, new AtomicInteger(0));
        }
    }

进入到TransportBulkShardOperationsAction类里面,开始写入主分片和副本分片,TransportBulkShardOperationsAction继承于TransportWriteAction类。
在这里插入图片描述
其写入流程和正常的写入bulk一致,只不过重写了shardOperationOnPrimary和shardOperationOnReplica方法。通过重放translog文件进行primary的写入,写入成功之后更新同步translog location并构建replicaRequest。副本写入过程,根据上面构建好的replicaRequest直接写入。

    public static CcrWritePrimaryResult shardOperationOnPrimary(....) throws IOException {
        ......
        final List<Translog.Operation> appliedOperations = new ArrayList<>(sourceOperations.size());
        Translog.Location location = null;
        for (Translog.Operation sourceOp : sourceOperations) {
            final Translog.Operation targetOp = rewriteOperationWithPrimaryTerm(sourceOp, primary.getOperationPrimaryTerm()); //包含操作类型,和相关的信息以及source
            final Engine.Result result = primary.applyTranslogOperation(targetOp, Engine.Operation.Origin.PRIMARY);  // 通过重放translog文件,最终进入到了写primary的逻辑
            if (result.getResultType() == Engine.Result.Type.SUCCESS) {
                appliedOperations.add(targetOp);
                location = locationToSync(location, result.getTranslogLocation());  // 写入成功的话更新同步translog location
            } else {
                ......
            }
        }
        // 写入主分片成功之后,构建replicaRequest
        final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(
            shardId, historyUUID, appliedOperations, maxSeqNoOfUpdatesOrDeletes);
        return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);  //更新Checkpoint,SeqNo
    }

    public static WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(.....) throws IOException {
        Translog.Location location = null;
        for (final Translog.Operation operation : request.getOperations()) {
            final Engine.Result result = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);  //进入到写数据流程
            if (result.getResultType() != Engine.Result.Type.SUCCESS) {
              .....
            }
            location = locationToSync(location, result.getTranslogLocation());// 写入成功的话更新同步translog location
        }
        return new WriteReplicaResult<>(request, location, null, replica, logger);
    }

handleWriteResponse方法监听并处理sendBulkShardOperationsRequest的结果,每次处理成功numOutstandingWrites减1,直到numOutstandingWrites等于0,如果缓冲区有预量,则继续进行read。

private synchronized void handleWriteResponse(final BulkShardOperationsResponse response) {
        this.followerGlobalCheckpoint = Math.max(this.followerGlobalCheckpoint, response.getGlobalCheckpoint());
        this.followerMaxSeqNo = Math.max(this.followerMaxSeqNo, response.getMaxSeqNo());
        numOutstandingWrites--;
        assert numOutstandingWrites >= 0;
        coordinateWrites();
        // 缓冲区有预量时开始读取
        coordinateReads();
    }

总的来说,follower shard发送read request,在seq_no范围之内:如果leader shard有可用的新Operation,则按配置的参数来限制响应,然后写入数据;如果leader shard没有可用的新Operation,则在超时时间内等待;如果超时时间内发生了新的Operation,则立即对新的Operation进行响应,否则,如果超时,将会回复follower shard没有新的Operation。

read和write的过程中通过一个buffer缓存区来进行缓存:buffer是一个按seqNo进行排序的优先队列。

private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));

4 小结

  1. CCR以插件的方式加载和使用,不侵入式修改内核;
  2. 采用了快照恢复的方式进行全量的复制;
  3. 增量复制的过程采用了从远程集群Translog事务日志里面获取所有的Operation并将数据写入本地集群;
  4. 复制是在shard级别的,所以每个shard有自身的Follower Shard Task;
  5. 集群间数据的一致性通过seq_no和GlobalCheckpoint来校验;
  6. ES的段文件在merge过程中可能会删除或更新部分doc的关联操作,会导致seq_no的变化,所有使用了soft_deletes,默认保留12小时。
少加点香菜 发布了31 篇原创文章 · 获赞 65 · 访问量 5万+ 私信 关注

标签:final,Translog,long,CCR,源码,Elasticsearch,location,Operation,response
来源: https://blog.csdn.net/wudingmei1023/article/details/104064469

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有