Remove left-over instances of 'es' or 'elasticsearch' word

Signed-off-by: Rushi Agrawal <rushi.agr@gmail.com>
This commit is contained in:
Rushi Agrawal
2021-08-19 14:09:22 +05:30
parent 497468f254
commit 340069f6fa
14 changed files with 117 additions and 105 deletions
+1 -1
View File
@@ -1,2 +1,2 @@
For a list of all our amazing authors please see the contributors page: For a list of all our amazing authors please see the contributors page:
https://github.com/elastic/elasticsearch-py/graphs/contributors https://github.com/opensearch-project/opensearch-py/graphs/contributors
+13 -13
View File
@@ -48,7 +48,7 @@ logger = logging.getLogger("opensearch")
class AsyncOpenSearch(object): class AsyncOpenSearch(object):
""" """
OpenSearch low-level client. Provides a straightforward mapping from OpenSearch low-level client. Provides a straightforward mapping from
Python to ES REST endpoints. Python to OpenSearch REST endpoints.
The instance has attributes ``cat``, ``cluster``, ``indices``, ``ingest``, The instance has attributes ``cat``, ``cluster``, ``indices``, ``ingest``,
``nodes``, ``snapshot`` and ``tasks`` that provide access to instances of ``nodes``, ``snapshot`` and ``tasks`` that provide access to instances of
@@ -66,16 +66,16 @@ class AsyncOpenSearch(object):
the ``connection_class`` parameter:: the ``connection_class`` parameter::
# create connection to localhost using the ThriftConnection # create connection to localhost using the ThriftConnection
es = OpenSearch(connection_class=ThriftConnection) client = OpenSearch(connection_class=ThriftConnection)
If you want to turn on :ref:`sniffing` you have several options (described If you want to turn on :ref:`sniffing` you have several options (described
in :class:`~opensearch.Transport`):: in :class:`~opensearch.Transport`)::
# create connection that will automatically inspect the cluster to get # create connection that will automatically inspect the cluster to get
# the list of active nodes. Start with nodes running on 'esnode1' and # the list of active nodes. Start with nodes running on
# 'esnode2' # 'opensearchnode1' and 'opensearchnode2'
es = OpenSearch( client = OpenSearch(
['esnode1', 'esnode2'], ['opensearchnode1', 'opensearchnode2'],
# sniff before doing anything # sniff before doing anything
sniff_on_start=True, sniff_on_start=True,
# refresh nodes after a node fails to respond # refresh nodes after a node fails to respond
@@ -89,16 +89,16 @@ class AsyncOpenSearch(object):
# connect to localhost directly and another node using SSL on port 443 # connect to localhost directly and another node using SSL on port 443
# and an url_prefix. Note that ``port`` needs to be an int. # and an url_prefix. Note that ``port`` needs to be an int.
es = OpenSearch([ client = OpenSearch([
{'host': 'localhost'}, {'host': 'localhost'},
{'host': 'othernode', 'port': 443, 'url_prefix': 'es', 'use_ssl': True}, {'host': 'othernode', 'port': 443, 'url_prefix': 'opensearch', 'use_ssl': True},
]) ])
If using SSL, there are several parameters that control how we deal with If using SSL, there are several parameters that control how we deal with
certificates (see :class:`~opensearch.Urllib3HttpConnection` for certificates (see :class:`~opensearch.Urllib3HttpConnection` for
detailed description of the options):: detailed description of the options)::
es = OpenSearch( client = OpenSearch(
['localhost:443', 'other_host:443'], ['localhost:443', 'other_host:443'],
# turn on SSL # turn on SSL
use_ssl=True, use_ssl=True,
@@ -112,7 +112,7 @@ class AsyncOpenSearch(object):
optionally (see :class:`~opensearch.Urllib3HttpConnection` for optionally (see :class:`~opensearch.Urllib3HttpConnection` for
detailed description of the options):: detailed description of the options)::
es = OpenSearch( client = OpenSearch(
['localhost:443', 'other_host:443'], ['localhost:443', 'other_host:443'],
# turn on SSL # turn on SSL
use_ssl=True, use_ssl=True,
@@ -126,7 +126,7 @@ class AsyncOpenSearch(object):
(see :class:`~opensearch.Urllib3HttpConnection` for (see :class:`~opensearch.Urllib3HttpConnection` for
detailed description of the options):: detailed description of the options)::
es = OpenSearch( client = OpenSearch(
['localhost:443', 'other_host:443'], ['localhost:443', 'other_host:443'],
# turn on SSL # turn on SSL
use_ssl=True, use_ssl=True,
@@ -143,7 +143,7 @@ class AsyncOpenSearch(object):
Alternatively you can use RFC-1738 formatted URLs, as long as they are not Alternatively you can use RFC-1738 formatted URLs, as long as they are not
in conflict with other options:: in conflict with other options::
es = OpenSearch( client = OpenSearch(
[ [
'http://user:secret@localhost:9200/', 'http://user:secret@localhost:9200/',
'https://user:secret@other_host:443/production' 'https://user:secret@other_host:443/production'
@@ -166,7 +166,7 @@ class AsyncOpenSearch(object):
return 'CustomSomethingRepresentation' return 'CustomSomethingRepresentation'
return JSONSerializer.default(self, obj) return JSONSerializer.default(self, obj)
es = OpenSearch(serializer=SetEncoder()) client = OpenSearch(serializer=SetEncoder())
""" """
+3 -3
View File
@@ -158,7 +158,7 @@ async def async_streaming_bulk(
:arg client: instance of :class:`~opensearch.AsyncOpenSearch` to use :arg client: instance of :class:`~opensearch.AsyncOpenSearch` to use
:arg actions: iterable or async iterable containing the actions to be executed :arg actions: iterable or async iterable containing the actions to be executed
:arg chunk_size: number of docs in one chunk sent to es (default: 500) :arg chunk_size: number of docs in one chunk sent to client (default: 500)
:arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB) :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
:arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`) :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
from the execution of the last chunk when some occur. By default we raise. from the execution of the last chunk when some occur. By default we raise.
@@ -334,7 +334,7 @@ async def async_scan(
Any additional keyword arguments will be passed to the initial Any additional keyword arguments will be passed to the initial
:meth:`~opensearch.AsyncOpenSearch.search` call:: :meth:`~opensearch.AsyncOpenSearch.search` call::
async_scan(es, async_scan(client,
query={"query": {"match": {"title": "python"}}}, query={"query": {"match": {"title": "python"}}},
index="orders-*", index="orders-*",
doc_type="books" doc_type="books"
@@ -444,7 +444,7 @@ async def async_reindex(
:arg query: body for the :meth:`~opensearch.AsyncOpenSearch.search` api :arg query: body for the :meth:`~opensearch.AsyncOpenSearch.search` api
:arg target_client: optional, is specified will be used for writing (thus :arg target_client: optional, is specified will be used for writing (thus
enabling reindex between clusters) enabling reindex between clusters)
:arg chunk_size: number of docs in one chunk sent to es (default: 500) :arg chunk_size: number of docs in one chunk sent to client (default: 500)
:arg scroll: Specify how long a consistent view of the index should be :arg scroll: Specify how long a consistent view of the index should be
maintained for scrolled search maintained for scrolled search
:arg scan_kwargs: additional kwargs to be passed to :arg scan_kwargs: additional kwargs to be passed to
+2 -2
View File
@@ -371,14 +371,14 @@ class AIOHttpConnection(AsyncConnection):
auto_decompress=True, auto_decompress=True,
loop=self.loop, loop=self.loop,
cookie_jar=aiohttp.DummyCookieJar(), cookie_jar=aiohttp.DummyCookieJar(),
response_class=ESClientResponse, response_class=OpenSearchClientResponse,
connector=aiohttp.TCPConnector( connector=aiohttp.TCPConnector(
limit=self._limit, use_dns_cache=True, ssl=self._ssl_context limit=self._limit, use_dns_cache=True, ssl=self._ssl_context
), ),
) )
class ESClientResponse(aiohttp.ClientResponse): class OpenSearchClientResponse(aiohttp.ClientResponse):
async def text(self, encoding=None, errors="strict"): async def text(self, encoding=None, errors="strict"):
if self._body is None: if self._body is None:
await self.read() await self.read()
+13 -13
View File
@@ -48,7 +48,7 @@ logger = logging.getLogger("opensearch")
class OpenSearch(object): class OpenSearch(object):
""" """
OpenSearch low-level client. Provides a straightforward mapping from OpenSearch low-level client. Provides a straightforward mapping from
Python to ES REST endpoints. Python to OpenSearch REST endpoints.
The instance has attributes ``cat``, ``cluster``, ``indices``, ``ingest``, The instance has attributes ``cat``, ``cluster``, ``indices``, ``ingest``,
``nodes``, ``snapshot`` and ``tasks`` that provide access to instances of ``nodes``, ``snapshot`` and ``tasks`` that provide access to instances of
@@ -66,16 +66,16 @@ class OpenSearch(object):
the ``connection_class`` parameter:: the ``connection_class`` parameter::
# create connection to localhost using the ThriftConnection # create connection to localhost using the ThriftConnection
es = OpenSearch(connection_class=ThriftConnection) client = OpenSearch(connection_class=ThriftConnection)
If you want to turn on :ref:`sniffing` you have several options (described If you want to turn on :ref:`sniffing` you have several options (described
in :class:`~opensearch.Transport`):: in :class:`~opensearch.Transport`)::
# create connection that will automatically inspect the cluster to get # create connection that will automatically inspect the cluster to get
# the list of active nodes. Start with nodes running on 'esnode1' and # the list of active nodes. Start with nodes running on
# 'esnode2' # 'opensearchnode1' and 'opensearchnode2'
es = OpenSearch( client = OpenSearch(
['esnode1', 'esnode2'], ['opensearchnode1', 'opensearchnode2'],
# sniff before doing anything # sniff before doing anything
sniff_on_start=True, sniff_on_start=True,
# refresh nodes after a node fails to respond # refresh nodes after a node fails to respond
@@ -89,16 +89,16 @@ class OpenSearch(object):
# connect to localhost directly and another node using SSL on port 443 # connect to localhost directly and another node using SSL on port 443
# and an url_prefix. Note that ``port`` needs to be an int. # and an url_prefix. Note that ``port`` needs to be an int.
es = OpenSearch([ client = OpenSearch([
{'host': 'localhost'}, {'host': 'localhost'},
{'host': 'othernode', 'port': 443, 'url_prefix': 'es', 'use_ssl': True}, {'host': 'othernode', 'port': 443, 'url_prefix': 'opensearch', 'use_ssl': True},
]) ])
If using SSL, there are several parameters that control how we deal with If using SSL, there are several parameters that control how we deal with
certificates (see :class:`~opensearch.Urllib3HttpConnection` for certificates (see :class:`~opensearch.Urllib3HttpConnection` for
detailed description of the options):: detailed description of the options)::
es = OpenSearch( client = OpenSearch(
['localhost:443', 'other_host:443'], ['localhost:443', 'other_host:443'],
# turn on SSL # turn on SSL
use_ssl=True, use_ssl=True,
@@ -112,7 +112,7 @@ class OpenSearch(object):
optionally (see :class:`~opensearch.Urllib3HttpConnection` for optionally (see :class:`~opensearch.Urllib3HttpConnection` for
detailed description of the options):: detailed description of the options)::
es = OpenSearch( client = OpenSearch(
['localhost:443', 'other_host:443'], ['localhost:443', 'other_host:443'],
# turn on SSL # turn on SSL
use_ssl=True, use_ssl=True,
@@ -126,7 +126,7 @@ class OpenSearch(object):
(see :class:`~opensearch.Urllib3HttpConnection` for (see :class:`~opensearch.Urllib3HttpConnection` for
detailed description of the options):: detailed description of the options)::
es = OpenSearch( client = OpenSearch(
['localhost:443', 'other_host:443'], ['localhost:443', 'other_host:443'],
# turn on SSL # turn on SSL
use_ssl=True, use_ssl=True,
@@ -143,7 +143,7 @@ class OpenSearch(object):
Alternatively you can use RFC-1738 formatted URLs, as long as they are not Alternatively you can use RFC-1738 formatted URLs, as long as they are not
in conflict with other options:: in conflict with other options::
es = OpenSearch( client = OpenSearch(
[ [
'http://user:secret@localhost:9200/', 'http://user:secret@localhost:9200/',
'https://user:secret@other_host:443/production' 'https://user:secret@other_host:443/production'
@@ -166,7 +166,7 @@ class OpenSearch(object):
return 'CustomSomethingRepresentation' return 'CustomSomethingRepresentation'
return JSONSerializer.default(self, obj) return JSONSerializer.default(self, obj)
es = OpenSearch(serializer=SetEncoder()) client = OpenSearch(serializer=SetEncoder())
""" """
+3 -3
View File
@@ -62,7 +62,7 @@ class SerializationError(OpenSearchException):
class TransportError(OpenSearchException): class TransportError(OpenSearchException):
""" """
Exception raised when ES returns a non-OK (>=400) HTTP status code. Or when Exception raised when OpenSearch returns a non-OK (>=400) HTTP status code. Or when
an actual connection error happens; in that case the ``status_code`` will an actual connection error happens; in that case the ``status_code`` will
be set to ``'N/A'``. be set to ``'N/A'``.
""" """
@@ -83,7 +83,7 @@ class TransportError(OpenSearchException):
@property @property
def info(self): def info(self):
""" """
Dict of returned error info from ES, where available, underlying Dict of returned error info from OpenSearch, where available, underlying
exception when not. exception when not.
""" """
return self.args[2] return self.args[2]
@@ -115,7 +115,7 @@ class TransportError(OpenSearchException):
class ConnectionError(TransportError): class ConnectionError(TransportError):
""" """
Error raised when there was an exception while talking to ES. Original Error raised when there was an exception while talking to OpenSearch. Original
exception from the underlying :class:`~opensearch.Connection` exception from the underlying :class:`~opensearch.Connection`
implementation is available as ``.info``. implementation is available as ``.info``.
""" """
+4 -4
View File
@@ -296,7 +296,7 @@ def streaming_bulk(
:arg client: instance of :class:`~opensearch.OpenSearch` to use :arg client: instance of :class:`~opensearch.OpenSearch` to use
:arg actions: iterable containing the actions to be executed :arg actions: iterable containing the actions to be executed
:arg chunk_size: number of docs in one chunk sent to es (default: 500) :arg chunk_size: number of docs in one chunk sent to client (default: 500)
:arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB) :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
:arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`) :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
from the execution of the last chunk when some occur. By default we raise. from the execution of the last chunk when some occur. By default we raise.
@@ -439,7 +439,7 @@ def parallel_bulk(
:arg client: instance of :class:`~opensearch.OpenSearch` to use :arg client: instance of :class:`~opensearch.OpenSearch` to use
:arg actions: iterator containing the actions :arg actions: iterator containing the actions
:arg thread_count: size of the threadpool to use for the bulk requests :arg thread_count: size of the threadpool to use for the bulk requests
:arg chunk_size: number of docs in one chunk sent to es (default: 500) :arg chunk_size: number of docs in one chunk sent to client (default: 500)
:arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB) :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
:arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`) :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
from the execution of the last chunk when some occur. By default we raise. from the execution of the last chunk when some occur. By default we raise.
@@ -536,7 +536,7 @@ def scan(
Any additional keyword arguments will be passed to the initial Any additional keyword arguments will be passed to the initial
:meth:`~opensearch.OpenSearch.search` call:: :meth:`~opensearch.OpenSearch.search` call::
scan(es, scan(client,
query={"query": {"match": {"title": "python"}}}, query={"query": {"match": {"title": "python"}}},
index="orders-*", index="orders-*",
doc_type="books" doc_type="books"
@@ -644,7 +644,7 @@ def reindex(
:arg query: body for the :meth:`~opensearch.OpenSearch.search` api :arg query: body for the :meth:`~opensearch.OpenSearch.search` api
:arg target_client: optional, is specified will be used for writing (thus :arg target_client: optional, is specified will be used for writing (thus
enabling reindex between clusters) enabling reindex between clusters)
:arg chunk_size: number of docs in one chunk sent to es (default: 500) :arg chunk_size: number of docs in one chunk sent to client (default: 500)
:arg scroll: Specify how long a consistent view of the index should be :arg scroll: Specify how long a consistent view of the index should be
maintained for scrolled search maintained for scrolled search
:arg scan_kwargs: additional kwargs to be passed to :arg scan_kwargs: additional kwargs to be passed to
+1 -1
View File
@@ -78,7 +78,7 @@ class OpenSearchTestCase(TestCase):
cls.client = cls._get_client() cls.client = cls._get_client()
def teardown_method(self, _): def teardown_method(self, _):
# Hidden indices expanded in wildcards in ES 7.7 # Hidden indices expanded in wildcards in OpenSearch 7.7
expand_wildcards = ["open", "closed"] expand_wildcards = ["open", "closed"]
if self.opensearch_version() >= (7, 7): if self.opensearch_version() >= (7, 7):
expand_wildcards.append("hidden") expand_wildcards.append("hidden")
+3 -3
View File
@@ -62,10 +62,10 @@ def fetch_opensearch_repo():
from test_opensearch.test_cases import SkipTest from test_opensearch.test_cases import SkipTest
from test_opensearch.test_server import get_client from test_opensearch.test_server import get_client
# find out the sha of the running es # find out the sha of the running client
try: try:
es = get_client() client = get_client()
sha = es.info()["version"]["build_hash"] sha = client.info()["version"]["build_hash"]
except (SkipTest, KeyError): except (SkipTest, KeyError):
print("No running opensearch >1.X server...") print("No running opensearch >1.X server...")
return return
+5 -5
View File
@@ -121,13 +121,13 @@ class TestClient(OpenSearchTestCase):
self.assertEqual("<OtherOpenSearch([{}])>", repr(OtherOpenSearch())) self.assertEqual("<OtherOpenSearch([{}])>", repr(OtherOpenSearch()))
def test_repr_contains_hosts_passed_in(self): def test_repr_contains_hosts_passed_in(self):
self.assertIn("es.org", repr(OpenSearch(["es.org:123"]))) self.assertIn("opensearch.org", repr(OpenSearch(["opensearch.org:123"])))
def test_repr_truncates_host_to_5(self): def test_repr_truncates_host_to_5(self):
hosts = [{"host": "es" + str(i)} for i in range(10)] hosts = [{"host": "opensearch" + str(i)} for i in range(10)]
es = OpenSearch(hosts) client = OpenSearch(hosts)
self.assertNotIn("es5", repr(es)) self.assertNotIn("opensearch5", repr(client))
self.assertIn("...", repr(es)) self.assertIn("...", repr(client))
def test_index_uses_post_if_id_is_empty(self): def test_index_uses_post_if_id_is_empty(self):
self.client.index(index="my-index", id="", body={}) self.client.index(index="my-index", id="", body={})
+1 -1
View File
@@ -1,6 +1,6 @@
# Type Hints # Type Hints
All of these scripts are used to test the type hinting All of these scripts are used to test the type hinting
distributed with the `elasticsearch` package. distributed with the `opensearch` package.
These scripts simulate normal usage of the client and are run These scripts simulate normal usage of the client and are run
through `mypy --strict` as a part of continuous integration. through `mypy --strict` as a part of continuous integration.
+34 -28
View File
@@ -46,7 +46,7 @@ from opensearch1.helpers import (
streaming_bulk, streaming_bulk,
) )
es = OpenSearch( client = OpenSearch(
[{"host": "localhost", "port": 9443}], [{"host": "localhost", "port": 9443}],
transport_class=Transport, transport_class=Transport,
) )
@@ -71,7 +71,7 @@ def sync_gen() -> Generator[Dict[Any, Any], None, None]:
def scan_types() -> None: def scan_types() -> None:
for _ in scan( for _ in scan(
es, client,
query={"query": {"match_all": {}}}, query={"query": {"match_all": {}}},
request_timeout=10, request_timeout=10,
clear_scroll=True, clear_scroll=True,
@@ -79,7 +79,7 @@ def scan_types() -> None:
): ):
pass pass
for _ in scan( for _ in scan(
es, client,
raise_on_error=False, raise_on_error=False,
preserve_order=False, preserve_order=False,
scroll="10m", scroll="10m",
@@ -90,32 +90,35 @@ def scan_types() -> None:
def streaming_bulk_types() -> None: def streaming_bulk_types() -> None:
for _ in streaming_bulk(es, sync_gen()): for _ in streaming_bulk(client, sync_gen()):
pass pass
for _ in streaming_bulk(es, sync_gen().__iter__()): for _ in streaming_bulk(client, sync_gen().__iter__()):
pass pass
for _ in streaming_bulk(es, [{}]): for _ in streaming_bulk(client, [{}]):
pass pass
for _ in streaming_bulk(es, ({},)): for _ in streaming_bulk(client, ({},)):
pass pass
def bulk_types() -> None: def bulk_types() -> None:
_, _ = bulk(es, sync_gen()) _, _ = bulk(client, sync_gen())
_, _ = bulk(es, sync_gen().__iter__()) _, _ = bulk(client, sync_gen().__iter__())
_, _ = bulk(es, [{}]) _, _ = bulk(client, [{}])
_, _ = bulk(es, ({},)) _, _ = bulk(client, ({},))
def reindex_types() -> None: def reindex_types() -> None:
_, _ = reindex( _, _ = reindex(
es, "src-index", "target-index", query={"query": {"match": {"key": "val"}}} client, "src-index", "target-index", query={"query": {"match": {"key": "val"}}}
) )
_, _ = reindex( _, _ = reindex(
es, source_index="src-index", target_index="target-index", target_client=es client,
source_index="src-index",
target_index="target-index",
target_client=client,
) )
_, _ = reindex( _, _ = reindex(
es, client,
"src-index", "src-index",
"target-index", "target-index",
chunk_size=1, chunk_size=1,
@@ -125,7 +128,7 @@ def reindex_types() -> None:
) )
es2 = AsyncOpenSearch( client2 = AsyncOpenSearch(
[{"host": "localhost", "port": 9443}], [{"host": "localhost", "port": 9443}],
transport_class=AsyncTransport, transport_class=AsyncTransport,
) )
@@ -150,7 +153,7 @@ async def async_gen() -> AsyncGenerator[Dict[Any, Any], None]:
async def async_scan_types() -> None: async def async_scan_types() -> None:
async for _ in async_scan( async for _ in async_scan(
es2, client2,
query={"query": {"match_all": {}}}, query={"query": {"match_all": {}}},
request_timeout=10, request_timeout=10,
clear_scroll=True, clear_scroll=True,
@@ -158,7 +161,7 @@ async def async_scan_types() -> None:
): ):
pass pass
async for _ in async_scan( async for _ in async_scan(
es2, client2,
raise_on_error=False, raise_on_error=False,
preserve_order=False, preserve_order=False,
scroll="10m", scroll="10m",
@@ -169,32 +172,35 @@ async def async_scan_types() -> None:
async def async_streaming_bulk_types() -> None: async def async_streaming_bulk_types() -> None:
async for _ in async_streaming_bulk(es2, async_gen()): async for _ in async_streaming_bulk(client2, async_gen()):
pass pass
async for _ in async_streaming_bulk(es2, async_gen().__aiter__()): async for _ in async_streaming_bulk(client2, async_gen().__aiter__()):
pass pass
async for _ in async_streaming_bulk(es2, [{}]): async for _ in async_streaming_bulk(client2, [{}]):
pass pass
async for _ in async_streaming_bulk(es2, ({},)): async for _ in async_streaming_bulk(client2, ({},)):
pass pass
async def async_bulk_types() -> None: async def async_bulk_types() -> None:
_, _ = await async_bulk(es2, async_gen()) _, _ = await async_bulk(client2, async_gen())
_, _ = await async_bulk(es2, async_gen().__aiter__()) _, _ = await async_bulk(client2, async_gen().__aiter__())
_, _ = await async_bulk(es2, [{}]) _, _ = await async_bulk(client2, [{}])
_, _ = await async_bulk(es2, ({},)) _, _ = await async_bulk(client2, ({},))
async def async_reindex_types() -> None: async def async_reindex_types() -> None:
_, _ = await async_reindex( _, _ = await async_reindex(
es2, "src-index", "target-index", query={"query": {"match": {"key": "val"}}} client2, "src-index", "target-index", query={"query": {"match": {"key": "val"}}}
) )
_, _ = await async_reindex( _, _ = await async_reindex(
es2, source_index="src-index", target_index="target-index", target_client=es2 client2,
source_index="src-index",
target_index="target-index",
target_client=client2,
) )
_, _ = await async_reindex( _, _ = await async_reindex(
es2, client2,
"src-index", "src-index",
"target-index", "target-index",
chunk_size=1, chunk_size=1,
+17 -14
View File
@@ -39,7 +39,7 @@ from opensearch.helpers import (
async_streaming_bulk, async_streaming_bulk,
) )
es = AsyncOpenSearch( client = AsyncOpenSearch(
[{"host": "localhost", "port": 9443}], [{"host": "localhost", "port": 9443}],
transport_class=AsyncTransport, transport_class=AsyncTransport,
) )
@@ -64,7 +64,7 @@ async def async_gen() -> AsyncGenerator[Dict[Any, Any], None]:
async def async_scan_types() -> None: async def async_scan_types() -> None:
async for _ in async_scan( async for _ in async_scan(
es, client,
query={"query": {"match_all": {}}}, query={"query": {"match_all": {}}},
request_timeout=10, request_timeout=10,
clear_scroll=True, clear_scroll=True,
@@ -72,7 +72,7 @@ async def async_scan_types() -> None:
): ):
pass pass
async for _ in async_scan( async for _ in async_scan(
es, client,
raise_on_error=False, raise_on_error=False,
preserve_order=False, preserve_order=False,
scroll="10m", scroll="10m",
@@ -83,32 +83,35 @@ async def async_scan_types() -> None:
async def async_streaming_bulk_types() -> None: async def async_streaming_bulk_types() -> None:
async for _ in async_streaming_bulk(es, async_gen()): async for _ in async_streaming_bulk(client, async_gen()):
pass pass
async for _ in async_streaming_bulk(es, async_gen().__aiter__()): async for _ in async_streaming_bulk(client, async_gen().__aiter__()):
pass pass
async for _ in async_streaming_bulk(es, [{}]): async for _ in async_streaming_bulk(client, [{}]):
pass pass
async for _ in async_streaming_bulk(es, ({},)): async for _ in async_streaming_bulk(client, ({},)):
pass pass
async def async_bulk_types() -> None: async def async_bulk_types() -> None:
_, _ = await async_bulk(es, async_gen()) _, _ = await async_bulk(client, async_gen())
_, _ = await async_bulk(es, async_gen().__aiter__()) _, _ = await async_bulk(client, async_gen().__aiter__())
_, _ = await async_bulk(es, [{}]) _, _ = await async_bulk(client, [{}])
_, _ = await async_bulk(es, ({},)) _, _ = await async_bulk(client, ({},))
async def async_reindex_types() -> None: async def async_reindex_types() -> None:
_, _ = await async_reindex( _, _ = await async_reindex(
es, "src-index", "target-index", query={"query": {"match": {"key": "val"}}} client, "src-index", "target-index", query={"query": {"match": {"key": "val"}}}
) )
_, _ = await async_reindex( _, _ = await async_reindex(
es, source_index="src-index", target_index="target-index", target_client=es client,
source_index="src-index",
target_index="target-index",
target_client=client,
) )
_, _ = await async_reindex( _, _ = await async_reindex(
es, client,
"src-index", "src-index",
"target-index", "target-index",
chunk_size=1, chunk_size=1,
+17 -14
View File
@@ -29,7 +29,7 @@ from typing import Any, Dict, Generator
from opensearch import ConnectionPool, OpenSearch, RequestsHttpConnection, Transport from opensearch import ConnectionPool, OpenSearch, RequestsHttpConnection, Transport
from opensearch.helpers import bulk, reindex, scan, streaming_bulk from opensearch.helpers import bulk, reindex, scan, streaming_bulk
es = OpenSearch( client = OpenSearch(
[{"host": "localhost", "port": 9443}], [{"host": "localhost", "port": 9443}],
transport_class=Transport, transport_class=Transport,
) )
@@ -54,7 +54,7 @@ def sync_gen() -> Generator[Dict[Any, Any], None, None]:
def scan_types() -> None: def scan_types() -> None:
for _ in scan( for _ in scan(
es, client,
query={"query": {"match_all": {}}}, query={"query": {"match_all": {}}},
request_timeout=10, request_timeout=10,
clear_scroll=True, clear_scroll=True,
@@ -62,7 +62,7 @@ def scan_types() -> None:
): ):
pass pass
for _ in scan( for _ in scan(
es, client,
raise_on_error=False, raise_on_error=False,
preserve_order=False, preserve_order=False,
scroll="10m", scroll="10m",
@@ -73,32 +73,35 @@ def scan_types() -> None:
def streaming_bulk_types() -> None: def streaming_bulk_types() -> None:
for _ in streaming_bulk(es, sync_gen()): for _ in streaming_bulk(client, sync_gen()):
pass pass
for _ in streaming_bulk(es, sync_gen().__iter__()): for _ in streaming_bulk(client, sync_gen().__iter__()):
pass pass
for _ in streaming_bulk(es, [{}]): for _ in streaming_bulk(client, [{}]):
pass pass
for _ in streaming_bulk(es, ({},)): for _ in streaming_bulk(client, ({},)):
pass pass
def bulk_types() -> None: def bulk_types() -> None:
_, _ = bulk(es, sync_gen()) _, _ = bulk(client, sync_gen())
_, _ = bulk(es, sync_gen().__iter__()) _, _ = bulk(client, sync_gen().__iter__())
_, _ = bulk(es, [{}]) _, _ = bulk(client, [{}])
_, _ = bulk(es, ({},)) _, _ = bulk(client, ({},))
def reindex_types() -> None: def reindex_types() -> None:
_, _ = reindex( _, _ = reindex(
es, "src-index", "target-index", query={"query": {"match": {"key": "val"}}} client, "src-index", "target-index", query={"query": {"match": {"key": "val"}}}
) )
_, _ = reindex( _, _ = reindex(
es, source_index="src-index", target_index="target-index", target_client=es client,
source_index="src-index",
target_index="target-index",
target_client=client,
) )
_, _ = reindex( _, _ = reindex(
es, client,
"src-index", "src-index",
"target-index", "target-index",
chunk_size=1, chunk_size=1,