11package io .github .delirius325 .jmeter .backendlistener .elasticsearch ;
22
3+
34import com .google .gson .Gson ;
45import org .apache .http .HttpHost ;
56import org .apache .jmeter .config .Arguments ;
1112import org .elasticsearch .client .RestClient ;
1213import org .slf4j .Logger ;
1314import org .slf4j .LoggerFactory ;
15+ import com .amazonaws .auth .AWSCredentialsProvider ;
16+ import com .amazonaws .auth .AWS4Signer ;
17+ import com .amazonaws .http .AWSRequestSigningApacheInterceptor ;
18+ import com .amazonaws .auth .DefaultAWSCredentialsProviderChain ;
19+ import org .apache .http .HttpRequestInterceptor ;
1420
1521import java .util .*;
1622
@@ -29,8 +35,13 @@ public class ElasticsearchBackendClient extends AbstractBackendListenerClient {
2935 private static final String ES_AUTH_PWD = "es.xpack.password" ;
3036 private static final String ES_PARSE_REQ_HEADERS = "es.parse.all.req.headers" ;
3137 private static final String ES_PARSE_RES_HEADERS = "es.parse.all.res.headers" ;
38+ private static final String ES_AWS_ENDPOINT = "es.aws.endpoint" ;
39+ private static final String ES_AWS_REGION = "es.aws.region" ;
3240 private static final long DEFAULT_TIMEOUT_MS = 200L ;
41+ private static final String SERVICE_NAME = "es" ;
42+ private static RestClient client ;
3343 private static final Logger logger = LoggerFactory .getLogger (ElasticsearchBackendClient .class );
44+ private static final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain ();
3445
3546 private ElasticSearchMetricSender sender ;
3647 private Set <String > modes ;
@@ -55,6 +66,8 @@ public Arguments getDefaultParameters() {
5566 parameters .addArgument (ES_AUTH_PWD , "" );
5667 parameters .addArgument (ES_PARSE_REQ_HEADERS , "false" );
5768 parameters .addArgument (ES_PARSE_RES_HEADERS , "false" );
69+ parameters .addArgument (ES_AWS_ENDPOINT , "" );
70+ parameters .addArgument (ES_AWS_REGION , "" );
5871 return parameters ;
5972 }
6073
@@ -66,19 +79,28 @@ public void setupTest(BackendListenerContext context) throws Exception {
6679 this .bulkSize = Integer .parseInt (context .getParameter (ES_BULK_SIZE ));
6780 this .timeoutMs = JMeterUtils .getPropDefault (ES_TIMEOUT_MS , DEFAULT_TIMEOUT_MS );
6881 this .buildNumber = (JMeterUtils .getProperty (ElasticsearchBackendClient .BUILD_NUMBER ) != null && !JMeterUtils .getProperty (ElasticsearchBackendClient .BUILD_NUMBER ).trim ().equals ("" )) ? Integer .parseInt (JMeterUtils .getProperty (ElasticsearchBackendClient .BUILD_NUMBER )) : 0 ;
69- RestClient client = RestClient .builder (new HttpHost (context .getParameter (ES_HOST ), Integer .parseInt (context .getParameter (ES_PORT )), context .getParameter (ES_SCHEME )))
70- .setRequestConfigCallback (requestConfigBuilder -> requestConfigBuilder .setConnectTimeout (5000 )
71- .setSocketTimeout ((int ) timeoutMs ))
72- .setFailureListener (new RestClient .FailureListener () {
73- @ Override
74- public void onFailure (Node node ) {
75- throw new IllegalStateException ();
76- }
77- })
78- .setMaxRetryTimeoutMillis (60000 )
79- .build ();
80-
81- this .sender = new ElasticSearchMetricSender (client , context .getParameter (ES_INDEX ).toLowerCase () ,context .getParameter (ES_AUTH_USER ), context .getParameter (ES_AUTH_PWD ));
82+
83+ if (context .getParameter (ES_AWS_ENDPOINT ).equalsIgnoreCase ("" )) {
84+ client = RestClient .builder (new HttpHost (context .getParameter (ES_HOST ), Integer .parseInt (context .getParameter (ES_PORT )), context .getParameter (ES_SCHEME )))
85+ .setRequestConfigCallback (requestConfigBuilder -> requestConfigBuilder .setConnectTimeout (5000 )
86+ .setSocketTimeout ((int ) timeoutMs ))
87+ .setFailureListener (new RestClient .FailureListener () {
88+ @ Override
89+ public void onFailure (Node node ) {
90+ throw new IllegalStateException ();
91+ }
92+ })
93+ .setMaxRetryTimeoutMillis (60000 )
94+ .build ();
95+ } else {
96+
97+ AWS4Signer signer = new AWS4Signer ();
98+ signer .setServiceName (SERVICE_NAME );
99+ signer .setRegionName (context .getParameter (ES_AWS_REGION ));
100+ HttpRequestInterceptor interceptor = new AWSRequestSigningApacheInterceptor (SERVICE_NAME , signer , credentialsProvider );
101+ client = RestClient .builder (HttpHost .create (context .getParameter (ES_AWS_ENDPOINT ))).setHttpClientConfigCallback (hacb -> hacb .addInterceptorLast (interceptor )).build ();
102+ }
103+ this .sender = new ElasticSearchMetricSender (client , context .getParameter (ES_INDEX ).toLowerCase (),context .getParameter (ES_AUTH_USER ), context .getParameter (ES_AUTH_PWD ), context .getParameter (ES_AWS_ENDPOINT ));
82104 this .sender .createIndex ();
83105
84106 checkTestMode (context .getParameter (ES_TEST_MODE ));
@@ -90,7 +112,6 @@ public void onFailure(Node node) {
90112 logger .info ("Added filter: " + filter .toLowerCase ().trim ());
91113 }
92114 }
93-
94115 super .setupTest (context );
95116 } catch (Exception e ) {
96117 throw new IllegalStateException ("Unable to connect to the ElasticSearch engine" , e );
0 commit comments