de96d28e45
* Don't create universal wheel for Python 3 only Signed-off-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com> * Update Black target version to match min Python supported Signed-off-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com> * Upgrade files using six to Python 3 syntax Signed-off-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com> * Remove redundant six dependency Signed-off-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com> * Format with Black Signed-off-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com> * Add changelog entry Signed-off-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com> --------- Signed-off-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com>
468 lines
14 KiB
Python
468 lines
14 KiB
Python
# SPDX-License-Identifier: Apache-2.0
|
|
#
|
|
# The OpenSearch Contributors require contributions made to
|
|
# this file be licensed under the Apache-2.0 license or a
|
|
# compatible open source license.
|
|
#
|
|
# Modifications Copyright OpenSearch Contributors. See
|
|
# GitHub history for details.
|
|
#
|
|
# Licensed to Elasticsearch B.V. under one or more contributor
|
|
# license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright
|
|
# ownership. Elasticsearch B.V. licenses this file to you under
|
|
# the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Optional
|
|
|
|
from opensearchpy.helpers.aggs import A
|
|
|
|
from .query import MatchAll, Nested, Range, Terms
|
|
from .response import Response
|
|
from .search import Search
|
|
from .utils import AttrDict
|
|
|
|
__all__ = [
|
|
"FacetedSearch",
|
|
"HistogramFacet",
|
|
"TermsFacet",
|
|
"DateHistogramFacet",
|
|
"RangeFacet",
|
|
"NestedFacet",
|
|
]
|
|
|
|
|
|
class Facet:
|
|
"""
|
|
A facet on faceted search. Wraps and aggregation and provides functionality
|
|
to create a filter for selected values and return a list of facet values
|
|
from the result of the aggregation.
|
|
"""
|
|
|
|
agg_type: Optional[str] = None
|
|
|
|
def __init__(
|
|
self, metric: Any = None, metric_sort: str = "desc", **kwargs: Any
|
|
) -> None:
|
|
self.filter_values = ()
|
|
self._params = kwargs
|
|
self._metric = metric
|
|
if metric and metric_sort:
|
|
self._params["order"] = {"metric": metric_sort}
|
|
|
|
def get_aggregation(self) -> Any:
|
|
"""
|
|
Return the aggregation object.
|
|
"""
|
|
agg = A(self.agg_type, **self._params)
|
|
if self._metric:
|
|
agg.metric("metric", self._metric)
|
|
return agg
|
|
|
|
def add_filter(self, filter_values: Any) -> Any:
|
|
"""
|
|
Construct a filter.
|
|
"""
|
|
if not filter_values:
|
|
return
|
|
|
|
f = self.get_value_filter(filter_values[0])
|
|
for v in filter_values[1:]:
|
|
f |= self.get_value_filter(v)
|
|
return f
|
|
|
|
def get_value_filter(self, filter_value: Any) -> Any:
|
|
return None
|
|
|
|
def is_filtered(self, key: Any, filter_values: Any) -> bool:
|
|
"""
|
|
Is a filter active on the given key.
|
|
"""
|
|
return key in filter_values
|
|
|
|
def get_value(self, bucket: Any) -> Any:
|
|
"""
|
|
return a value representing a bucket. Its key as default.
|
|
"""
|
|
return bucket["key"]
|
|
|
|
def get_metric(self, bucket: Any) -> Any:
|
|
"""
|
|
Return a metric, by default doc_count for a bucket.
|
|
"""
|
|
if self._metric:
|
|
return bucket["metric"]["value"]
|
|
return bucket["doc_count"]
|
|
|
|
def get_values(self, data: Any, filter_values: Any) -> Any:
|
|
"""
|
|
Turn the raw bucket data into a list of tuples containing the key,
|
|
number of documents and a flag indicating whether this value has been
|
|
selected or not.
|
|
"""
|
|
out = []
|
|
for bucket in data.buckets:
|
|
key = self.get_value(bucket)
|
|
out.append(
|
|
(key, self.get_metric(bucket), self.is_filtered(key, filter_values))
|
|
)
|
|
return out
|
|
|
|
|
|
class TermsFacet(Facet):
|
|
agg_type: Optional[str] = "terms"
|
|
|
|
def add_filter(self, filter_values: Any) -> Any:
|
|
"""Create a terms filter instead of bool containing term filters."""
|
|
if filter_values:
|
|
return Terms(
|
|
_expand__to_dot=False, **{self._params["field"]: filter_values}
|
|
)
|
|
|
|
|
|
class RangeFacet(Facet):
|
|
agg_type = "range"
|
|
|
|
def _range_to_dict(self, range: Any) -> Any:
|
|
key, range = range
|
|
out = {"key": key}
|
|
if range[0] is not None:
|
|
out["from"] = range[0]
|
|
if range[1] is not None:
|
|
out["to"] = range[1]
|
|
return out
|
|
|
|
def __init__(self, ranges: Any, **kwargs: Any) -> None:
|
|
super().__init__(**kwargs)
|
|
self._params["ranges"] = list(map(self._range_to_dict, ranges))
|
|
self._params["keyed"] = False
|
|
self._ranges = dict(ranges)
|
|
|
|
def get_value_filter(self, filter_value: Any) -> Any:
|
|
f, t = self._ranges[filter_value]
|
|
limits = {}
|
|
if f is not None:
|
|
limits["gte"] = f
|
|
if t is not None:
|
|
limits["lt"] = t
|
|
|
|
return Range(_expand__to_dot=False, **{self._params["field"]: limits})
|
|
|
|
|
|
class HistogramFacet(Facet):
|
|
agg_type = "histogram"
|
|
|
|
def get_value_filter(self, filter_value: Any) -> Any:
|
|
return Range(
|
|
_expand__to_dot=False,
|
|
**{
|
|
self._params["field"]: {
|
|
"gte": filter_value,
|
|
"lt": filter_value + self._params["interval"],
|
|
}
|
|
}
|
|
)
|
|
|
|
|
|
def _date_interval_year(d: Any) -> Any:
|
|
return d.replace(
|
|
year=d.year + 1, day=(28 if d.month == 2 and d.day == 29 else d.day)
|
|
)
|
|
|
|
|
|
def _date_interval_month(d: Any) -> Any:
|
|
return (d + timedelta(days=32)).replace(day=1)
|
|
|
|
|
|
def _date_interval_week(d: Any) -> Any:
|
|
return d + timedelta(days=7)
|
|
|
|
|
|
def _date_interval_day(d: Any) -> Any:
|
|
return d + timedelta(days=1)
|
|
|
|
|
|
def _date_interval_hour(d: Any) -> Any:
|
|
return d + timedelta(hours=1)
|
|
|
|
|
|
class DateHistogramFacet(Facet):
|
|
agg_type = "date_histogram"
|
|
|
|
DATE_INTERVALS = {
|
|
"year": _date_interval_year,
|
|
"1Y": _date_interval_year,
|
|
"month": _date_interval_month,
|
|
"1M": _date_interval_month,
|
|
"week": _date_interval_week,
|
|
"1w": _date_interval_week,
|
|
"day": _date_interval_day,
|
|
"1d": _date_interval_day,
|
|
"hour": _date_interval_hour,
|
|
"1h": _date_interval_hour,
|
|
}
|
|
|
|
def __init__(self, **kwargs: Any) -> None:
|
|
kwargs.setdefault("min_doc_count", 0)
|
|
super().__init__(**kwargs)
|
|
|
|
def get_value(self, bucket: Any) -> Any:
|
|
if not isinstance(bucket["key"], datetime):
|
|
# OpenSearch returns key=None instead of 0 for date 1970-01-01,
|
|
# so we need to set key to 0 to avoid TypeError exception
|
|
if bucket["key"] is None:
|
|
bucket["key"] = 0
|
|
# Preserve milliseconds in the datetime
|
|
return datetime.utcfromtimestamp(int(bucket["key"]) / 1000.0) # type: ignore
|
|
else:
|
|
return bucket["key"]
|
|
|
|
def get_value_filter(self, filter_value: Any) -> Any:
|
|
for interval_type in ("calendar_interval", "fixed_interval"):
|
|
if interval_type in self._params:
|
|
break
|
|
else:
|
|
interval_type = "interval"
|
|
|
|
return Range(
|
|
_expand__to_dot=False,
|
|
**{
|
|
self._params["field"]: {
|
|
"gte": filter_value,
|
|
"lt": self.DATE_INTERVALS[self._params[interval_type]](
|
|
filter_value
|
|
),
|
|
}
|
|
}
|
|
)
|
|
|
|
|
|
class NestedFacet(Facet):
|
|
agg_type = "nested"
|
|
|
|
def __init__(self, path: Any, nested_facet: Any) -> None:
|
|
self._path = path
|
|
self._inner = nested_facet
|
|
super().__init__(path=path, aggs={"inner": nested_facet.get_aggregation()})
|
|
|
|
def get_values(self, data: Any, filter_values: Any) -> Any:
|
|
return self._inner.get_values(data.inner, filter_values)
|
|
|
|
def add_filter(self, filter_values: Any) -> Any:
|
|
inner_q = self._inner.add_filter(filter_values)
|
|
if inner_q:
|
|
return Nested(path=self._path, query=inner_q)
|
|
|
|
|
|
class FacetedResponse(Response):
|
|
@property
|
|
def query_string(self) -> Any:
|
|
return self._faceted_search._query
|
|
|
|
@property
|
|
def facets(self) -> Any:
|
|
if not hasattr(self, "_facets"):
|
|
super(AttrDict, self).__setattr__("_facets", AttrDict({}))
|
|
for name, facet in self._faceted_search.facets.items():
|
|
self._facets[name] = facet.get_values(
|
|
getattr(getattr(self.aggregations, "_filter_" + name), name),
|
|
self._faceted_search.filter_values.get(name, ()),
|
|
)
|
|
return self._facets
|
|
|
|
|
|
class FacetedSearch:
|
|
"""
|
|
Abstraction for creating faceted navigation searches that takes care of
|
|
composing the queries, aggregations and filters as needed as well as
|
|
presenting the results in an easy-to-consume fashion::
|
|
|
|
class BlogSearch(FacetedSearch):
|
|
index = 'blogs'
|
|
doc_types = [Blog, Post]
|
|
fields = ['title^5', 'category', 'description', 'body']
|
|
|
|
facets = {
|
|
'type': TermsFacet(field='_type'),
|
|
'category': TermsFacet(field='category'),
|
|
'weekly_posts': DateHistogramFacet(field='published_from', interval='week')
|
|
}
|
|
|
|
def search(self):
|
|
' Override search to add your own filters '
|
|
s = super(BlogSearch, self).search()
|
|
return s.filter('term', published=True)
|
|
|
|
# when using:
|
|
blog_search = BlogSearch("web framework", filters={"category": "python"})
|
|
|
|
# supports pagination
|
|
blog_search[10:20]
|
|
|
|
response = blog_search.execute()
|
|
|
|
# easy access to aggregation results:
|
|
for category, hit_count, is_selected in response.facets.category:
|
|
print(
|
|
"Category %s has %d hits%s." % (
|
|
category,
|
|
hit_count,
|
|
' and is chosen' if is_selected else ''
|
|
)
|
|
)
|
|
|
|
"""
|
|
|
|
index: Any = None
|
|
doc_types: Any = None
|
|
fields: Any = None
|
|
facets: Any = {}
|
|
using = "default"
|
|
|
|
def __init__(self, query: Any = None, filters: Any = {}, sort: Any = ()) -> None:
|
|
"""
|
|
:arg query: the text to search for
|
|
:arg filters: facet values to filter
|
|
:arg sort: sort information to be passed to :class:`~opensearchpy.Search`
|
|
"""
|
|
self._query = query
|
|
self._filters: Any = {}
|
|
self._sort = sort
|
|
self.filter_values: Any = {}
|
|
for name, value in filters.items():
|
|
self.add_filter(name, value)
|
|
|
|
self._s = self.build_search()
|
|
|
|
def count(self) -> Any:
|
|
return self._s.count()
|
|
|
|
def __getitem__(self, k: Any) -> Any:
|
|
self._s = self._s[k]
|
|
return self
|
|
|
|
def __iter__(self) -> Any:
|
|
return iter(self._s)
|
|
|
|
def add_filter(self, name: Any, filter_values: Any) -> Any:
|
|
"""
|
|
Add a filter for a facet.
|
|
"""
|
|
# normalize the value into a list
|
|
if not isinstance(filter_values, (tuple, list)):
|
|
if filter_values is None:
|
|
return
|
|
filter_values = [
|
|
filter_values,
|
|
]
|
|
|
|
# remember the filter values for use in FacetedResponse
|
|
self.filter_values[name] = filter_values
|
|
|
|
# get the filter from the facet
|
|
f = self.facets[name].add_filter(filter_values)
|
|
if f is None:
|
|
return
|
|
|
|
self._filters[name] = f
|
|
|
|
def search(self) -> Any:
|
|
"""
|
|
Returns the base Search object to which the facets are added.
|
|
|
|
You can customize the query by overriding this method and returning a
|
|
modified search object.
|
|
"""
|
|
s = Search(doc_type=self.doc_types, index=self.index, using=self.using)
|
|
return s.response_class(FacetedResponse)
|
|
|
|
def query(self, search: Any, query: Any) -> Any:
|
|
"""
|
|
Add query part to ``search``.
|
|
|
|
Override this if you wish to customize the query used.
|
|
"""
|
|
if query:
|
|
if self.fields:
|
|
return search.query("multi_match", fields=self.fields, query=query)
|
|
else:
|
|
return search.query("multi_match", query=query)
|
|
return search
|
|
|
|
def aggregate(self, search: Any) -> Any:
|
|
"""
|
|
Add aggregations representing the facets selected, including potential
|
|
filters.
|
|
"""
|
|
for f, facet in self.facets.items():
|
|
agg = facet.get_aggregation()
|
|
agg_filter = MatchAll()
|
|
for field, filter in self._filters.items():
|
|
if f == field:
|
|
continue
|
|
agg_filter &= filter
|
|
search.aggs.bucket("_filter_" + f, "filter", filter=agg_filter).bucket(
|
|
f, agg
|
|
)
|
|
|
|
def filter(self, search: Any) -> Any:
|
|
"""
|
|
Add a ``post_filter`` to the search request narrowing the results based
|
|
on the facet filters.
|
|
"""
|
|
if not self._filters:
|
|
return search
|
|
|
|
post_filter = MatchAll()
|
|
for f in self._filters.values():
|
|
post_filter &= f
|
|
return search.post_filter(post_filter)
|
|
|
|
def highlight(self, search: Any) -> Any:
|
|
"""
|
|
Add highlighting for all the fields
|
|
"""
|
|
return search.highlight(
|
|
*(f if "^" not in f else f.split("^", 1)[0] for f in self.fields)
|
|
)
|
|
|
|
def sort(self, search: Any) -> Any:
|
|
"""
|
|
Add sorting information to the request.
|
|
"""
|
|
if self._sort:
|
|
search = search.sort(*self._sort)
|
|
return search
|
|
|
|
def build_search(self) -> Any:
|
|
"""
|
|
Construct the ``Search`` object.
|
|
"""
|
|
s = self.search()
|
|
s = self.query(s, self._query)
|
|
s = self.filter(s)
|
|
if self.fields:
|
|
s = self.highlight(s)
|
|
s = self.sort(s)
|
|
self.aggregate(s)
|
|
return s
|
|
|
|
def execute(self) -> Any:
|
|
"""
|
|
Execute the search and return the response.
|
|
"""
|
|
r = self._s.execute()
|
|
r._faceted_search = self
|
|
return r
|