ES Client性能测试初探

网站建设3年前发布 jaypp
30 0 0
文章目录

最近在工作中协助研发进行了ES优化,效果还是非常明显的,几乎翻倍。除了通过各种业务接口测试ES性能以外,还可以直接请求ES接口,绕过服务,这样应该数据回更加准确。所以,ES Client学起来。,首先,先准备了一个ES服务,这里就不多赘述了,大家自己在尝试的时候一定主意好ES Server和ES Client的版本要一致。 其次,新建项目,添加依赖。,搜一下,能搜到很多的ES学习资料,建议先去看看大厂出品的基础知识了解一下ES功能。然后就可以直接看ES的API了。 下面是ES官方的文档地址: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.7/java-rest-high-search.html,如果能能查看自己公司项目源码的小伙伴可以多研究研发的代码,能够更好结合业务理解ES API的使用。,这里说一下,很多ES查询功能都是通过HTTP请求完成的,GET请求,body传参,一开始还是比较懵逼的。查了一些资料需要自己实现是个body携带数据的HTTPGET请求,下面是我的实现代码:,如果使用HTTP接口进行ES操作,需要组合多层级的参数,这个写起来会比较麻烦、可读性也比较差,而且更加容易出错。所以,还是使用ES Client作为操作ES的基础框架。,如果翻看ES Client源码,最终也是通过HttpClient发起HTTP请求的,这中间进行了很多的封装。这里分享一下ES Client的HTTP Client创建代码部分:,可以看出ES Client用到了HttpClient的异步Client,我猜是用future实现同步返回响应结果,这个没仔细看,有错请指出。这里也回答我的自己的一个疑惑,ES Client是支持并发的。,就我自己的观察,ES Client的封装程度非常高,完全可以拿来就用。我担心自己过几天之后就不知道改怎么用这些ES Client 的API了,所以又进行了一次封装,权当是一个学习笔记类。,封装代码有点多,放到了文末。,这个可以用来跑一部分数据到ES里。,如果想测试添加、删除功能,只需要把
test闭包内容修改即可。,下面是搜索功能的性能测试用例:,

最近在工作中协助研发进行了ES优化,效果还是非常明显的,几乎翻倍。除了通过各种业务接口测试ES性能以外,还可以直接请求ES接口,绕过服务,这样应该数据回更加准确。所以,ES Client学起来。

首先,先准备了一个ES服务,这里就不多赘述了,大家自己在尝试的时候一定主意好ES Server和ES Client的版本要一致。 其次,新建项目,添加依赖。

搜一下,能搜到很多的ES学习资料,建议先去看看大厂出品的基础知识了解一下ES功能。然后就可以直接看ES的API了。 下面是ES官方的文档地址: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.7/java-rest-high-search.html

如果能能查看自己公司项目源码的小伙伴可以多研究研发的代码,能够更好结合业务理解ES API的使用。

这里说一下,很多ES查询功能都是通过HTTP请求完成的,GET请求,body传参,一开始还是比较懵逼的。查了一些资料需要自己实现是个body携带数据的HTTPGET请求,下面是我的实现代码:

package com.funtester.httpclient

import org.apache.http.client.methods.HttpEntityEnclosingRequestBase

import javax.annotation.concurrent.NotThreadSafe

/**
 * HttpGet请求携带body参数
 */
@NotThreadSafe
class HttpGetByBody extends HttpEntityEnclosingRequestBase {

 static final String METHOD_NAME = "GET";

 /**
 * 获取方法(必须重载)
 *
 * @return
 */
 @Override
 String getMethod() {
 return METHOD_NAME;
 }

 /**
 * PS:不能照抄{@link org.apache.http.client.methods.HttpPost}
 * @param uri
 */
 HttpGetByBody(final String uri) {
 this(new URI(uri))
 }

 HttpGetByBody(final URI uri) {
 super();
 setURI(uri);
 }

 HttpGetByBody() {
 super();
 }
}

如果使用HTTP接口进行ES操作,需要组合多层级的参数,这个写起来会比较麻烦、可读性也比较差,而且更加容易出错。所以,还是使用ES Client作为操作ES的基础框架。

如果翻看ES Client源码,最终也是通过HttpClient发起HTTP请求的,这中间进行了很多的封装。这里分享一下ES Client的HTTP Client创建代码部分:

 private CloseableHttpAsyncClient createHttpClient() {
 //default timeouts are all infinite
 RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
 .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
 .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
 if (requestConfigCallback != null) {
 requestConfigBuilder = requestConfigCallback.customizeRequestConfig(requestConfigBuilder);
 }

 try {
 HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build())
 //default settings for connection pooling may be too constraining
 .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL)
 .setSSLContext(SSLContext.getDefault())
 .setTargetAuthenticationStrategy(new PersistentCredentialsAuthenticationStrategy());
 if (httpClientConfigCallback != null) {
 httpClientBuilder = httpClientConfigCallback.customizeHttpClient(httpClientBuilder);
 }

 final HttpAsyncClientBuilder finalBuilder = httpClientBuilder;
 return AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
 @Override
 public CloseableHttpAsyncClient run() {
 return finalBuilder.build();
 }
 });
 } catch (NoSuchAlgorithmException e) {
 throw new IllegalStateException("could not create the default ssl context", e);
 }
 }

可以看出ES Client用到了HttpClient的异步Client,我猜是用future实现同步返回响应结果,这个没仔细看,有错请指出。这里也回答我的自己的一个疑惑,ES Client是支持并发的。

就我自己的观察,ES Client的封装程度非常高,完全可以拿来就用。我担心自己过几天之后就不知道改怎么用这些ES Client 的API了,所以又进行了一次封装,权当是一个学习笔记类。

封装代码有点多,放到了文末。

这个可以用来跑一部分数据到ES里。

package com.funtest.groovytest

import com.alibaba.fastjson.JSONObject
import com.funtester.es.ESClient
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent

import java.util.concurrent.atomic.AtomicInteger

class ESC extends SourceCode {

 static void main(String[] args) {
 def client = new ESClient("127.0.0.1", 9200, "http")
 def data = new JSONObject()
 data.name = "FunTester"
 data.age = getRandomInt(100)
 def index = new AtomicInteger(0)
 def test = {
 data.put("time", index.getAndIncrement())
 client.index("fun", "tt", data)
 }
 new FunQpsConcurrent(test, "ES添加数据").start()
 }

}

如果想测试添加、删除功能,只需要把test闭包内容修改即可。

 def test = {
 data.put("time", index.getAndIncrement())
 client.delete("fun", "tt", client.index("fun", "tt", data))
 }

下面是搜索功能的性能测试用例:

package com.funtest.groovytest

import com.alibaba.fastjson.JSONObject
import com.funtester.es.ESClient
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import org.elasticsearch.index.query.QueryBuilders

import java.util.concurrent.atomic.AtomicInteger

class ESC extends SourceCode {

 static void main(String[] args) {
 def client = new ESClient("127.0.0.1", 9200, "http")
 def data = new JSONObject()
 data.name = "FunTester"
 data.age = getRandomInt(100)
 def index = new AtomicInteger(0)
 def test = {
 client.search("fun", QueryBuilders.matchQuery("time", getRandomInt(10)))
 }
 new FunQpsConcurrent(test, "ES搜索").start()
 }

}

package com.funtester.es

import com.funtester.frame.SourceCode
import groovy.util.logging.Log4j2
import org.apache.http.HttpHost
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.action.search.SearchScrollRequest
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.query.QueryBuilder
import org.elasticsearch.search.SearchHits
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.search.fetch.subphase.FetchSourceContext

import java.util.concurrent.TimeUnit

/**
 * ES客户端API练习类
 */
@Log4j2
class ESClient extends SourceCode {

 String host

 int port

 String scheme

 RestHighLevelClient client

 ESClient(String host, int port = 9200, String scheme = "http") {
 this.host = host
 this.port = port
 this.scheme = scheme
 // 设置验证信息,填写账号及密码
 // CredentialsProvider credentialsProvider = new BasicCredentialsProvider()
 // credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("user", "passwd"))
 def builder = RestClient.builder(new HttpHost(host, port, scheme))
 // 设置认证信息
 // builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
 //
 // @Override
 // public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
 // return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
 // }
 // })
 builder.setMaxRetryTimeoutMillis(1000)
 client = new RestHighLevelClient(builder)
 }

 /**
 * 添加数据
 * @param index
 * @param type
 * @param data
 * @return
 */
 def index(String index, type, Map data) {
 IndexRequest indexRequest = new IndexRequest(index, type).source(data)
 IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT)
 indexResponse.getId()
 }

 /**
 * 获取数据
 * @param index
 * @param type
 * @param id
 * @return
 */
 def get(String index, type, id) {
 // 查询文档
 GetRequest getRequest = new GetRequest(index, type, id)
 GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT)
 if (getResponse.isExists()) {
 getResponse.getSourceAsString()
 }
 }

 /**
 * 数据是否存在
 * @param index
 * @param type
 * @param id
 * @return
 */
 def exists(String index, type, id) {
 GetRequest getRequest = new GetRequest(index, type, id)
 getRequest.fetchSourceContext(new FetchSourceContext(false))
 getRequest.storedFields("_none_")
 client.exists(getRequest, RequestOptions.DEFAULT)
 }

 /**
 * 删除数据
 * @param index
 * @param type
 * @param id
 * @return
 */
 def delete(String index, type, id) {
 DeleteRequest deleteRequest = new DeleteRequest(index, type, id)
 client.delete(deleteRequest, RequestOptions.DEFAULT)
 }

 /**
 * 搜索数据
 * @param index
 * @param query
 * @param size
 * @return
 */
 def search(String index, QueryBuilder query, int size = 10) {
 SearchRequest searchRequest = new SearchRequest(index)
 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
 sourceBuilder.query(query)
 sourceBuilder.from(0)
 sourceBuilder.size(size)
 sourceBuilder.timeout(new TimeValue(1, TimeUnit.SECONDS))
 searchRequest.source(sourceBuilder)
 client.search(searchRequest, RequestOptions.DEFAULT)
 }

 /**
 * 滚动搜索
 * @param index
 * @param query
 * @param size
 */
 def searchScroll(String index, QueryBuilder query, int size = 10) {
 SearchRequest searchRequest = new SearchRequest(index)
 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
 searchSourceBuilder.query(query)
 searchSourceBuilder.size(size)
 searchRequest.source(searchSourceBuilder)
 searchRequest.scroll(TimeValue.timeValueMinutes(1L))
 SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT)
 String scrollId = searchResponse.getScrollId()
 SearchHits hits = searchResponse.getHits()
 def searchHits = hits.getHits()
 while (searchHits != null && searchHits.length > 0) {
 SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
 scrollRequest.scroll(TimeValue.timeValueMinutes(1L))
 searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT)
 scrollId = searchResponse.getScrollId()
 searchHits = searchResponse.getHits().getHits()
 }

 }

 def close() {
 client.close()
 }
}

© 版权声明

相关文章