标签:and cal nes 简单 fir build 分页查询 error 端口
ElasticsearchConfig
///
/// ES 连接配置
///
public class ElasticsearchConfig
{
///
/// 节点列表
///
public IEnumerable Nodes { get; set; }
///
/// 连接池类型
///
public ElasticsearchConnectionPoolType PoolType { get; set; } = ElasticsearchConnectionPoolType.Static;
///
/// 用户名
///
public string UserName { get; set; }
///
/// 密码
///
public string Password { get; set; }
///
/// 显示调试信息
///
public bool DisableDebugInfo { get; set; } = true;
///
/// 抛出异常。默认false,错误信息在每个操作的response中
///
public bool ThrowExceptions { get; set; } = false;
///
/// 是否禁用Ping。禁用ping 第一次使用节点或使用被标记死亡的节点进行ping
///
public bool DisablePing { get; set; } = true;
}
ElasticsearchConfigProvider
///
/// Elasticsearch 配置提供程序
///
public class ElasticsearchConfigProvider : IElasticsearchConfigProvider
{
///
/// 配置
///
private readonly ElasticsearchConfig _config;
///
/// 初始化一个类型的实例
///
/// Elasticsearch 连接配置
public ElasticsearchConfigProvider(ElasticsearchConfig config)
{
_config = config;
}
///
/// 获取配置
///
///
public Task GetConfigAsync()
{
return Task.FromResult(_config);
}
}
ElasticsearchConnectionPoolType
///
/// ES 连接池类型。
/// 支持ping-说明能够发现节点的状态;
/// 支持嗅探-说明能够发现新的节点
///
public enum ElasticsearchConnectionPoolType
{
///
/// 静态连接池。推荐使用,应用于已知集群,请求时随机请求各个正常节点,支持ping,不支持嗅探
///
Static,
///
/// 单节点连接池
///
SingleNode,
///
/// 嗅探连接池。可动态嗅探集群,随机请求,支持嗅探、ping
///
Sniffing,
///
/// 固定连接池。选择一个可用节点作为请求主节点,支持ping,不支持嗅探
///
Sticky,
///
/// 固定嗅探连接池。选择一个可用节点作为请求主节点,支持ping,支持嗅探
///
StickySniffing
}
ElasticsearchNode
///
/// Elasticsearch 节点
///
public class ElasticsearchNode
{
///
/// 主机
///
public string Host { get; set; }
///
/// 端口号
///
public uint Port { get; set; }
///
/// 输出字符串
///
///
public override string ToString()
{
var port = Port == 0 ? "" : $":{Port}";
var result = $"{Host}{port}".ToLowerInvariant();
return result.IndexOf("http", StringComparison.OrdinalIgnoreCase) > -1 ? result : $"http://{result}";
}
}
IElasticsearchConfigProvider
public interface IElasticsearchConfigProvider
{
///
/// 获取配置
///
///
Task GetConfigAsync();
}
ElasticClientExtensions
///
/// ES客户端() 扩展
///
internal static class ElasticClientExtensions
{
///
/// 初始化索引映射
///
/// ES客户端
/// 索引名
public static async Task InitializeIndexMapAsync(this IElasticClient client, string indexName)
{
var newName = indexName + DateTime.Now.Ticks;
var result = await client.CreateIndexAsync(newName,
t => t.Index(newName).Settings(x => x.NumberOfShards(1).NumberOfReplicas(1).Setting("max_result_window", int.MaxValue)));
if (result.Acknowledged)
{
await client.AliasAsync(x => x.Add(o => o.Index(newName).Alias(indexName)));
return;
}
throw new ElasticsearchException($"创建索引 {indexName} 失败:{result.ServerError.Error.Reason}");
}
///
/// 初始化索引映射
///
/// 实体类型
/// ES客户端
/// 索引名
public static async Task InitializeIndexMapAsync(this IElasticClient client, string indexName) where T : class
{
var newName = indexName + DateTime.Now.Ticks;
var result = await client.CreateIndexAsync(newName,
t => t.Index(newName)
.Settings(o => o.NumberOfShards(1).NumberOfReplicas(1)
.Setting("max_result_window", int.MaxValue))
.Mappings(m => m.Map(mm => mm.AutoMap())));
if (result.Acknowledged)
{
await client.AliasAsync(x => x.Add(o => o.Index(newName).Alias(indexName)));
return;
}
throw new ElasticsearchException($"创建索引 {indexName} 失败:{result.ServerError.Error.Reason}");
}
///
/// 初始化索引映射
///
/// 实体类型
/// ES客户端
/// 索引名
public static async Task InitializeIndexMapAsync(this IElasticClient client, string indexName, int numberOfShards,
int numberOfReplicas) where T : class
{
var newName = indexName + DateTime.Now.Ticks;
var result = await client.CreateIndexAsync(newName,
x => x.Index(newName)
.Settings(o =>
o.NumberOfShards(numberOfShards)
.NumberOfReplicas(numberOfReplicas)
.Setting("max_result_window", int.MaxValue))
.Mappings(m => m.Map(mm => mm.AutoMap())));
if (result.Acknowledged)
{
await client.AliasAsync(x => x.Add(o => o.Index(newName).Alias(indexName)));
return;
}
throw new ElasticsearchException($"创建索引 {indexName} 失败:{result.ServerError.Error.Reason}");
}
}
HighlightParam
public class HighlightParam
{
///
/// 高亮字段
///
public string[] Keys { get; set; }
///
/// 高亮标签
///
public string PreTags { get; set; } = "";
///
/// 高亮标签
///
public string PostTags { get; set; } = "";
///
/// 高亮字段前缀。
/// 例如:title 高亮值赋值给 h_title
///
public string PrefixOfKey { get; set; } = string.Empty;
///
/// 是否替换原来的值
///
public bool ReplaceAuto { get; set; } = true;
}
IPageParam
public interface IPageParam
{
///
/// 页数,即第几页,从1开始
///
int Page { get; set; }
///
/// 每页显示行数
///
int PageSize { get; set; }
///
/// 关键词
///
string Keyword { get; set; }
///
/// 获取跳过的行数
///
///
int GetSkipCount();
///
/// 运算符
///
Nest.Operator Operator { get; set; }
///
/// 高亮参数
///
HighlightParam Highlight { get; set; }
}
///
/// 分页参数
///
public class PageParam : IPageParam
{
///
/// 页数,即第几页,从1开始
///
public int Page { get; set; }
///
/// 每页显示行数
///
public int PageSize { get; set; }
///
/// 关键词
///
public string Keyword { get; set; }
///
/// 获取跳过的行数
///
///
public int GetSkipCount() => (Page - 1) * PageSize;
///
/// 运算符
///
public Nest.Operator Operator { get; set; } = Nest.Operator.And;
///
/// 高亮参数
///
public HighlightParam Highlight { get; set; }
}
///
/// 指定字段查询
///
public class PageParamWithSearch : PageParam
{
///
/// 查询字段列表
///
public string[] SearchKeys { get; set; }
}
IQueryResult
///
/// 查询结果
///
/// 实体类型
public interface IQueryResult
{
///
/// 总行数
///
long TotalCount { get; set; }
///
/// 查询占用时间
///
long Took { get; set; }
///
/// 数据
///
IEnumerable Data { get; }
}
///
/// 自定义查询结果
///
/// 实体类型
public class CustomQueryResult : IQueryResult
{
///
/// 总行数
///
public long TotalCount { get; set; }
///
/// 查询占用时间
///
public long Took { get; set; }
///
/// 数据
///
public IEnumerable Data { get; set; }
}
ElasticsearchClient
///
/// ES客户端
///
public class ElasticsearchClient : IElasticsearchClient
{
///
/// ES客户端生成器
///
private ElasticsearchClientBuilder _builder;
///
/// 配置提供程序
///
private IElasticsearchConfigProvider _configProvider;
///
/// 初始化一个类型的实例
///
/// 配置提供程序
public ElasticsearchClient(IElasticsearchConfigProvider configProvider)
{
_configProvider = configProvider ?? throw new ArgumentNullException(nameof(configProvider));
_builder = new ElasticsearchClientBuilder(configProvider);
}
///
/// 是否存在指定索引
///
/// 索引名
///
public async Task ExistsAsync(string indexName)
{
var client = await _builder.GetClientAsync();
var result = await client.IndexExistsAsync(indexName);
return result.Exists;
}
///
/// 添加索引。不映射
///
/// 索引名
public async Task AddAsync(string indexName)
{
var client = await _builder.GetClientAsync();
if (await ExistsAsync(indexName)) return;
await client.InitializeIndexMapAsync(indexName);
}
///
/// 添加索引。自动映射实体属性
///
/// 实体类型
/// 索引名
public async Task AddAsync(string indexName) where T : class
{
var client = await _builder.GetClientAsync();
if (await ExistsAsync(indexName)) return;
await client.InitializeIndexMapAsync(indexName);
}
///
/// 添加索引。自动映射实体属性并赋值
///
/// 实体类型
/// 索引名
/// 实体
public async Task AddAsync(string indexName, T entity) where T : class
{
var client = await _builder.GetClientAsync();
if (!await ExistsAsync(indexName))
await client.InitializeIndexMapAsync(indexName);
var response = await client.IndexAsync(entity, x => x.Index(indexName));
if(!response.IsValid)
throw new ElasticsearchException($"新增数据[{indexName}]失败 : {response.ServerError.Error.Reason}");
}
///
/// 更新索引。
/// 由于是普通的简单更新,当ID已经存在时,则会更新文档,所以这里直接调用index方法(复杂方法待研究)
///
/// 实体类型
/// 索引名
/// 实体
public async Task UpdateAsync(string indexName, T entity) where T : class
=>await AddAsync(indexName, entity);
///
/// 删除索引
///
/// 索引名
public async Task DeleteAsync(string indexName)
{
var client = await _builder.GetClientAsync();
var response = await client.DeleteIndexAsync(indexName);
if (response.Acknowledged) return;
}
///
/// 删除索引
///
/// 实体类型
/// 索引名
/// 实体
public async Task DeleteAsync(string indexName, T entity) where T : class
{
var client = await _builder.GetClientAsync();
var response = await client.DeleteAsync(new DeleteRequest(indexName, typeof(T), new Id(entity)));
if (response.ServerError == null) return;
throw new ElasticsearchException($"删除索引[{indexName}]失败 : {response.ServerError.Error.Reason}");
}
///
/// 删除索引
///
/// 实体类型
/// 索引名
/// 主键ID
public async Task DeleteAsync(string indexName, long id) where T : class
{
var client = await _builder.GetClientAsync();
var response = await client.DeleteAsync(DocumentPath.Id(new Id(id)), x => x.Type().Index(indexName));
if (response.ServerError == null) return;
throw new ElasticsearchException($"删除索引[{indexName}]失败 : {response.ServerError.Error.Reason}");
}
///
/// 查询实体
///
/// 实体类型
/// 索引名
/// 主键ID
///
public async Task FindAsync(string indexName, long id) where T : class
{
var client = await _builder.GetClientAsync();
var response = await client.GetAsync(id, x => x.Type().Index(indexName));
return response?.Source;
}
///
/// 查询。单一条件查询,一般是精确查询
///
/// 实体类型
/// 索引名
/// 字段名
/// 查询值
///
public async Task> QueryAsync(string indexName, string field, object value) where T : class
{
if (string.IsNullOrWhiteSpace(field))
return null;
var client = await _builder.GetClientAsync();
var searchRequest = new SearchDescriptor()
.Index(indexName)
.PostFilter(t => t.Term(x => x.Field(field).Value(value)));
var response = await client.SearchAsync(searchRequest);
return response.Documents;
}
///
/// 查找实体列表
///
/// 实体类型
/// 索引名
/// 主键值
///
public async Task> FindByIdsAsync(string indexName, params long[] ids) where T : class
{
var client = await _builder.GetClientAsync();
var searchRequest = new SearchDescriptor().Index(indexName).Query(t => t.Ids(x => x.Values(ids)));
var response = await client.SearchAsync(searchRequest);
return response.Documents;
}
///
/// 查找实体列表
///
/// 实体类型
/// 索引名
/// 主键值
///
public async Task> FindByIdsAsync(string indexName, params string[] ids) where T : class
{
var client = await _builder.GetClientAsync();
var searchRequest = new SearchDescriptor().Index(indexName).Query(t => t.Ids(x => x.Values(ids)));
var response = await client.SearchAsync(searchRequest);
return response.Documents;
}
///
/// 查找实体列表
///
/// 实体类型
/// 索引名
/// 主键值
///
public async Task> FindByIdsAsync(string indexName, params Guid[] ids) where T : class
{
var client = await _builder.GetClientAsync();
var searchRequest = new SearchDescriptor().Index(indexName).Query(q => q.Ids(x => x.Values(ids)));
var response = await client.SearchAsync(searchRequest);
return response.Documents;
}
///
/// 分页查询
///
/// 实体类型
/// 分页参数
/// 索引名
///
public async Task> PageQueryAsync(IPageParam param, string indexName) where T : class
{
if (param == null)
{
param = new PageParam()
{
Page = 1,
PageSize = 20
};
}
var searchRequest = new SearchDescriptor()
.Type()
.Index(indexName)
.From(param.GetSkipCount())
.Size(param.PageSize);
if (param is PageParamWithSearch pageSearch)
ConfigPageRequest(pageSearch, ref searchRequest);
else if(param is PageParam pageParam)
ConfigPageRequest(pageParam, ref searchRequest);
// 是否需要高亮
bool hasHighlight = param.Highlight?.Keys?.Length > 0;
if(hasHighlight)
BuildHighLightQuery(param, ref searchRequest);
var client = await _builder.GetClientAsync();
var response = await client.SearchAsync(x => searchRequest);
//if (hasHighlight)
//{
// var listWithHightlight = new List();
// response.Hits.ToList().ForEach(x =>
// {
// if (x.Highlights?.Count > 0)
// {
// PropertyInfo[] properties = typeof(T).GetProperties();
// foreach (string key in pageParams.Highlight?.Keys)
// {
// //先得到要替换的内容
// if (x.Highlights.ContainsKey(key))
// {
// string value = string.Join("", x.Highlights[key]?.Highlights);
// PropertyInfo info = properties.FirstOrDefault(p => p.Name == pageParams.Highlight.PrefixOfKey + key);
// //没找到带前缀的属性,则替换之前的
// if (info == null && pageParams.Highlight.ReplaceAuto)
// {
// info = properties.FirstOrDefault(p => p.Name == key);
// }
// if (info?.CanWrite == true)
// {
// if (!string.IsNullOrEmpty(value))
// {
// //如果高亮字段不为空,才赋值,否则就赋值成空
// info.SetValue(x.Source, value);
// }
// }
// }
// }
// }
// listWithHightlight.Add(x.Source);
// });
//}
return new CustomQueryResult()
{
Data = response.Documents,
Took = response.Took,
TotalCount = response.Total
};
}
///
/// 配置指定字段的分页请求
///
private void ConfigPageRequest(PageParamWithSearch param, ref SearchDescriptor searchRequest) where T : class
{
searchRequest = searchRequest.Query(t=>
t.QueryString(x =>
x.Fields(param.SearchKeys)
.Query(param.Keyword)
.DefaultOperator(param.Operator)));
}
///
/// 配置分页请求
///
private void ConfigPageRequest(PageParam param, ref SearchDescriptor searchRequest) where T : class
{
searchRequest= searchRequest.Query(
t=>t.QueryString(q=>q.Query(param.Keyword)
.DefaultOperator(param.Operator)));
}
///
/// 构造高亮查询
///
private void BuildHighLightQuery(IPageParam param, ref SearchDescriptor searchRequest) where T : class
{
var keysLength = param.Highlight?.Keys?.Length ?? 0;
var fieldDescriptor = new Func, IHighlightField>[keysLength];
var keysIndex = 0;
foreach (var key in param.Highlight?.Keys)
{
fieldDescriptor[keysIndex] = hf => hf.Field(key)
.HighlightQuery(q => q.Match(m => m.Field(key).Query(param.Keyword)));
keysIndex++;
}
IHighlight highlight = new HighlightDescriptor()
.PreTags(param.Highlight.PreTags)
.PostTags(param.Highlight.PostTags)
.Fields(fieldDescriptor);
searchRequest = searchRequest.Highlight(s => highlight);
}
///
/// 批量保存
///
/// 实体类型
/// 索引名
/// 实体列表
public async Task BulkSaveAsync(string indexName, IEnumerable entities) where T : class
{
var client = await _builder.GetClientAsync();
if (!await ExistsAsync(indexName))
{
await client.InitializeIndexMapAsync(indexName);
}
var bulk = new BulkRequest(indexName)
{
Operations = new List()
};
foreach (var entity in entities)
{
bulk.Operations.Add(new BulkIndexOperation(entity));
}
var response = await client.BulkAsync(bulk);
if (response.Errors)
{
throw new ElasticsearchException($"批量保存文档在索引 {indexName} 失败:{response.ServerError.Error.Reason}");
}
}
}
ElasticsearchClientBuilder
///
/// ES客户端生成器
///
internal class ElasticsearchClientBuilder
{
///
/// ES客户端
///
private IElasticClient _client;
///
/// 配置提供程序
///
private readonly IElasticsearchConfigProvider _configProvider;
///
/// 对象锁
///
private static object _lock = new object();
///
/// 初始化一个类型的实例
///
/// 配置提供程序
public ElasticsearchClientBuilder(IElasticsearchConfigProvider configProvider)
{
_configProvider = configProvider;
}
///
/// 获取ES客户端
///
///
public async Task GetClientAsync()
{
if (_client == null)
{
var config = await _configProvider.GetConfigAsync();
lock (_lock)
{
if (_client == null)
{
if (config.Nodes == null) throw new ArgumentException("请设置ES客户端节点");
_client = CreateClient(config);
}
}
}
return _client;
}
///
/// 创建ES客户端
///
/// 配置
///
private IElasticClient CreateClient(ElasticsearchConfig config)
{
var connectionPool = CreateConnectionPool(config);
var settings = new ConnectionSettings(connectionPool);
ConfigSettings(settings, config);
return new ElasticClient(settings);
}
///
/// 创建连接池
///
///
///
private IConnectionPool CreateConnectionPool(ElasticsearchConfig config)
{
var nodes = config.Nodes.Select(t => new Uri(t.ToString())).ToList();
switch (config.PoolType)
{
case ElasticsearchConnectionPoolType.Static:
return new StaticConnectionPool(nodes);
case ElasticsearchConnectionPoolType.SingleNode:
return new SingleNodeConnectionPool(nodes.FirstOrDefault());
case ElasticsearchConnectionPoolType.Sniffing:
return new SniffingConnectionPool(nodes);
case ElasticsearchConnectionPoolType.Sticky:
return new StickyConnectionPool(nodes);
case ElasticsearchConnectionPoolType.StickySniffing:
return new StickySniffingConnectionPool(nodes, x => 1.0F);
default:
return new StaticConnectionPool(nodes);
}
}
///
/// 配置连接设置
///
/// 连接设置
/// 配置
private void ConfigSettings(ConnectionSettings settings, ElasticsearchConfig config)
{
// 启用验证
if (!string.IsNullOrWhiteSpace(config.UserName) && !string.IsNullOrWhiteSpace(config.Password))
settings.BasicAuthentication(config.UserName, config.Password);
// 验证证书
//settings.ClientCertificate("");
//settings.ClientCertificates(new System.Security.Cryptography.X509Certificates.X509CertificateCollection());
//settings.ServerCertificateValidationCallback();
// 开启第一次使用时进行嗅探,需连接池支持
//settings.SniffOnStartup(false);
// 链接最大并发数
//settings.ConnectionLimit(80);
// 标记为死亡节点的超时时间
//settings.DeadTimeout(new TimeSpan(10000));
//settings.MaxDeadTimeout(new TimeSpan(10000));
// 最大重试次数
//settings.MaximumRetries(5);
// 重试超时时间,默认是RequestTimeout
//settings.MaxRetryTimeout(new TimeSpan(50000));
// 禁用代理自动检测
//settings.DisableAutomaticProxyDetection(true);
// 禁用ping,第一次使用节点或使用被标记死亡的节点进行ping
settings.DisablePing(config.DisablePing);
// ping超时设置
//settings.PingTimeout(new TimeSpan(10000));
// 选择节点
//settings.NodePredicate(node => { return true; });
// 默认操作索引
//settings.DefaultIndex("");
// 字段名规则 与model字段同名
//settings.DefaultFieldNameInferrer(name => name);
// 根据Type获取类型名
//settings.DefaultTypeNameInferrer(name => name.Name);
// 请求超时设置
//settings.RequestTimeout(new TimeSpan(10000));
// 调试信息
settings.DisableDirectStreaming(config.DisableDebugInfo);
//settings.EnableDebugMode((apiCallDetails) =>
//{
// // 请求完成 返回 apiCallDetails
//});
// 抛出异常,默认false,错误信息在每个操作的response中
settings.ThrowExceptions(config.ThrowExceptions);
//settings.OnRequestCompleted(apiCallDetails =>
//{
// // 请求完成 返回apiCallDetails
//});
//settings.OnRequestDataCreated(requestData =>
//{
// // 请求的数据创建完成 返回请求的数据
//});
}
}
ElasticsearchException
///
/// Elasticsearch 异常
///
[Serializable]
public class ElasticsearchException : Exception
{
///
/// 初始化一个类型的实例
///
public ElasticsearchException() { }
///
/// 初始化一个类型的实例
///
/// 序列号信息
/// 流上下文
public ElasticsearchException(SerializationInfo serializationInfo, StreamingContext context) : base(serializationInfo, context)
{
}
///
/// 初始化一个类型的实例
///
/// 错误消息
public ElasticsearchException(string message) : base(message) { }
///
/// 初始化一个类型的实例
///
/// 错误消息
/// 内部异常
public ElasticsearchException(string message, Exception innerException) : base(message, innerException) { }
}
IElasticsearchClient
///
/// ES客户端
///
public interface IElasticsearchClient
{
///
/// 是否存在指定索引
///
/// 索引名
///
Task ExistsAsync(string indexName);
///
/// 添加索引。不映射
///
/// 索引名
Task AddAsync(string indexName);
///
/// 添加索引。自动映射实体属性
///
/// 实体类型
/// 索引名
Task AddAsync(string indexName) where T : class;
///
/// 添加索引。自动映射实体属性并赋值
///
/// 实体类型
/// 索引名
/// 实体
///
Task AddAsync(string indexName, T entity) where T : class;
///
/// 更新索引。
/// 由于是普通的简单更新,当ID已经存在时,则会更新文档,所以这里直接调用index方法(复杂方法待研究)
///
/// 实体类型
/// 索引名
/// 实体
Task UpdateAsync(string indexName, T entity) where T : class;
}
C# Elasticsearch帮助类
标签:and cal nes 简单 fir build 分页查询 error 端口
原文地址:https://www.cnblogs.com/sunliyuan/p/14473886.html