上一篇文章中,我们讲到了 ES Nested 是拆开保存的(1拖N 方式),通过 Root Document 的 _id 来做关联的。那这个 ES 直接通过 _id 来读取数据,又是怎么样的呢?有没有做合并、组装的呢?
GET nike/_doc/0
{
"_index" : "ike",
"_type" : "_doc",
"_id" : "0",
"_version" : 1,
"found": true,
"_source" : {
"user" : "kimchy",
"date" : "2009-11-15T14:12:12",
"likes": 0,
"message" : "trying out Elasticsearch"
}
}
接收请求的入口是:
org.elasticsearch.rest.action.document.RestGetAction
GET 操作 具体流程如下图所示:
ES 中,写入默认是写入大多数成功即可。默认是写入主分片成功,副本分片大多数成功即可。所以,在一瞬间去查询的话,有可能请求的是 副本分片,客户端会收到 doc 不存在的问题。
// org.elasticsearch.rest.action.document.RestGetAction 代码入口 只贴出了部分代码。
/**
* {@link TransportGetAction 跳转此类 }
*/
return channel -> client.get(getRequest, new RestToXContentListener(channel) {
@Override
protected RestStatus getStatus(final GetResponse response) {
return response.isExists() ? OK : NOT_FOUND;
}
});
其实,请求应该是:org.elasticsearch.action.get.TransportGetAction。但是 org.elasticsearch.action.get.TransportGetAction 实现了 TransportSingleShardAction 这个抽象类。具体的请求处理先由 TransportSingleShardAction 来处理。具体处理如下:
-- org.elasticsearch.action.support.single.shard.TransportSingleShardAction#doExecute 代码入口
/**
* 执行 execute,而且,是异步执行的
* @param request
* @param listener
*/
@Override
protected void doExecute(Task task, Request request, ActionListener listener) {
// 异步操作,AsyncSingleAction 构造方法里面,先计算出正确的 索引,获取具体分片的路由规则。在执行 start()方法。
new AsyncSingleAction(request, listener).start();
}
代码入口:org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#AsyncSingleAction。
......
// 获取正确的索引,比如,你请求的是 nike_xx ,通过正则表达式,计算出一些列规则,最后,这里只返回一个索引。
if (resolveIndex(request)) {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
} else {
concreteSingleIndex = request.index();
}
.......
// 根据hash和shard数量取余计算 shard 列表,或者走优先级规则
this.shardIt = shards(clusterState, internalRequest);
代码入口:org.elasticsearch.action.get.TransportGetAction#shards
/**
* 根据 docId 计算,文档落在哪个分片中。
* @param state
* @param request
* @return
*/
@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
return clusterService.operationRouting()
.getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),
request.request().preference());
}
---- 下面是计算 shardId。实现的比较有意思。如果有 docId,那么,就根据 docId 来获取。如果没有,就根据 routing 值来获取具体的shard 规则。
/**
* 获取 shardId
* @param indexMetadata
* @param id
* @param routing
* @return
*/
public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) {
final String effectiveRouting;
final int partitionOffset;
// 使用的方法:PUT /index/type/id?routing=user_id,如果没有 routing ,那么,默认是 doc_id 来作为 routing。非常巧妙,不需要维护 docId 与 shard 关系了
if (routing == null) {
assert(indexMetadata.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
effectiveRouting = id;
} else {
effectiveRouting = routing;
}
// 如果路由分区的大小 不是 1
if (indexMetadata.isRoutingPartitionedIndex()) {
partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize());
} else {
// we would have still got 0 above but this check just saves us an unnecessary hash calculation
partitionOffset = 0;
}
// 计算 shardId ,基于 hash 来计算
return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
}
// 本节点作为协调节点,向目标 node 转发请求,或者目标是本地节点,直接通过函数调用读取数据.发送流程封装了对请求的发送,
org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#start
org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#perform
-- 这里由重试的逻辑;判断是否是本地节点;根据 shard 获取某个 节点,发送请求到数据节点请求。
// 获取下一个路由
final ShardRouting shardRouting = shardIt.nextOrNull();
将请求转发到数据节点后,由 org.elasticsearch.action.support.single.shard.TransportSingleShardAction.ShardTransportHandler 来接收请求,处理请求。
/**
* shard 分片的操作
*/
private class ShardTransportHandler implements TransportRequestHandler {
@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
}
// 异步操作
asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
}
}
// 代码入口:org.elasticsearch.index.get.ShardGetService#innerGet
// docId 其实也是一个 Term,名字是 _id,为什么这里要做 uuid 的转换呢?因为很多id 都是不规整的。UUID 编码后,长度一致,做好了很好的性能优化,比如,可以采用哈夫曼树来优化、压缩。
Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
进入 org.elasticsearch.index.engine.InternalEngine#get 这个方法,主要是 判断版本号是否冲突、seqNo 是否冲突。还有一个重点,是获取 Searcher。 虽然 acquireSearcher 这个方法是获取 Searcher 的,但是经过一遍代码查找后,然后各种条件限制,需要继续跳到之前 6.2.4
// acquireSearcher 这个方法是获取 Searcher 的。但是你
getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
/**
* 读取数据
* @param get
* @return
*/
public Engine.GetResult get(Engine.Get get) {
// 校验是否允许读取数据
readAllowed();
// ES Mapper
DocumentMapper mapper = mapperService.documentMapper();
if (mapper == null || mapper.type().equals(mapperService.resolveDocumentType(get.type())) == false) {
return GetResult.NOT_EXISTS;
}
// 读取数据,这个 this:wrapperSearcher 方法最终在:org.elasticsearch.index.engine.Engine.SearcherSupplier.acquireSearcher 这里调用。最终的 search 是:ElasticsearchDirectoryReader
return getEngine().get(get, mapper, this::wrapSearcher);
}
// 可能大家看了 wraSearcher 还是一脸懵逼,不知道 这个 searcher 怎么读取数据的。
代码入口:org.elasticsearch.index.engine.InternalEngine#createReaderManager
org.elasticsearch.index.engine.InternalEngine#createReaderManager
// 下面,我只摘出了重要的部分说明。下面的函数说明
final ElasticsearchDirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
// 大家一定要仔细看 DirectoryReader.open(indexWriter) 这个方法。很妖娆。
//
public static ElasticsearchDirectoryReader wrap(DirectoryReader reader, ShardId shardId) throws IOException {
return new ElasticsearchDirectoryReader(reader, new SubReaderWrapper(shardId), shardId);
}
private static final class SubReaderWrapper extends FilterDirectoryReader.SubReaderWrapper {
private final ShardId shardId;
SubReaderWrapper(ShardId shardId) {
this.shardId = shardId;
}
/**
* ElasticsearchDirectoryReader 在这个类的构造函数,里面的 super 方法,执行了 wrap 回调。
* @param reader
* @return
*/
@Override
public LeafReader wrap(LeafReader reader) {
// TODO 仔细看看这里的实现,最终是实现了 FilterLeafReader 这个类。最终还是回归到 segmentReader 这个类上,去读内容
return new ElasticsearchLeafReader(reader, shardId);
}
}
在这里,会根据 Searcher 来读取数据。最终这个 searcher 是 segmentReader 这个类。
org.elasticsearch.index.get.ShardGetService#innerGetLoadFromStoredFields
// 引用了 Lucene 的包,根据 docId 去找数据,
docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE,
DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false);
StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo);
if (status == StoredFieldVisitor.Status.YES) {
if (indexableField.numericValue() != null) {
fieldVisitor.objectField(fieldInfo, indexableField.numericValue());
} else if (indexableField.binaryValue() != null) {
fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue());
} else if (indexableField.stringValue() != null) {
fieldVisitor.objectField(fieldInfo, indexableField.stringValue());
}
} else if (status == StoredFieldVisitor.Status.STOP) {
break;
}
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
很多小伙伴还是想直观看看 ES 是怎么读取数据的,那么下面有一个 demo ,可以 DEBUG 跑个单元测试看看。
Directory dir =new SimpleFSDirectory(new File("F:/luc_dir").toPath());
IndexWriterConfig iwc = new IndexWriterConfig(null);
iwc.setMaxBufferedDocs(100);
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
IndexWriter iw = new IndexWriter(dir, iwc);
// add two docs, id:0 and id:1
Document doc = new Document();
Field idField = new StringField("id", "", Field.Store.NO);
doc.add(idField);
idField.setStringValue("0");
iw.addDocument(doc);
idField.setStringValue("1");
iw.addDocument(doc);
// open reader
ShardId shardId = new ShardId("fake", "_na_", 1);
DirectoryReader ir = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(iw), shardId);
ir.document(1);
System.out.println("aa");
留言与评论(共有 0 条评论) “” |