Signed-off-by: saimedhi <saimedhi@amazon.com>
14 KiB
- User guide of OpenSearch Python client
User guide of OpenSearch Python client
Setup
To add the client to your project, install it using pip:
pip install opensearch-py
Then import it like any other module:
from opensearchpy import OpenSearch
If you prefer to add the client manually or just want to examine the source code, see opensearch-py on GitHub.
Example
In the example given below, we create a client, an index with non-default settings, insert a document in the index, search for the document, delete the document and finally delete the index.
Creating a client
from opensearchpy import OpenSearch

host = 'localhost'
port = 9200
auth = ('admin', 'admin')  # For testing only. Don't store credentials in code.

# Provide a CA bundle if you use intermediate CAs with your root CA.
# If this is not given, the CA bundle is discovered from the first available
# of the following options:
# - OpenSSL environment variables SSL_CERT_FILE and SSL_CERT_DIR
# - certifi bundle (https://pypi.org/project/certifi/)
# - default behavior of the connection backend (most likely system certs)
ca_certs_path = '/full/path/to/root-ca.pem'

# Optional client certificates if you don't want to use HTTP basic authentication.
# client_cert_path = '/full/path/to/client.pem'
# client_key_path = '/full/path/to/client-key.pem'

# Create the client with SSL/TLS enabled, but hostname verification disabled.
client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_compress=True,  # enables gzip compression for request bodies
    http_auth=auth,
    # client_cert=client_cert_path,
    # client_key=client_key_path,
    use_ssl=True,
    verify_certs=True,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
    ca_certs=ca_certs_path,
)
Creating an index
# Create an index whose settings differ from the defaults.
index_name = 'python-test-index3'
# Only the shard count is overridden here.
index_settings = {'number_of_shards': 4}
index_body = {'settings': {'index': index_settings}}

response = client.indices.create(index_name, body=index_body)
print('\nCreating index:')
print(response)
Adding a document to an index
# The document to store and the ID to store it under.
document = dict(title='Moneyball', director='Bennett Miller', year='2011')
id = '1'

# refresh=True asks OpenSearch to refresh the index as part of this write.
response = client.index(index=index_name, body=document, id=id, refresh=True)
print('\nAdding document:')
print(response)
Adding documents in bulk
# Bulk payload in newline-delimited JSON: each action line is followed by
# the source document it applies to.
docs = '\n'.join([
    '{"index": {"_index": "index-2022-06-08", "_id": "1"}}',
    '{"name": "foo"}',
    '{"index": {"_index": "index-2022-06-09", "_id": "2"}}',
    '{"name": "bar"}',
    '{"index": {"_index": "index-2022-06-10", "_id": "3"}}',
    '{"name": "baz"}',
])

response = client.bulk(docs)
print('\nAdding bulk documents:')
print(response)
Adding documents in bulk using helper functions
from opensearchpy import helpers


def generate_data():
    """Return one bulk action per word, all targeting the "mywords" index.

    A fresh list is built on every call, so calling this function more than
    once never duplicates actions.
    """
    mywords = ['foo', 'bar', 'baz']
    return [
        {"_index": "mywords", "word": word, "_id": index}
        for index, word in enumerate(mywords)
    ]


docs = generate_data()
response = helpers.bulk(client, docs, max_retries=3)
print('\nAdding bulk documents using helper:')
print(response)
Searching for a document
q = 'miller'
# Look for the term in both fields; 'title^2' weights title matches double.
multi_match = {'query': q, 'fields': ['title^2', 'director']}
query = {'size': 5, 'query': {'multi_match': multi_match}}

response = client.search(body=query, index=index_name)
print('\nSearch results:')
print(response)
Deleting a document
# Remove the document with the given ID from the index.
response = client.delete(index=index_name, id=id)
print('\nDeleting document:')
print(response)
Deleting an index
# Remove the whole index.
response = client.indices.delete(index=index_name)
print('\nDeleting index:')
print(response)
Making API calls
Point in time API
# Create a point in time (PIT) on an index.
index_name = "test-index"
response = client.create_point_in_time(index=index_name, keep_alive="1m")
pit_id = response.get("pit_id")
print('\n Point in time ID:')
print(pit_id)

# List all points in time that are alive in the cluster.
response = client.list_all_point_in_time()
print('\n List of all Point in Time:')
print(response)

# Delete the point in time created above.
# To delete all points in time instead:
# client.delete_point_in_time(body=None, all=True)
pit_body = {"pit_id": [pit_id]}
response = client.delete_point_in_time(body=pit_body)
print('\n The deleted point in time:')
print(response)
Using High-level Python client
The high-level Python client is now merged into the low-level Python client. Thus, opensearch-py supports creating and indexing documents, searching with and without filters, and updating documents using queries. See the high-level Python client documentation for details.
All the APIs newly added from opensearch-dsl-py are listed in docs.
In the example below, the 'Search' API from the high-level Python client is used.
Searching for documents with filters
from opensearchpy import OpenSearch, Search

# Reuse the earlier examples to create the client, create an index and
# add a document to it before running this search.

# Build the request: a term filter first, then a match query.
s = (
    Search(using=client, index=index_name)
    .filter("term", category="search")
    .query("match", title="python")
)
response = s.execute()

print('\nSearch results:')
for hit in response:
    print(hit.meta.score, hit.title)

# Afterwards, delete the document and the index as shown earlier.
Using plugins
Plugin client definitions can be found in the opensearchpy/plugins directory of the opensearch-py repository.
Alerting plugin
Searching for monitors
# '\n' (not '\S') so the heading starts on its own line, like the other examples.
print('\nSearching for monitors:')

# Match monitors by name.
query = {
    "query": {
        "match": {
            "monitor.name": "test-monitor"
        }
    }
}

response = client.plugins.alerting.search_monitor(query)
print(response)
Getting a monitor
# '\n' (not '\G') so the heading starts on its own line, like the other examples.
print('\nGetting a monitor:')

# Replace "monitorID" with the ID of an existing monitor.
response = client.plugins.alerting.get_monitor("monitorID")
print(response)
Creating a monitor
# '\n' (not '\C') so the heading starts on its own line, like the other examples.
print('\nCreating a bucket level monitor:')

# Bucket-level monitor that runs every minute over the last hour of data.
query = {
    "type": "monitor",
    "name": "Demo bucket-level monitor",
    "monitor_type": "bucket_level_monitor",
    "enabled": True,
    "schedule": {
        "period": {
            "interval": 1,
            "unit": "MINUTES"
        }
    },
    "inputs": [
        {
            "search": {
                "indices": [
                    "python-test-index3"
                ],
                "query": {
                    "size": 0,
                    "query": {
                        "bool": {
                            "filter": [
                                {
                                    "range": {
                                        "order_date": {
                                            # {{period_end}} is a template variable that the
                                            # alerting plugin fills in at run time; a bare
                                            # "||-1h" has no anchor date to subtract from.
                                            "from": "{{period_end}}||-1h",
                                            "to": "{{period_end}}",
                                            "include_lower": True,
                                            "include_upper": True,
                                            "format": "epoch_millis"
                                        }
                                    }
                                }
                            ]
                        }
                    },
                    "aggregations": {
                        "composite_agg": {
                            "composite": {
                                "sources": [
                                    {
                                        "user": {
                                            "terms": {
                                                "field": "user"
                                            }
                                        }
                                    }
                                ]
                            },
                            "aggregations": {
                                "avg_products_base_price": {
                                    "avg": {
                                        "field": "products.base_price"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    ],
}

response = client.plugins.alerting.create_monitor(query)
print(response)
Creating a destination
# '\n' (not '\C') so the heading starts on its own line, like the other examples.
print('\nCreating an email destination:')

# Email destination with two recipients: an email group and a single address.
# The IDs below are placeholders; substitute the IDs from your own cluster.
query = {
    "type": "email",
    "name": "my-email-destination",
    "email": {
        "email_account_id": "YjY7mXMBx015759_IcfW",
        "recipients": [
            {
                "type": "email_group",
                "email_group_id": "YzY-mXMBx015759_dscs"
            },
            {
                "type": "email",
                "email": "example@email.com"
            }
        ]
    }
}

response = client.plugins.alerting.create_destination(query)
print(response)
Getting alerts
# '\n' (not '\G') so the heading starts on its own line, like the other examples.
print('\nGetting alerts:')

response = client.plugins.alerting.get_alerts()
print(response)
Acknowledge alerts
# '\n' (not '\A') so the heading starts on its own line, like the other examples.
print('\nAcknowledge alerts:')

# IDs of the alerts to acknowledge (placeholder value).
query = {
    "alerts": ["eQURa3gBKo1jAh6qUo49"]
}

response = client.plugins.alerting.acknowledge_alert(query)
print(response)
Using different authentication methods
It is possible to use different methods for the authentication to OpenSearch. The parameters of connection_class and http_auth can be used for this. The following examples show how to authenticate using IAM credentials and using Kerberos.
Using IAM credentials
Refer to the AWS documentation regarding the usage of IAM credentials to sign requests to OpenSearch APIs - Signing HTTP requests to Amazon OpenSearch Service.
The opensearch-py client library also provides a built-in IAM-based authentication feature, AWSV4SignerAuth, that helps users connect to their OpenSearch clusters by making use of IAM roles.
AWSV4SignerAuth uses RequestsHttpConnection as the transport class for communication with OpenSearch clusters. The opensearch-py client library provides the pool_maxsize option to modify the default connection-pool size.
Pre-requisites to use AWSV4SignerAuth
-
Python version 3.6 or above,
-
Install botocore using pip (the sample code below also imports boto3, which can be installed with pip install boto3)
pip install botocore
Here is the sample code that uses AWSV4SignerAuth -
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import boto3

host = ''  # cluster endpoint, for example: my-test-domain.us-east-1.es.amazonaws.com
region = 'us-west-2'
service = 'es'  # 'aoss' for OpenSearch Serverless

# Sign every request with the credentials resolved by the default boto3 session.
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region, service)

index_name = 'python-test-index3'

client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    pool_maxsize=20,
)

q = 'miller'
multi_match = {'query': q, 'fields': ['title^2', 'director']}
query = {'size': 5, 'query': {'multi_match': multi_match}}

response = client.search(body=query, index=index_name)
print('\nSearch results:')
print(response)
Using IAM authentication with an async client
Make sure to use the AsyncHttpConnection connection class with the async AWSV4SignerAsyncAuth signer.
import asyncio

from opensearchpy import AsyncOpenSearch, AsyncHttpConnection, AWSV4SignerAsyncAuth
import boto3

host = ''  # cluster endpoint, for example: my-test-domain.us-east-1.es.amazonaws.com
region = 'us-west-2'
service = 'es'  # 'aoss' for OpenSearch Serverless
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAsyncAuth(credentials, region, service)
index_name = 'python-test-index3'

# AsyncHttpConnection returns coroutines, so it must be paired with the
# async client; the synchronous OpenSearch client cannot be awaited.
# NOTE: the async client presumably needs the async extra
# (pip install opensearch-py[async]) - confirm for your version.
client = AsyncOpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=AsyncHttpConnection,
)


async def search():
    """Run a multi_match query for 'miller' and print the response."""
    q = 'miller'
    query = {
        'size': 5,
        'query': {
            'multi_match': {
                'query': q,
                'fields': ['title^2', 'director']
            }
        }
    }
    response = await client.search(
        body=query,
        index=index_name
    )
    print('\nSearch results:')
    print(response)


async def main():
    """Run the search, then close the client's connections."""
    try:
        await search()
    finally:
        await client.close()


# Calling search() alone only creates a coroutine object; it has to be
# driven by an event loop to actually execute (asyncio.run needs Python 3.7+).
asyncio.run(main())
Using Kerberos
There are several Python packages that provide Kerberos support over HTTP connections, such as requests-kerberos and requests-gssapi. The following example shows how to set up the authentication. Note that some of the parameters, such as mutual_authentication, might depend on the server settings.
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_kerberos import HTTPKerberosAuth, OPTIONAL

# Replace the placeholder URL with your cluster endpoint.
client = OpenSearch(
    ['https://...'],
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    # Whether mutual authentication is required depends on the server settings.
    http_auth=HTTPKerberosAuth(mutual_authentication=OPTIONAL),
)

# Simple request to confirm that authentication works.
health = client.cluster.health()