ES GET 流程

1. 问题描述

上一篇文章中,我们讲到了 ES Nested 是拆开保存的(1拖N 方式),通过 Root Document 的 _id 来做关联的。那这个 ES 直接通过 _id 来读取数据,又是怎么样的呢?有没有做合并、组装的呢?

2. GET API 例子

GET nike/_doc/0
{
    "_index" : "ike",
    "_type" : "_doc",
    "_id" : "0",
    "_version" : 1,
    "found": true,
    "_source" : {
        "user" : "kimchy",
        "date" : "2009-11-15T14:12:12",
        "likes": 0,
        "message" : "trying out Elasticsearch"
    }
}

接收请求的入口是:
org.elasticsearch.rest.action.document.RestGetAction

3. 可选参数

ES GET 流程


4. GET 基本流程

GET 操作 具体流程如下图所示:


ES GET 流程

  1. 备注:
    a. 上面的部署架构也是非常常见的。 P0、P1、P2 都为主分片;R0、R1、R2 为副本分片。
  2. 大致的请求流程描述如下:
    a. 客户端向 NODE1 发起 GET 请求。此时 NODE1 称之为 协调节点。如果协调节点压力大,也是性能损耗点,建议先把节点数量增加,再增大内存、CPU 。
    b. NODE1 根据规则,计算出对应的 docId 为 1 的文档再哪个 shard 上面。比如,docId 为 1 的文档在 Node2 上面。然后,发起 TCP 请求,将请求转发给 NODE2 。
    c. NODE2 根据规则,获取到具体的文档后。将请求再次转发给 NODE1。如果节点很多,导致网络请求链路过多,异常因素增加。也是 shard 分片数量需要考虑的点。
    d. NODE1 获取 NODE2 的响应后,将请求返回给 客户端。

ES 中,写入默认是写入大多数成功即可。默认是写入主分片成功,副本分片大多数成功即可。所以,在一瞬间去查询的话,有可能请求的是 副本分片,客户端会收到 doc 不存在的问题。

5、请求流程图

ES GET 流程


6、代码流程

6.1 协调节点

6.1.0 接收 HTTP 请求入口

// org.elasticsearch.rest.action.document.RestGetAction 代码入口 只贴出了部分代码。
                /**
         * {@link  TransportGetAction 跳转此类 }
         */
        return channel -> client.get(getRequest, new RestToXContentListener(channel) {
            @Override
            protected RestStatus getStatus(final GetResponse response) {
                return response.isExists() ? OK : NOT_FOUND;
            }
        });

6.1.1 总的代码入口

其实,请求应该是:org.elasticsearch.action.get.TransportGetAction。但是 org.elasticsearch.action.get.TransportGetAction 实现了 TransportSingleShardAction 这个抽象类。具体的请求处理先由 TransportSingleShardAction 来处理。具体处理如下:

-- org.elasticsearch.action.support.single.shard.TransportSingleShardAction#doExecute 代码入口
/**
     * 执行 execute,而且,是异步执行的
     * @param request
     * @param listener
     */
    @Override
    protected void doExecute(Task task, Request request, ActionListener listener) {
        // 异步操作,AsyncSingleAction 构造方法里面,先计算出正确的 索引,获取具体分片的路由规则。在执行 start()方法。
        new AsyncSingleAction(request, listener).start();
    }

6.1.2 计算 shard 路由表

代码入口:org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#AsyncSingleAction。

......

// 获取正确的索引,比如,你请求的是 nike_xx ,通过正则表达式,计算出一些列规则,最后,这里只返回一个索引。
if (resolveIndex(request)) {
   concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
} else {
  concreteSingleIndex = request.index();
}
.......

// 根据hash和shard数量取余计算 shard 列表,或者走优先级规则
this.shardIt = shards(clusterState, internalRequest);

6.1.3 路由算法(`TransportGetAction` 在这里实现)

代码入口:org.elasticsearch.action.get.TransportGetAction#shards

/**
     * 根据 docId 计算,文档落在哪个分片中。
     * @param state
     * @param request
     * @return
     */
    @Override
    protected ShardIterator shards(ClusterState state, InternalRequest request) {
        return clusterService.operationRouting()
                .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),
                    request.request().preference());
    }

---- 下面是计算 shardId。实现的比较有意思。如果有 docId,那么,就根据 docId 来获取。如果没有,就根据 routing 值来获取具体的shard 规则。
/**
     * 获取 shardId
     * @param indexMetadata
     * @param id
     * @param routing
     * @return
     */
    public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) {
        final String effectiveRouting;
        final int partitionOffset;

        // 使用的方法:PUT /index/type/id?routing=user_id,如果没有 routing ,那么,默认是 doc_id 来作为 routing。非常巧妙,不需要维护 docId 与 shard 关系了
        if (routing == null) {
            assert(indexMetadata.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
            effectiveRouting = id;
        } else {
            effectiveRouting = routing;
        }

        // 如果路由分区的大小 不是  1
        if (indexMetadata.isRoutingPartitionedIndex()) {
            partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize());
        } else {
            // we would have still got 0 above but this check just saves us an unnecessary hash calculation
            partitionOffset = 0;
        }

        // 计算 shardId ,基于 hash 来计算
        return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
    }

6.1.4 转发请求

// 本节点作为协调节点,向目标 node 转发请求,或者目标是本地节点,直接通过函数调用读取数据.发送流程封装了对请求的发送,
org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#start

6.1.5 发送请求

org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#perform

-- 这里由重试的逻辑;判断是否是本地节点;根据 shard 获取某个 节点,发送请求到数据节点请求。

// 获取下一个路由
final ShardRouting shardRouting = shardIt.nextOrNull();

6.2 数据节点

6.2.0 总入口:

将请求转发到数据节点后,由 org.elasticsearch.action.support.single.shard.TransportSingleShardAction.ShardTransportHandler 来接收请求,处理请求。

6.2.1 接收请求请求

/**
     * shard 分片的操作
     */
    private class ShardTransportHandler implements TransportRequestHandler {

        @Override
        public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
            if (logger.isTraceEnabled()) {
                logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
            }

                        // 异步操作
            asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
        }
    }

6.2.2 转换 docId

// 代码入口:org.elasticsearch.index.get.ShardGetService#innerGet
// docId 其实也是一个 Term,名字是 _id,为什么这里要做 uuid 的转换呢?因为很多id 都是不规整的。UUID 编码后,长度一致,做好了很好的性能优化,比如,可以采用哈夫曼树来优化、压缩。
Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));

6.2.3 进入 GET 核心地方

进入 org.elasticsearch.index.engine.InternalEngine#get 这个方法,主要是 判断版本号是否冲突、seqNo 是否冲突。还有一个重点,是获取 Searcher。 虽然 acquireSearcher 这个方法是获取 Searcher 的,但是经过一遍代码查找后,然后各种条件限制,需要继续跳到之前 6.2.4

// acquireSearcher 这个方法是获取 Searcher 的。但是你
getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));

6.2.4 获取 Search 入口(注意下方的 wrapSearcher)

/**
     * 读取数据
     * @param get
     * @return
     */
    public Engine.GetResult get(Engine.Get get) {
        // 校验是否允许读取数据
        readAllowed();

        // ES Mapper
        DocumentMapper mapper = mapperService.documentMapper();
        if (mapper == null || mapper.type().equals(mapperService.resolveDocumentType(get.type())) == false) {
            return GetResult.NOT_EXISTS;
        }

        // 读取数据,这个 this:wrapperSearcher 方法最终在:org.elasticsearch.index.engine.Engine.SearcherSupplier.acquireSearcher 这里调用。最终的 search 是:ElasticsearchDirectoryReader
        return getEngine().get(get, mapper, this::wrapSearcher);
    }

// 可能大家看了 wraSearcher 还是一脸懵逼,不知道 这个 searcher 怎么读取数据的。

6.2.5 Searcher 初始化各种跳转、回调

ES GET 流程


6.2.6 Searcher 初始化

代码入口:org.elasticsearch.index.engine.InternalEngine#createReaderManager

org.elasticsearch.index.engine.InternalEngine#createReaderManager

// 下面,我只摘出了重要的部分说明。下面的函数说明
final ElasticsearchDirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); 

// 大家一定要仔细看 DirectoryReader.open(indexWriter) 这个方法。很妖娆。

// 

6.2.6 Searcher 初始化2

public static ElasticsearchDirectoryReader wrap(DirectoryReader reader, ShardId shardId) throws IOException {

        return new ElasticsearchDirectoryReader(reader, new SubReaderWrapper(shardId), shardId);
    }

    private static final class SubReaderWrapper extends FilterDirectoryReader.SubReaderWrapper {
        private final ShardId shardId;
        SubReaderWrapper(ShardId shardId) {
            this.shardId = shardId;
        }

        /**
         * ElasticsearchDirectoryReader 在这个类的构造函数,里面的 super 方法,执行了 wrap 回调。
         * @param reader
         * @return
         */
        @Override
        public LeafReader wrap(LeafReader reader) {
            // TODO 仔细看看这里的实现,最终是实现了 FilterLeafReader 这个类。最终还是回归到 segmentReader 这个类上,去读内容
            return new ElasticsearchLeafReader(reader, shardId);
        }
    }

6.3 取数数据

6.3.1 代码入口:

在这里,会根据 Searcher 来读取数据。最终这个 searcher 是 segmentReader 这个类。

org.elasticsearch.index.get.ShardGetService#innerGetLoadFromStoredFields

6.3.2 读取数据

// 引用了 Lucene 的包,根据 docId 去找数据,
docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);

6.3.4 数据转换

FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE,
                                DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false);

                            StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo);
                            if (status == StoredFieldVisitor.Status.YES) {
                                if (indexableField.numericValue() != null) {
                                    fieldVisitor.objectField(fieldInfo, indexableField.numericValue());
                                } else if (indexableField.binaryValue() != null) {
                                    fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue());
                                } else if (indexableField.stringValue() != null) {

                                    fieldVisitor.objectField(fieldInfo, indexableField.stringValue());
                                }
                            } else if (status == StoredFieldVisitor.Status.STOP) {
                                break;
                            }

6.3.5 过滤

sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());

7 总结

  • ES GET API 里面,比较难懂的是 Searcher 究竟是怎么实例化的。
  • 如果读取失败了,还会继续读取。具体代码位置:org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#onFailure

8 附录

很多小伙伴还是想直观看看 ES 是怎么读取数据的,那么下面有一个 demo ,可以 DEBUG 跑个单元测试看看。

            Directory dir =new SimpleFSDirectory(new File("F:/luc_dir").toPath());
            IndexWriterConfig iwc = new IndexWriterConfig(null);
            iwc.setMaxBufferedDocs(100);
            iwc.setMergePolicy(NoMergePolicy.INSTANCE);
            IndexWriter iw = new IndexWriter(dir, iwc);

            // add two docs, id:0 and id:1
            Document doc = new Document();
            Field idField = new StringField("id", "", Field.Store.NO);
            doc.add(idField);
            idField.setStringValue("0");
            iw.addDocument(doc);
            idField.setStringValue("1");
            iw.addDocument(doc);

            // open reader
            ShardId shardId = new ShardId("fake", "_na_", 1);
            DirectoryReader ir = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(iw), shardId);
            ir.document(1);
            System.out.println("aa");
流程   ES   GET
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章