-
Notifications
You must be signed in to change notification settings - Fork 117
/
Copy pathIndexManagementRestTestCase.kt
313 lines (283 loc) · 15.1 KB
/
IndexManagementRestTestCase.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.indexmanagement
import org.apache.hc.core5.http.ContentType
import org.apache.hc.core5.http.io.entity.StringEntity
import org.junit.AfterClass
import org.junit.Before
import org.junit.rules.DisableOnDebug
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction
import org.opensearch.client.Request
import org.opensearch.client.Response
import org.opensearch.client.RestClient
import org.opensearch.client.RequestOptions
import org.opensearch.client.WarningsHandler
import org.opensearch.client.ResponseException
import org.opensearch.common.Strings
import org.opensearch.common.collect.Set
import org.opensearch.common.io.PathUtils
import org.opensearch.common.settings.Settings
import org.opensearch.core.xcontent.DeprecationHandler
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.rest.RestStatus
import java.io.IOException
import java.nio.file.Files
import java.util.*
import javax.management.MBeanServerInvocationHandler
import javax.management.ObjectName
import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL
import kotlin.collections.ArrayList
import kotlin.collections.HashSet
abstract class IndexManagementRestTestCase : ODFERestTestCase() {
// Schema version constants exposed to subclasses.
// NOTE(review): presumably the `_meta.schema_version` values expected in the config and history index
// mappings (see verifyIndexSchemaVersion) — confirm against the mapping files when bumping them.
val configSchemaVersion = 17
val historySchemaVersion = 5
/**
 * Sets the persistent cluster setting `action.auto_create_index` to `-.opendistro-*,*` before each test.
 *
 * Why: tests have been leaking into other tests and leaving incorrect mappings behind, and none of the
 * pending-task wait checks catch this because the offending work does not go through the pending task queue.
 * Ideally the jobs themselves would be awaited and shut down gracefully; this is the stop-gap for now.
 */
@Before
fun setAutoCreateIndex() {
    val autoCreateSettings = StringEntity(
        """{"persistent":{"action.auto_create_index":"-.opendistro-*,*"}}""",
        ContentType.APPLICATION_JSON
    )
    client().makeRequest("PUT", "_cluster/settings", autoCreateSettings)
}
/**
 * Creates the index management index (hidden, with the plugin's mappings) before each test if it is missing.
 *
 * Why: on lower resource machines tests were flaky because a job was forced to start before the job
 * scheduler had registered its index operations listener. Creating the index up front seems to give the
 * job scheduler time to start listening to operations.
 */
@Before
fun initializeManagedIndex() {
    if (indexExists(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)) return

    val hiddenSettings = Settings.builder().put(INDEX_HIDDEN, true).build()
    val settingsJson = Strings.toString(XContentType.JSON, hiddenSettings)
    val createRequest = Request("PUT", "/${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}")
    createRequest.setJsonEntity(
        """{"settings": ${settingsJson},"mappings" : ${IndexManagementIndices.indexManagementMappings}}"""
    )
    client().performRequest(createRequest)
}
// Debug flag as reported by JUnit's DisableOnDebug rule (constructed with no wrapped rule, only to read the flag).
protected val isDebuggingTest = DisableOnDebug(null).isDebugging
// The "cluster.debug" system property parsed as a boolean; "false" is used when the property is absent.
protected val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean()
// True when the cluster name returned by clusterName() is "integTest". Evaluated once, at construction time.
protected val isLocalTest = clusterName() == "integTest"
/**
 * Returns the value of the `tests.clustername` system property.
 *
 * `System.getProperty` returns null for an unset property; because this function is called from a property
 * initializer, that used to surface as an uninformative null-check failure while constructing the test class.
 * The lookup now fails fast with a message that names the missing property.
 *
 * @return the configured cluster name
 * @throws IllegalStateException if the `tests.clustername` system property is not set
 */
private fun clusterName(): String =
    checkNotNull(System.getProperty("tests.clustername")) {
        "System property 'tests.clustername' is not set; it is required to determine the test cluster name"
    }
/** Converts this response into a map by delegating to [entityAsMap]. */
fun Response.asMap(): Map<String, Any> {
    return entityAsMap(this)
}
/** Maps the HTTP status code of this response's status line to a [RestStatus]. */
protected fun Response.restStatus(): RestStatus {
    val httpCode = statusLine.statusCode
    return RestStatus.fromCode(httpCode)
}
/**
 * Asserts that a HEAD request for [index] answers with [RestStatus.OK].
 *
 * @param index name of the index expected to exist
 */
protected fun assertIndexExists(index: String) {
    val status = client().makeRequest("HEAD", index).restStatus()
    assertEquals("Index $index does not exist.", RestStatus.OK, status)
}
/**
 * Asserts that a HEAD request for [index] answers with [RestStatus.NOT_FOUND].
 *
 * @param index name of the index expected to be absent
 */
protected fun assertIndexDoesNotExist(index: String) {
    val status = client().makeRequest("HEAD", index).restStatus()
    assertEquals("Index $index does exist.", RestStatus.NOT_FOUND, status)
}
/**
 * Asserts that the mapping of [index] carries `_meta.schema_version` equal to [expectedVersion].
 *
 * The first index returned by the mapping request is inspected. A mapping without a `_meta` section, or
 * without a `schema_version` entry in it, counts as version 0.
 *
 * @param index index (or index expression) whose mapping is fetched
 * @param expectedVersion schema version the mapping is expected to declare
 * @throws IllegalStateException if the mapping response contains no index or no `mappings` section
 */
protected fun verifyIndexSchemaVersion(index: String, expectedVersion: Int) {
    val indexMapping = client().getIndexMapping(index)
    // Fail with a descriptive message instead of an IndexOutOfBounds / bare NPE on an unexpected response shape.
    val indexName = checkNotNull(indexMapping.keys.firstOrNull()) {
        "Mapping response for $index does not contain any index"
    }
    val mappings = checkNotNull(indexMapping.stringMap(indexName)?.stringMap("mappings")) {
        "Mapping response for $indexName has no \"mappings\" section"
    }
    // The cast stays strict so a non-integer schema_version is reported rather than silently read as 0.
    val version = mappings.stringMap("_meta")?.get("schema_version")?.let { it as Int } ?: 0
    assertEquals(expectedVersion, version)
}
/**
 * Looks up [key] and returns its value viewed as a nested string-keyed map.
 *
 * The cast is unchecked: the receiver is simply treated as a map of maps.
 *
 * @param key key of the nested map
 * @return the nested map, or null when [key] is absent
 */
@Suppress("UNCHECKED_CAST")
fun Map<String, Any>.stringMap(key: String): Map<String, Any>? =
    (this as Map<String, Map<String, Any>>)[key]
/**
 * Requests `GET <index>/_mapping`, asserts the response status is [RestStatus.OK] and returns the body as a map.
 *
 * @param index index (or index expression) whose mapping is requested
 * @return the parsed mapping response
 */
fun RestClient.getIndexMapping(index: String): Map<String, Any> =
    makeRequest("GET", "$index/_mapping")
        .also { mappingResponse -> assertEquals(RestStatus.OK, mappingResponse.restStatus()) }
        .asMap()
/**
 * Posts [jsonString] to [index] as a new document [docCount] times, one request per document, each with
 * `refresh=true`.
 *
 * @param index name of the target index
 * @param docCount number of documents to insert
 * @param delay milliseconds to sleep after every insertion (the last one included)
 * @param jsonString JSON source used for every inserted document
 * @param routing optional routing value, appended to the request as the `routing` query parameter
 */
protected fun insertSampleData(index: String, docCount: Int, delay: Long = 0, jsonString: String = "{ \"test_field\": \"test_value\" }", routing: String? = null) {
    val baseEndpoint = "/$index/_doc/?refresh=true"
    val endpoint = if (routing == null) baseEndpoint else "$baseEndpoint&routing=$routing"
    for (docNumber in 1..docCount) {
        val indexRequest = Request("POST", endpoint).apply { setJsonEntity(jsonString) }
        client().performRequest(indexRequest)
        Thread.sleep(delay)
    }
}
/**
 * Sends [bulkJsonString] to the `_bulk` endpoint of [index] with `refresh=true` and an
 * `application/x-ndjson` content-type header, then asserts the response status is [RestStatus.OK].
 *
 * @param index name of the target index
 * @param bulkJsonString newline-delimited bulk request body
 */
protected fun insertSampleBulkData(index: String, bulkJsonString: String) {
    val ndjsonOptions = RequestOptions.DEFAULT.toBuilder()
        .addHeader("content-type", "application/x-ndjson")
        .build()
    val bulkRequest = Request("POST", "/$index/_bulk/?refresh=true").apply {
        setJsonEntity(bulkJsonString)
        options = ndjsonOptions
    }
    assertEquals(RestStatus.OK, client().performRequest(bulkRequest).restStatus())
}
/**
 * Creates [index] with the NYC taxi mapping and bulk-indexes the classpath resource `data/nyc_5000.ndjson`
 * (5k documents of the open NYC taxi dataset) into it.
 *
 * Example headers and document values
 * VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
 * 1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7,0.5,0.5,1.65,0,0.3,9.95,
 *
 * @param index name of the index to create and fill
 * @throws IllegalStateException if the `data/nyc_5000.ndjson` resource is not on the classpath
 */
protected fun generateNYCTaxiData(index: String = "nyc-taxi-data") {
    createIndex(index, Settings.EMPTY, """"properties":{"DOLocationID":{"type":"integer"},"RatecodeID":{"type":"integer"},"fare_amount":{"type":"float"},"tpep_dropoff_datetime":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"},"congestion_surcharge":{"type":"float"},"VendorID":{"type":"integer"},"passenger_count":{"type":"integer"},"tolls_amount":{"type":"float"},"improvement_surcharge":{"type":"float"},"trip_distance":{"type":"float"},"store_and_fwd_flag":{"type":"keyword"},"payment_type":{"type":"integer"},"total_amount":{"type":"float"},"extra":{"type":"float"},"tip_amount":{"type":"float"},"mta_tax":{"type":"float"},"tpep_pickup_datetime":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"},"PULocationID":{"type":"integer"}}""")
    // getResource returns null for a missing resource; report that explicitly instead of a bare NPE.
    val dataset = checkNotNull(javaClass.classLoader.getResource("data/nyc_5000.ndjson")) {
        "Test resource data/nyc_5000.ndjson was not found on the classpath"
    }
    insertSampleBulkData(index, dataset.readText())
}
/**
 * Collects the `reason` object of every shard failure reported in [searchResponse].
 *
 * The test fails (through `assertNotNull`) when the response has no `_shards` section or when that section
 * has no `failures` entry.
 *
 * @param searchResponse response of a search request
 * @return one element per shard failure holding that failure's `reason` map (null for a failure without one)
 */
@Suppress("UNCHECKED_CAST")
protected fun extractFailuresFromSearchResponse(searchResponse: Response): List<Map<String, String>?>? {
    // Nullable cast on purpose: a non-null cast would throw on a missing section before the
    // assertion below gets a chance to report it.
    val shards = searchResponse.asMap()["_shards"] as Map<String, List<Map<String, Any>>>?
    assertNotNull("Search response has no _shards section", shards)
    val failures = shards?.get("failures")
    assertNotNull("Search response has no shard failures", failures)
    return failures?.map { failure -> (failure as Map<String, Map<String, String>>)["reason"] }
}
/** Always true: indices are kept by the base class; cleanup is done through `wipeAllIndices` instead. */
override fun preserveIndicesUponCompletion(): Boolean {
    return true
}
companion object {
// True when the "cluster.number_of_nodes" system property (default "1") is greater than 1.
@JvmStatic
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
// Index names that wipeAllIndices leaves in place unless the caller passes a different set.
protected val defaultKeepIndexSet = setOf(".opendistro_security")
/**
 * Deletes all data streams and every index whose name is not in [keepIndex], then waits for the cluster
 * to settle.
 *
 * preserveIndicesUponCompletion is overridden to return true, so this function is what cleans up indices.
 * Meant to be used in @After or @AfterClass of your feature test suite.
 *
 * @param client REST client used for every request; defaults to the admin client
 * @param keepIndex names of the indices that must not be deleted
 */
fun wipeAllIndices(client: RestClient = adminClient(), keepIndex: kotlin.collections.Set<String> = defaultKeepIndexSet) {
    try {
        client.performRequest(Request("DELETE", "_data_stream/*"))
    } catch (e: ResponseException) {
        // We hit a version of ES that doesn't serialize DeleteDataStreamAction.Request#wildcardExpressionsOriginallySpecified field or
        // that doesn't support data streams so it's safe to ignore
        when (e.response.statusLine.statusCode) {
            404, 405, 500 -> Unit
            else -> throw e
        }
    }

    val catResponse = client.performRequest(Request("GET", "/_cat/indices?format=json&expand_wildcards=all"))
    val xContent = XContentType.fromMediaType(catResponse.entity.contentType).xContent()
    xContent.createParser(
        NamedXContentRegistry.EMPTY,
        DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
        catResponse.entity.content
    ).use { parser ->
        for (entry in parser.list()) {
            val indexName = (entry as java.util.HashMap<*, *>)["index"] as String
            // .opendistro_security isn't allowed to delete from cluster
            if (indexName in keepIndex) continue

            // TODO: remove PERMISSIVE option after moving system index access to REST API call
            val permissiveOptions = RequestOptions.DEFAULT.toBuilder()
                .apply { setWarningsHandler(WarningsHandler.PERMISSIVE) }
                .build()
            val deleteRequest = Request("DELETE", "/$indexName").apply { options = permissiveOptions }
            client.performRequest(deleteRequest)
        }
    }

    waitFor {
        if (isMultiNode) {
            // Multi node test is not suitable to waitFor
            // We have seen long-running write task that fails the waitFor
            // probably because of cluster manager - data node task not in sync
            // So instead we just sleep 1s after wiping indices
            Thread.sleep(1_000)
        } else {
            waitForRunningTasks(client)
            waitForPendingTasks(client)
            waitForThreadPools(client)
        }
    }
}
/**
 * Fails the test when `GET /_tasks?detailed` still lists tasks other than the task listing itself.
 *
 * @param client REST client used to query the tasks API
 */
@JvmStatic
@Throws(IOException::class)
protected fun waitForRunningTasks(client: RestClient) {
    val taskListing = client.performRequest(Request("GET", "/_tasks?detailed"))
    val leftovers = runningTasks(taskListing)
    if (leftovers.isNotEmpty()) {
        fail("${Date()}: There are still tasks running after this test that might break subsequent tests: \n${leftovers.joinToString("\n")}.")
    }
}
/**
 * Collects every task found under `nodes.<node>.tasks` of a tasks API [response], leaving out the
 * list-tasks action itself.
 *
 * @param response response of a tasks API request
 * @return the string form of each remaining task map
 */
@Suppress("UNCHECKED_CAST")
@Throws(IOException::class)
private fun runningTasks(response: Response): MutableSet<String> {
    // Ignore the task list API - it doesn't count against us
    val ignoredActions = listOf(ListTasksAction.NAME, ListTasksAction.NAME + "[n]")
    val nodes = entityAsMap(response)["nodes"] as Map<String, Any>?
    return nodes!!.values
        .flatMap { node -> ((node as Map<String, Any>)["tasks"] as Map<String, Any>?)!!.values }
        .map { task -> task as Map<String, Any> }
        .filterNot { task -> ignoredActions.any { ignored -> task["action"] == ignored } }
        // Disabled alternative: task["action"].toString() + " | " + task["description"].toString()
        .mapTo(HashSet<String>()) { task -> task.toString() }
}
/**
 * Fails the test when any entry of `GET /_cat/thread_pool?format=json` still reports active or queued work.
 *
 * One active thread is discounted for the pool named "management".
 * NOTE(review): presumably that is the thread serving this very request — confirm.
 *
 * @param client REST client used to query the cat API
 */
@JvmStatic
protected fun waitForThreadPools(client: RestClient) {
    val catResponse = client.performRequest(Request("GET", "/_cat/thread_pool?format=json"))
    val xContent = XContentType.fromMediaType(catResponse.entity.contentType).xContent()
    xContent.createParser(
        NamedXContentRegistry.EMPTY,
        DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
        catResponse.entity.content
    ).use { parser ->
        parser.list().forEach { entry ->
            val pool: Map<*, *> = entry as java.util.HashMap<*, *>
            val running = (pool["active"] as String).toInt()
            val queued = (pool["queue"] as String).toInt()
            val discount = if (pool["name"] == "management") 1 else 0
            if (running - discount > 0 || queued > 0) {
                fail("Still active threadpools in cluster: $pool")
            }
        }
    }
}
/**
 * Interface type handed to MBeanServerInvocationHandler.newProxyInstance in dumpCoverage to talk to the
 * MBean registered as "org.jacoco:type=Runtime".
 *
 * NOTE(review): the member names presumably have to match the attributes/operations of that JaCoCo MBean
 * exactly — confirm before renaming or changing any signature.
 */
internal interface IProxy {
// Read-only `version` attribute of the remote MBean.
val version: String?
// Read/write `sessionId` attribute of the remote MBean.
var sessionId: String?
// Fetches execution data as bytes; `reset` presumably clears the collected data afterwards — confirm.
fun getExecutionData(reset: Boolean): ByteArray?
// NOTE(review): semantics of dump/reset are not visible here; only getExecutionData is used in this file.
fun dump(reset: Boolean)
fun reset()
}
/*
 * We need to be able to dump the jacoco coverage before the cluster shuts down.
 * The new internal testing framework removed some gradle tasks we were listening to,
 * to choose a good time to do it. This runs as an @AfterClass hook and writes the executionData to file.
 * TODO: This is also currently just overwriting integTest.exec with the updated execData without
 * resetting after writing each time. This can be improved to either write an exec file per test
 * or by letting jacoco append to the file.
 * */
@JvmStatic
@AfterClass
fun dumpCoverage() {
    // jacoco.dir set in esplugin-coverage.gradle, if it doesn't exist we don't
    // want to collect coverage, so we can return early
    val jacocoBuildPath = System.getProperty("jacoco.dir") ?: return

    val jmxUrl = JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7777/jmxrmi")
    JMXConnectorFactory.connect(jmxUrl).use { connector ->
        val jacocoAgent = MBeanServerInvocationHandler.newProxyInstance(
            connector.mBeanServerConnection,
            ObjectName("org.jacoco:type=Runtime"),
            IProxy::class.java,
            false
        )
        // Fetch without resetting the agent; nothing is written when no data comes back.
        val executionData = jacocoAgent.getExecutionData(false)
        if (executionData != null) {
            Files.write(PathUtils.get("$jacocoBuildPath/integTest.exec"), executionData)
        }
    }
}
}
}