Allow integration tests to be run from IDE

This commit is contained in:
Seth Michael Larson
2021-07-13 19:57:34 -05:00
committed by Seth Michael Larson
parent 45616bd862
commit 82b3daf6aa
10 changed files with 251 additions and 74 deletions
+4 -1
View File
@@ -7,7 +7,7 @@
# Export the TEST_SUITE variable, eg. 'free' or 'platinum' defaults to 'free'.
# Export the NUMBER_OF_NODES variable to start more than 1 node
# Version 1.3.0
# Version 1.4.0
# - Initial version of the run-elasticsearch.sh script
# - Deleting the volume should not dependent on the container still running
# - Fixed `ES_JAVA_OPTS` config
@@ -17,6 +17,7 @@
# - Added 5 retries on docker pull for fixing transient network errors
# - Added flags to make local CCR configurations work
# - Added action.destructive_requires_name=false as the default will be true in v8
# - Added ingest.geoip.downloader.enabled=false as it causes false positives in testing
script_path=$(dirname $(realpath -s $0))
source $script_path/functions/imports.sh
@@ -40,6 +41,7 @@ environment=($(cat <<-END
--env path.repo=/tmp
--env repositories.url.allowed_urls=http://snapshot.test*
--env action.destructive_requires_name=false
--env ingest.geoip.downloader.enabled=false
END
))
if [[ "$TEST_SUITE" == "platinum" ]]; then
@@ -48,6 +50,7 @@ if [[ "$TEST_SUITE" == "platinum" ]]; then
--env xpack.license.self_generated.type=trial
--env xpack.security.enabled=true
--env xpack.security.http.ssl.enabled=true
--env xpack.security.http.ssl.verification_mode=certificate
--env xpack.security.http.ssl.key=certs/testnode.key
--env xpack.security.http.ssl.certificate=certs/testnode.crt
--env xpack.security.http.ssl.certificate_authorities=certs/ca.crt
+2 -2
View File
@@ -194,7 +194,7 @@ async def async_streaming_bulk(
raise_on_error,
ignore_status,
*args,
**kwargs
**kwargs,
),
):
@@ -462,5 +462,5 @@ async def async_reindex(
target_client,
_change_doc_index(docs, target_index),
chunk_size=chunk_size,
**kwargs
**kwargs,
)
+16 -10
View File
@@ -19,6 +19,7 @@
import os
import time
from os.path import abspath, dirname, join
from unittest import SkipTest, TestCase
from elasticsearch import Elasticsearch
@@ -29,10 +30,12 @@ if "ELASTICSEARCH_URL" in os.environ:
else:
ELASTICSEARCH_URL = "https://elastic:changeme@localhost:9200"
CA_CERTS = join(dirname(dirname(dirname(abspath(__file__)))), ".ci/certs/ca.pem")
def get_test_client(nowait=False, **kwargs):
# construct kwargs from the environment
kw = {"timeout": 30, "ca_certs": ".ci/certs/ca.pem"}
kw = {"timeout": 30, "ca_certs": CA_CERTS}
if "PYTHON_CONNECTION_CLASS" in os.environ:
from elasticsearch import connection
@@ -56,13 +59,6 @@ def get_test_client(nowait=False, **kwargs):
raise SkipTest("Elasticsearch failed to start.")
def _get_version(version_string):
if "." not in version_string:
return ()
version = version_string.strip().split(".")
return tuple(int(v) if v.isdigit() else 999 for v in version)
class ElasticsearchTestCase(TestCase):
@staticmethod
def _get_client():
@@ -85,6 +81,16 @@ class ElasticsearchTestCase(TestCase):
def es_version(self):
if not hasattr(self, "_es_version"):
version_string = self.client.info()["version"]["number"]
self._es_version = _get_version(version_string)
self._es_version = es_version(self.client)
return self._es_version
def _get_version(version_string):
if "." not in version_string:
return ()
version = version_string.strip().split(".")
return tuple(int(v) if v.isdigit() else 999 for v in version)
def es_version(client):
return _get_version(client.info()["version"]["number"])
+1
View File
@@ -21,6 +21,7 @@ from unittest import TestCase
from ..client import Elasticsearch
ELASTICSEARCH_URL: str
CA_CERTS: str
def get_test_client(nowait: bool = ..., **kwargs: Any) -> Elasticsearch: ...
def _get_version(version_string: str) -> Tuple[int, ...]: ...
@@ -20,7 +20,7 @@ import asyncio
import pytest
import elasticsearch
from elasticsearch.helpers.test import ELASTICSEARCH_URL
from elasticsearch.helpers.test import CA_CERTS, ELASTICSEARCH_URL
from ...utils import wipe_cluster
@@ -34,7 +34,7 @@ async def async_client():
if not hasattr(elasticsearch, "AsyncElasticsearch"):
pytest.skip("test requires 'AsyncElasticsearch'")
kw = {"timeout": 3, "ca_certs": ".ci/certs/ca.pem"}
kw = {"timeout": 3, "ca_certs": CA_CERTS}
client = elasticsearch.AsyncElasticsearch(ELASTICSEARCH_URL, **kw)
# wait for yellow status
@@ -50,11 +50,26 @@ async def await_if_coro(x):
class AsyncYamlRunner(YamlRunner):
async def setup(self):
# Pull skips from individual tests to not do unnecessary setup.
skip_code = []
for action in self._run_code:
assert len(action) == 1
action_type, _ = list(action.items())[0]
if action_type == "skip":
skip_code.append(action)
else:
break
if self._setup_code or skip_code:
self.section("setup")
if skip_code:
await self.run_code(skip_code)
if self._setup_code:
await self.run_code(self._setup_code)
async def teardown(self):
if self._teardown_code:
self.section("teardown")
await self.run_code(self._teardown_code)
async def es_version(self):
@@ -67,19 +82,26 @@ class AsyncYamlRunner(YamlRunner):
ES_VERSION = tuple(int(v) if v.isdigit() else 999 for v in version)
return ES_VERSION
def section(self, name):
print(("=" * 10) + " " + name + " " + ("=" * 10))
async def run(self):
try:
await self.setup()
self.section("test")
await self.run_code(self._run_code)
finally:
await self.teardown()
try:
await self.teardown()
except Exception:
pass
async def run_code(self, test):
"""Execute an instruction based on it's type."""
print(test)
for action in test:
assert len(action) == 1
action_type, action = list(action.items())[0]
print(action_type, action)
if hasattr(self, "run_" + action_type):
await await_if_coro(getattr(self, "run_" + action_type)(action))
+11 -3
View File
@@ -21,7 +21,7 @@ import time
import pytest
import elasticsearch
from elasticsearch.helpers.test import ELASTICSEARCH_URL
from elasticsearch.helpers.test import CA_CERTS, ELASTICSEARCH_URL
from ..utils import wipe_cluster
@@ -38,7 +38,11 @@ def sync_client_factory():
try:
# Configure the client with certificates and optionally
# an HTTP conn class depending on 'PYTHON_CONNECTION_CLASS' envvar
kw = {"timeout": 3, "ca_certs": ".ci/certs/ca.pem"}
kw = {
"timeout": 3,
"ca_certs": CA_CERTS,
"headers": {"Authorization": "Basic ZWxhc3RpYzpjaGFuZ2VtZQ=="},
}
if "PYTHON_CONNECTION_CLASS" in os.environ:
from elasticsearch import connection
@@ -46,7 +50,11 @@ def sync_client_factory():
connection, os.environ["PYTHON_CONNECTION_CLASS"]
)
client = elasticsearch.Elasticsearch(ELASTICSEARCH_URL, **kw)
# We do this little dance with the URL to force
# Requests to respect 'headers: None' within rest API spec tests.
client = elasticsearch.Elasticsearch(
ELASTICSEARCH_URL.replace("elastic:changeme@", ""), **kw
)
# Wait for the cluster to report a status of 'yellow'
for _ in range(100):
@@ -62,35 +62,51 @@ IMPLEMENTED_FEATURES = {
# broken YAML tests on some releases
SKIP_TESTS = {
"ml/job_cat_apis[0]",
"ml/post_data[0]",
"ml/post_data[1]",
"ml/post_data[2]",
"ml/post_data[3]",
"ml/post_data[4]",
"ml/post_data[5]",
"ml/post_data[6]",
"ml/get_trained_model_stats[1]",
"ml/get_trained_model_stats[2]",
"ml/get_trained_model_stats[3]",
"ml/set_upgrade_mode[0]",
"ml/set_upgrade_mode[1]",
"ml/set_upgrade_mode[2]",
"ml/set_upgrade_mode[3]",
"ml/jobs_get_stats[0]",
"ml/jobs_get_stats[1]",
"ml/jobs_get_stats[2]",
"ml/jobs_get_stats[3]",
"ml/jobs_get_stats[4]",
"ml/jobs_get_stats[5]",
"ml/jobs_get_stats[6]",
"ml/jobs_get_stats[7]",
"ml/jobs_get_stats[8]",
"ml/jobs_get_stats[9]",
"ml/jobs_get_stats[10]",
"service_accounts/10_basic[0]",
# Warning about date_histogram.interval deprecation is raised randomly
"search/aggregation/250_moving_fn[1]",
# body: null
"indices/simulate_index_template/10_basic[2]",
# No ML node with sufficient capacity / random ML failing
"ml/start_stop_datafeed",
"ml/post_data",
"ml/jobs_crud",
"ml/datafeeds_crud",
"ml/set_upgrade_mode",
"ml/reset_job[2]",
"ml/jobs_get_stats",
"ml/get_datafeed_stats",
"ml/get_trained_model_stats",
"ml/delete_job_force",
"ml/jobs_get_result_overall_buckets",
"ml/bucket_correlation_agg[0]",
"ml/job_groups",
"transform/transforms_stats_continuous[0]",
# Fails bad request instead of 404?
"ml/inference_crud",
# rollup/security_tests time out?
"rollup/security_tests",
# Our TLS certs are custom
"ssl/10_basic[0]",
# Our user is custom
"users/10_basic[3]",
# Shards/snapshots aren't right?
"searchable_snapshots/10_usage[1]",
# flaky data streams?
"data_stream/10_basic[1]",
"data_stream/80_resolve_index_data_streams[1]",
# bad formatting?
"cat/allocation/10_basic",
# service account number not right?
"service_accounts/10_basic[1]",
"snapshot/20_operator_privileges_disabled[0]",
# doesn't use 'contains' properly?
"xpack/10_basic[0]",
"privileges/40_get_user_privs[0]",
"privileges/40_get_user_privs[1]",
# bad use of 'is_false'?
"indices/get_alias/10_basic[22]",
# unique usage of 'set'
"indices/stats/50_disk_usage[0]",
"indices/stats/60_field_usage[0]",
}
@@ -101,6 +117,8 @@ RUN_ASYNC_REST_API_TESTS = (
and os.environ.get("PYTHON_CONNECTION_CLASS") == "RequestsHttpConnection"
)
FALSEY_VALUES = ("", None, False, 0, 0.0)
class YamlRunner:
def __init__(self, client):
@@ -118,11 +136,26 @@ class YamlRunner:
self._teardown_code = test_spec.pop("teardown", None)
def setup(self):
# Pull skips from individual tests to not do unnecessary setup.
skip_code = []
for action in self._run_code:
assert len(action) == 1
action_type, _ = list(action.items())[0]
if action_type == "skip":
skip_code.append(action)
else:
break
if self._setup_code or skip_code:
self.section("setup")
if skip_code:
self.run_code(skip_code)
if self._setup_code:
self.run_code(self._setup_code)
def teardown(self):
if self._teardown_code:
self.section("teardown")
self.run_code(self._teardown_code)
def es_version(self):
@@ -135,19 +168,26 @@ class YamlRunner:
ES_VERSION = tuple(int(v) if v.isdigit() else 999 for v in version)
return ES_VERSION
def section(self, name):
print(("=" * 10) + " " + name + " " + ("=" * 10))
def run(self):
try:
self.setup()
self.section("test")
self.run_code(self._run_code)
finally:
self.teardown()
try:
self.teardown()
except Exception:
pass
def run_code(self, test):
"""Execute an instruction based on it's type."""
print(test)
for action in test:
assert len(action) == 1
action_type, action = list(action.items())[0]
print(action_type, action)
if hasattr(self, "run_" + action_type):
getattr(self, "run_" + action_type)(action)
@@ -300,11 +340,11 @@ class YamlRunner:
except AssertionError:
pass
else:
assert value in ("", None, False, 0)
assert value in FALSEY_VALUES
def run_is_true(self, action):
value = self._lookup(action)
assert value not in ("", None, False, 0)
assert value not in FALSEY_VALUES
def run_length(self, action):
for path, expected in action.items():
@@ -328,7 +368,7 @@ class YamlRunner:
expected,
)
else:
assert expected == value, "%r does not match %r" % (value, expected)
self._assert_match_equals(value, expected)
def run_contains(self, action):
for path, expected in action.items():
@@ -352,12 +392,18 @@ class YamlRunner:
# resolve variables
if isinstance(value, string_types) and "$" in value:
for k, v in self._state.items():
key_construct = "${" + k + "}"
if key_construct in value:
value = value.replace(key_construct, v)
key_construct = "$" + k
if key_construct in value:
value = value.replace(key_construct, v)
for key_replace in ("${" + k + "}", "$" + k):
if value == key_replace:
value = v
break
# We only do the in-string replacement if using ${...}
elif (
key_replace.startswith("${")
and isinstance(value, string_types)
and key_replace in value
):
value = value.replace(key_replace, v)
break
if isinstance(value, string_types):
value = value.strip()
@@ -378,7 +424,12 @@ class YamlRunner:
continue
step = step.replace("\1", ".")
step = self._resolve(step)
if step.isdigit() and step not in value:
if (
isinstance(step, string_types)
and step.isdigit()
and isinstance(value, list)
):
step = int(step)
assert isinstance(value, list)
assert len(value) > step
@@ -403,6 +454,13 @@ class YamlRunner:
IMPLEMENTED_FEATURES.add("no_xpack")
return name in XPACK_FEATURES
def _assert_match_equals(self, a, b):
# Handle for large floating points with 'E'
if isinstance(b, string_types) and isinstance(a, float) and "e" in repr(a):
a = repr(a).replace("e+", "E")
assert a == b, "%r does not match %r" % (a, b)
@pytest.fixture(scope="function")
def sync_runner(sync_client):
@@ -419,13 +477,14 @@ try:
# Make a request to Elasticsearch for the build hash, we'll be looking for
# an artifact with this same hash to download test specs for.
build_hash = client.info()["version"]["build_hash"]
client_info = client.info()
version_number = client_info["version"]["number"]
build_hash = client_info["version"]["build_hash"]
# Now talk to the artifacts API with the 'STACK_VERSION' environment variable
resp = http.request(
"GET",
"https://artifacts-api.elastic.co/v1/versions/%s"
% (os.environ["STACK_VERSION"],),
"https://artifacts-api.elastic.co/v1/versions/%s" % (version_number,),
)
resp = json.loads(resp.data.decode("utf-8"))
@@ -493,7 +552,8 @@ try:
"run": test_step,
"teardown": teardown_steps,
}
if pytest_param_id in SKIP_TESTS:
# Skip either 'test_name' or 'test_name[x]'
if pytest_test_name in SKIP_TESTS or pytest_param_id in SKIP_TESTS:
pytest_param["skip"] = True
YAML_TEST_SPECS.append(pytest.param(pytest_param, id=pytest_param_id))
+20
View File
@@ -589,6 +589,26 @@ def test_verify_elasticsearch_passes(headers, response):
{},
'{"version":{"number":"6.99.0"},"tagline":"You Know, for Search"}',
),
(
{},
"""{
"name" : "io",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "HaMHUswUSGGnzla8B17Iqw",
"version" : {
"number" : "7.6.0",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "7f634e9f44834fbc12724506cc1da681b0c3b1e3",
"build_date" : "2020-02-06T00:09:00.449973Z",
"build_snapshot" : false,
"lucene_version" : "8.4.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}""",
),
(
{},
'{"version":{"number":"7.13.0","build_flavor":"default"},"tagline":"You Know, for Search"}',
+67 -10
View File
@@ -18,6 +18,7 @@
import time
from elasticsearch import Elasticsearch, NotFoundError, RequestError
from elasticsearch.helpers.test import es_version
def wipe_cluster(client):
@@ -40,6 +41,10 @@ def wipe_cluster(client):
wait_for_pending_tasks(client, filter="xpack/rollup/job")
wipe_slm_policies(client)
# Searchable snapshot indices start in 7.8+
if es_version(client) >= (7, 8):
wipe_searchable_snapshot_indices(client)
wipe_snapshots(client)
if is_xpack:
wipe_data_streams(client)
@@ -58,6 +63,8 @@ def wipe_cluster(client):
wipe_ilm_policies(client)
wipe_auto_follow_patterns(client)
wipe_tasks(client)
wipe_node_shutdown_metadata(client)
wait_for_pending_datafeeds_and_jobs(client)
wait_for_cluster_state_updates_to_finish(client)
if close_after_wipe:
@@ -86,20 +93,34 @@ def wipe_rollup_jobs(client):
def wipe_snapshots(client):
"""Deletes all the snapshots and repositories from the cluster"""
repos = client.snapshot.get_repository()
for name, repo in repos.items():
if repo["type"] == "fs":
client.snapshot.delete(
repository=name,
snapshot="*",
ignore=404,
)
in_progress_snapshots = []
client.snapshot.delete_repository(repository=name, ignore=404)
repos = client.snapshot.get_repository(repository="_all")
for repo_name, repo in repos.items():
if repo["type"] == "fs":
snapshots = client.snapshot.get(
repository=repo_name, snapshot="_all", ignore_unavailable=True
)
for snapshot in snapshots["snapshots"]:
if snapshot["state"] == "IN_PROGRESS":
in_progress_snapshots.append(snapshot)
else:
client.snapshot.delete(
repository=repo_name,
snapshot=snapshot["snapshot"],
ignore=404,
)
client.snapshot.delete_repository(repository=repo_name, ignore=404)
assert in_progress_snapshots == []
def wipe_data_streams(client):
client.indices.delete_data_stream(name="*")
try:
client.indices.delete_data_stream(name="*", expand_wildcards="all")
except Exception:
client.indices.delete_data_stream(name="*")
def wipe_indices(client):
@@ -110,6 +131,16 @@ def wipe_indices(client):
)
def wipe_searchable_snapshot_indices(client):
cluster_metadata = client.cluster.state(
metric="metadata",
filter_path="metadata.indices.*.settings.index.store.snapshot",
)
if cluster_metadata:
for index in cluster_metadata["metadata"]["indices"].keys():
client.indices.delete(index=index)
def wipe_xpack_templates(client):
templates = [
x.strip() for x in client.cat.templates(h="name").split("\n") if x.strip()
@@ -168,6 +199,18 @@ def wipe_auto_follow_patterns(client):
client.ccr.delete_auto_follow_pattern(name=pattern["name"])
def wipe_node_shutdown_metadata(client):
shutdown_status = client.shutdown.get_node()
# If response contains these two keys the feature flag isn't enabled
# on this cluster so skip this step now.
if "_nodes" in shutdown_status and "cluster_name" in shutdown_status:
return
for shutdown_node in shutdown_status.get("nodes", []):
node_id = shutdown_node["node_id"]
client.shutdown.delete_node(node_id=node_id)
def wipe_tasks(client):
tasks = client.tasks.list()
for node_name, node in tasks.get("node", {}).items():
@@ -183,6 +226,19 @@ def wait_for_pending_tasks(client, filter, timeout=30):
break
def wait_for_pending_datafeeds_and_jobs(client, timeout=30):
end_time = time.time() + timeout
while time.time() < end_time:
if (
client.ml.get_datafeeds(datafeed_id="*", allow_no_datafeeds=True)["count"]
== 0
):
break
while time.time() < end_time:
if client.ml.get_jobs(job_id="*", allow_no_jobs=True)["count"] == 0:
break
def wait_for_cluster_state_updates_to_finish(client, timeout=30):
end_time = time.time() + timeout
while time.time() < end_time:
@@ -221,6 +277,7 @@ def is_xpack_template(name):
"synthetics-settings",
"synthetics-mappings",
".snapshot-blob-cache",
"data-streams-mappings",
}:
return True
return False