Elasticsearch
ElasticResources Instance
The ElasticResources instance provides access to the Elasticsearch clients.
This instance is available as SuperdeskAsyncApp.elastic.
There are two types of clients: one for standard synchronous connections and another for asynchronous
connections. Methods whose names end in _async provide the asynchronous version.
For example:
from superdesk.core.app import get_current_async_app


def test():
    # Synchronous client for the "users" resource
    client = get_current_async_app().elastic.get_client("users")
    users = client.search(
        {"query": {"term": {"first_name": "monkey"}}}
    )


async def test_async():
    # Asynchronous client for the same resource; calls must be awaited
    client = get_current_async_app().elastic.get_client_async("users")
    users = await client.search(
        {"query": {"term": {"first_name": "monkey"}}}
    )
The get_client method returns an instance of ElasticResourceClient.
The get_client_async method returns an instance of ElasticResourceAsyncClient.
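The sync/async split can be tried out without a running cluster. The following is a minimal sketch built on hypothetical stand-in classes (FakeClient, FakeAsyncClient and FakeElasticResources are not part of Superdesk). It only mirrors the call pattern documented here, including the KeyError raised for a resource that is not registered.

```python
import asyncio
from typing import Any


class FakeClient:
    """Hypothetical stand-in for ElasticResourceClient (not the real class)."""

    def __init__(self, docs: list[dict[str, Any]]):
        self.docs = docs

    def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
        # Supports a single-field "term" query only, which is enough for illustration.
        field, value = next(iter(query["query"]["term"].items()))
        return [doc for doc in self.docs if doc.get(field) == value]


class FakeAsyncClient:
    """Hypothetical stand-in for ElasticResourceAsyncClient."""

    def __init__(self, docs: list[dict[str, Any]]):
        self._sync = FakeClient(docs)

    async def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
        return self._sync.search(query)


class FakeElasticResources:
    """Hypothetical registry keyed by resource name."""

    def __init__(self, data: dict[str, list[dict[str, Any]]]):
        self._data = data

    def get_client(self, resource_name: str) -> FakeClient:
        # A plain dict lookup raises KeyError for unregistered resources.
        return FakeClient(self._data[resource_name])

    def get_client_async(self, resource_name: str) -> FakeAsyncClient:
        return FakeAsyncClient(self._data[resource_name])


elastic = FakeElasticResources(
    {"users": [{"first_name": "monkey"}, {"first_name": "john"}]}
)
query = {"query": {"term": {"first_name": "monkey"}}}

sync_hits = elastic.get_client("users").search(query)
async_hits = asyncio.run(elastic.get_client_async("users").search(query))
```

Both calls return the same documents; the only difference for the caller is that the asynchronous result has to be awaited.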
Elastic References
- class ElasticResourceConfig(prefix: str = 'ELASTICSEARCH', default_sort: str | list[tuple[str, Literal[1, -1]]] | None = None, default_max_results: int = 25, filter: dict[str, Any] | None = None, filter_callback: Callable[[SearchRequest | None], dict[str, Any]] | None = None, aggregations: dict[str, Any] | None = None, highlight: Callable[[str], dict[str, Any] | None] | None = None, facets: dict[str, Any] | None = None, auto_create_index: bool = True)
Resource config for use with Elasticsearch, to be included with the ResourceConfig
- prefix: str = 'ELASTICSEARCH'
Config prefix to be used
- default_sort: str | list[tuple[str, Literal[1, -1]]] | None = None
The default sort
- default_max_results: int = 25
The default maximum number of documents to be returned
- filter: dict[str, Any] | None = None
An optional filter to be applied to all searches
- filter_callback: Callable[[SearchRequest | None], dict[str, Any]] | None = None
An optional callback used to construct a filter dynamically, to be applied to all searches
- aggregations: dict[str, Any] | None = None
An optional dictionary of field aggregations
- highlight: Callable[[str], dict[str, Any] | None] | None = None
An optional callback that returns a dictionary of highlights to be applied, or None
- facets: dict[str, Any] | None = None
An optional dictionary of facets to be applied (Will this be required in new version?)
- auto_create_index: bool = True
If False, auto-creating the index for this resource is skipped (the Eve resource will be used instead)
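To make the filter and filter_callback options more concrete, here is a minimal sketch of one way a static filter and a dynamically built filter could be merged into a search body using the Elasticsearch bool query. The helper apply_filters and the callback only_active are hypothetical names used for illustration; how Superdesk combines these options internally is not shown in this reference.

```python
from typing import Any, Callable, Optional

Query = dict[str, Any]


def apply_filters(
    query: Query,
    static_filter: Optional[Query] = None,
    filter_callback: Optional[Callable[[Any], Query]] = None,
) -> Query:
    """Wrap the query in a bool query carrying every configured filter."""
    filters = []
    if static_filter is not None:
        filters.append(static_filter)
    if filter_callback is not None:
        # The documented callback accepts a SearchRequest or None; None is used here.
        filters.append(filter_callback(None))
    if not filters:
        return query
    # Keep any other top-level keys (size, sort, ...) and replace only "query".
    return {**query, "query": {"bool": {"must": [query["query"]], "filter": filters}}}


def only_active(_request: Any) -> Query:
    # Hypothetical dynamic filter
    return {"term": {"is_active": True}}


base = {"query": {"term": {"first_name": "monkey"}}}
filtered = apply_filters(
    base,
    static_filter={"term": {"type": "user"}},
    filter_callback=only_active,
)
```

Clauses under filter restrict the result set without affecting relevance scores, which is why they suit constraints that apply to every search.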
- class ElasticResources(app: SuperdeskAsyncApp)
- app: SuperdeskAsyncApp
A reference back to the parent app, for configuration purposes
- register_resource_config(resource_name: str, resource_config: ElasticResourceConfig)
Register a resource for use with Elasticsearch.
- Parameters:
resource_name – The name of the resource to register.
resource_config – The config of the resource to register.
- get_client(resource_name) ElasticResourceClient
Get a synchronous ElasticResourceClient for a registered resource
- Parameters:
resource_name – The name of the registered resource.
- Returns:
A client used for managing resources
- Raises:
KeyError – If the resource is not registered for use with Elasticsearch
- get_client_async(resource_name) ElasticResourceAsyncClient
Get an asynchronous ElasticResourceAsyncClient for a registered resource
- Parameters:
resource_name – The name of the registered resource.
- Returns:
A client used for managing resources
- Raises:
KeyError – If the resource is not registered for use with Elasticsearch
- async close_all_clients()
Close all connections to Elasticsearch
- async stop()
Close all connections to Elasticsearch and clear all registrations
- init_index(resource_name: str, raise_on_mapping_error: bool = False)
Init an Elasticsearch index for the provided resource
- Parameters:
resource_name – The name of the registered resource to init
raise_on_mapping_error – If True will raise an exception if the mapping is invalid
- Raises:
KeyError – If the resource is not registered for use with Elasticsearch
- init_all_indexes(raise_on_mapping_error=False) List[str]
Init Elasticsearch indexes for all registered resources
- Parameters:
raise_on_mapping_error – If True will raise an exception if the mapping is invalid
- drop_indexes(index_prefix: str | None = None)
Drop the Elasticsearch indexes for all registered resources. If index_prefix is provided, only the indexes whose names contain that prefix are dropped.
- get_settings(resource_client: ElasticResourceClient) Dict[str, Any]
Get the settings from Elasticsearch for a registered resource
- Parameters:
resource_client – The ElasticResourceClient instance to get the settings for
- Raises:
KeyError – If the resource is not registered for use with Elasticsearch
- put_settings(resource_client: ElasticResourceClient)
Uploads the settings to Elasticsearch for a registered resource
- Parameters:
resource_client – The ElasticResourceClient instance to upload the settings for
- search(resource_names: List[str], query: Dict[str, Any]) Dict[str, Any]
Search Elasticsearch across multiple indexes
- Parameters:
resource_names – A list of names of the registered resources to search in
query – The search query to filter items by
- Returns:
A dictionary containing the results of the search
- Raises:
ValueError – If no registered resources found
ValueError – If the search is destined for different Elasticsearch clusters
KeyError – If any provided resource name is not registered for use with Elasticsearch
- async search_async(resource_names: List[str], query: Dict[str, Any]) Dict[str, Any]
Search Elasticsearch across multiple indexes
- Parameters:
resource_names – A list of names of the registered resources to search in
query – The search query to filter items by
- Returns:
A dictionary containing the results of the search
- Raises:
ValueError – If no registered resources found
ValueError – If the search is destined for different Elasticsearch clusters
KeyError – If any provided resource name is not registered for use with Elasticsearch
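As a rough illustration of working with the returned dictionary, the sketch below assumes the result follows the standard Elasticsearch search response layout (hits.total and hits.hits with _id and _source). That assumption is not confirmed by this reference, and extract_docs is a hypothetical helper, not part of Superdesk.

```python
from typing import Any


def extract_docs(response: dict[str, Any]) -> tuple[list[dict[str, Any]], int]:
    """Return the matched documents and the total hit count."""
    hits = response.get("hits", {})
    total = hits.get("total", 0)
    # Elasticsearch 7+ reports the total as {"value": n, "relation": ...};
    # older versions report a plain integer.
    if isinstance(total, dict):
        total = total.get("value", 0)
    docs = [
        {"_id": hit["_id"], **hit.get("_source", {})}
        for hit in hits.get("hits", [])
    ]
    return docs, total


# Hand-written sample in the assumed response shape, spanning two indexes
sample_response = {
    "hits": {
        "total": {"value": 2, "relation": "eq"},
        "hits": [
            {"_index": "users", "_id": "u1", "_source": {"first_name": "monkey"}},
            {"_index": "contacts", "_id": "c7", "_source": {"first_name": "monkey"}},
        ],
    }
}

docs, total = extract_docs(sample_response)
```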
- find_by_id(item_id: str, resource_names: List[str]) Dict[str, Any] | None
Find a document in multiple registered resources and Elasticsearch indexes
- Parameters:
item_id – The id of the item to find
resource_names – A list of names of the registered resources to search in
- Returns:
The document if found, else None
- Raises:
KeyError – If any provided resource name is not registered for use with Elasticsearch
- async find_by_id_async(item_id: str, resource_names: List[str]) Dict[str, Any] | None
Find a document in multiple registered resources and Elasticsearch indexes
- Parameters:
item_id – The id of the item to find
resource_names – A list of names of the registered resources to search in
- Returns:
The document if found, else None
- Raises:
KeyError – If any provided resource name is not registered for use with Elasticsearch
- reindex(resource_name: str, *, requests_per_second: int = 1000)
Reindex a registered resource in Elasticsearch
- Parameters:
resource_name – The name of the registered resource to reindex
requests_per_second – The number of requests to reindex per second
- Raises:
KeyError – If the resource is not registered for use with Elasticsearch
- class ElasticResourceClient(resource_name: str, config: ElasticClientConfig, resource_config: ElasticResourceConfig)
- insert(docs: List[Dict[str, Any]]) List[str]
Insert a list of documents into Elasticsearch.
- Parameters:
docs – List of documents to insert.
- Returns:
List of IDs for the inserted documents.
- bulk_insert(docs: List[Dict[str, Any]]) Tuple[int, List[Dict[str, Any]]]
Insert a list of documents into Elasticsearch.
- Parameters:
docs – List of documents to insert.
- Returns:
Tuple containing the number of inserted documents and a list of failed insert responses
- bulk_update(ids: set[str | ObjectId], updates: dict) tuple[int, list[Dict[str, Any]]]
Handles the bulk update operation for the Elasticsearch index, applying updates to a list of documents identified by their IDs.
- Parameters:
ids – A set of IDs for the items to be updated. Each ID can either be a string or an ObjectId.
updates – A dictionary containing the fields and values to update.
- Returns:
A tuple containing the number of updated documents and a list of failed update responses.
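The returned tuple can be unpacked to report partial failures. The sketch below assumes each failed response is a dictionary keyed by the bulk action name (for example "update") whose value carries the document _id, which is the shape used for bulk item errors in Elasticsearch. Whether Superdesk returns exactly this shape is an assumption, and summarize_bulk_result is a hypothetical helper.

```python
from typing import Any


def summarize_bulk_result(
    result: tuple[int, list[dict[str, Any]]]
) -> dict[str, Any]:
    """Split a (count, failures) tuple into a count and the failed IDs."""
    updated, failures = result
    failed_ids = []
    for failure in failures:
        # Assumed shape: {"update": {"_id": ..., "status": ..., "error": ...}}
        for details in failure.values():
            failed_ids.append(details.get("_id"))
    return {"updated": updated, "failed_ids": failed_ids}


# Hand-written sample standing in for the return value of bulk_update
sample_result = (
    2,
    [
        {
            "update": {
                "_id": "abc",
                "status": 404,
                "error": {"type": "document_missing_exception"},
            }
        }
    ],
)

summary = summarize_bulk_result(sample_result)
```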
- update(item_id: str, updates: Dict[str, Any]) Any
Update a document in Elasticsearch
- Parameters:
item_id – ID of the document to update.
updates – Dictionary of updates to be applied.
- Returns:
The response from Elasticsearch.
- replace(item_id: str, updates: Dict[str, Any]) Any
Replace an entire document in Elasticsearch
- Parameters:
item_id – ID of the document to replace.
updates – Document used to replace the item.
- Returns:
The response from Elasticsearch.
- remove(item_id: str) Any
Delete a document from Elasticsearch
- Parameters:
item_id – ID of the document to delete.
- Returns:
The response from Elasticsearch.
- count(query: Dict[str, Any] | None = None) int
Get the number of documents in Elasticsearch that match the search query
- Parameters:
query – The search query to filter items by.
- Returns:
The number of documents in Elasticsearch that match the query.
- is_empty() bool
Utility function used to see if an index has no documents
- Returns:
True if the index is empty, False otherwise
- search(query: Dict[str, Any], indexes: List[str] | None = None, projection: ProjectedFieldSources | list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) Any
Perform a raw search against the Elasticsearch index
- Parameters:
query – The search query to filter items by
indexes – An optional list of indexes to search in
projection – The field projections to be applied
- Returns:
The response from Elasticsearch
- find_by_id(item_id: str, projection: ProjectedFieldSources | list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) dict[str, Any] | None
Find a single document in Elasticsearch based on its ID
- Parameters:
item_id – ID of the document to find.
projection – The field projections to be applied
- Returns:
The document found or None if no document was found.
- find_one(req: SearchRequest) dict[str, Any] | None
- find_one(req: dict) dict[str, Any] | None
Find a single document in Elasticsearch based on the provided search query
- Parameters:
req – A SearchRequest instance or a dictionary providing the filters used to search for an item
- Returns:
The document found or None if no document was found.
- find_list_of_ids(ids: List[str]) ElasticCursor
Find multiple documents in Elasticsearch based on their IDs
- Parameters:
ids – The list of IDs used to search for
- Returns:
An ElasticCursor instance with the search results
- find(req: SearchRequest, sub_resource_lookup: Dict[str, Any] | None = None) Tuple[ElasticCursor, int]
Find documents in Elasticsearch
- Parameters:
req – A SearchRequest instance with the search params to be used
sub_resource_lookup – Optional additional lookup filters
- Returns:
A tuple containing an ElasticCursor instance and the number of documents found
- class ElasticResourceAsyncClient(resource_name: str, config: ElasticClientConfig, resource_config: ElasticResourceConfig)
- async insert(docs: List[Dict[str, Any]]) List[str]
Insert a list of documents into Elasticsearch.
- Parameters:
docs – List of documents to insert.
- Returns:
List of IDs for the inserted documents.
- async bulk_insert(docs: List[Dict[str, Any]]) Tuple[int, List[Dict[str, Any]]]
Insert a list of documents into Elasticsearch.
- Parameters:
docs – List of documents to insert.
- Returns:
Tuple containing the number of inserted documents and a list of failed insert responses
- async bulk_update(ids: set[str | ObjectId], updates: dict) tuple[int, list[dict]]
Handles the bulk update operation for the Elasticsearch index, applying updates to a list of documents identified by their IDs.
- Parameters:
ids – A set of IDs for the items to be updated. Each ID can either be a string or an ObjectId.
updates – A dictionary containing the fields and values to update.
- Returns:
A tuple containing the number of updated documents and a list of failed update responses.
- async update(item_id: str, updates: Dict[str, Any]) Any
Update a document in Elasticsearch
- Parameters:
item_id – ID of the document to update.
updates – Dictionary of updates to be applied.
- Returns:
The response from Elasticsearch.
- async replace(item_id: str, updates: Dict[str, Any]) Any
Replace an entire document in Elasticsearch
- Parameters:
item_id – ID of the document to replace.
updates – Document used to replace the item.
- Returns:
The response from Elasticsearch.
- async remove(item_id: str) Any
Delete a document from Elasticsearch
- Parameters:
item_id – ID of the document to delete.
- Returns:
The response from Elasticsearch.
- async count(query: Dict[str, Any] | None = None) int
Get the number of documents in Elasticsearch that match the search query
- Parameters:
query – The search query to filter items by.
- Returns:
The number of documents in Elasticsearch that match the query.
- async is_empty() bool
Utility function used to see if an index has no documents
- Returns:
True if the index is empty, False otherwise
- async search(query: Dict[str, Any], indexes: List[str] | None = None, projection: ProjectedFieldSources | list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) Any
Perform a raw search against the Elasticsearch index
- Parameters:
query – The search query to filter items by.
indexes – An optional list of indexes to search in.
projection – The field projections to be applied
- Returns:
The response from Elasticsearch.
- async find_by_id(item_id: str, projection: ProjectedFieldSources | list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) dict[str, Any] | None
Find a single document in Elasticsearch based on its ID
- Parameters:
item_id – ID of the document to find.
projection – The field projections to be applied
- Returns:
The document found or None if no document was found.
- async find_one(req: SearchRequest) dict[str, Any] | None
- async find_one(req: dict) dict[str, Any] | None
Find a single document in Elasticsearch based on the provided search query
- Parameters:
req – A SearchRequest instance or a dictionary providing the filters used to search for an item
- Returns:
The document found or None if no document was found.
- async find_list_of_ids(ids: List[str]) ElasticCursor
Find multiple documents in Elasticsearch based on their IDs
- Parameters:
ids – The list of IDs used to search for
- Returns:
An ElasticCursor instance with the search results
- async find(req: SearchRequest, sub_resource_lookup: Dict[str, Any] | None = None)
Find documents in Elasticsearch
- Parameters:
req – A SearchRequest instance with the search params to be used
sub_resource_lookup – Optional additional lookup filters
- Returns:
A tuple containing an ElasticCursor instance and the number of documents found
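One practical benefit of the asynchronous client is that independent requests can be issued concurrently. The sketch below shows the idea with asyncio.gather and a hypothetical in-memory StubAsyncClient that mirrors only the count, is_empty and find_by_id method names listed above. It is not the real client and performs no network I/O.

```python
import asyncio
from typing import Any, Optional


class StubAsyncClient:
    """Hypothetical in-memory stand-in for ElasticResourceAsyncClient."""

    def __init__(self, docs: dict[str, dict[str, Any]]):
        self._docs = docs

    async def count(self, query: Optional[dict[str, Any]] = None) -> int:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return len(self._docs)

    async def is_empty(self) -> bool:
        return (await self.count()) == 0

    async def find_by_id(self, item_id: str) -> Optional[dict[str, Any]]:
        await asyncio.sleep(0)
        return self._docs.get(item_id)


async def load_overview(client: StubAsyncClient, item_id: str) -> dict[str, Any]:
    # Both coroutines are scheduled together instead of one after the other.
    total, item = await asyncio.gather(
        client.count(),
        client.find_by_id(item_id),
    )
    return {"total": total, "item": item}


stub = StubAsyncClient({"u1": {"first_name": "monkey"}})
overview = asyncio.run(load_overview(stub, "u1"))
```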
- json_schema_to_elastic_mapping(json_schema: Dict[str, Any]) Dict[str, Any]
Construct an Elasticsearch mapping from an OpenAPI JSON schema.
- Parameters:
json_schema – OpenAPI JSON schema
- Returns:
Elastic mapping for use with an Elasticsearch index
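To give a feel for what such a conversion involves, here is a deliberately simplified sketch. It is not the library's implementation: the type table TYPE_MAP and the function schema_to_mapping_sketch are hypothetical, and only a handful of JSON schema types are handled.

```python
from typing import Any

# Assumed, simplified correspondence between JSON schema and Elasticsearch types
TYPE_MAP = {
    "string": "text",
    "integer": "integer",
    "number": "double",
    "boolean": "boolean",
}


def schema_to_mapping_sketch(schema: dict[str, Any]) -> dict[str, Any]:
    """Build a minimal Elasticsearch mapping from a JSON schema object."""
    properties: dict[str, Any] = {}
    for name, field in schema.get("properties", {}).items():
        field_type = field.get("type")
        if field_type == "object":
            # Nested objects become nested "properties" blocks.
            properties[name] = schema_to_mapping_sketch(field)
        elif field.get("format") == "date-time":
            properties[name] = {"type": "date"}
        elif field_type in TYPE_MAP:
            properties[name] = {"type": TYPE_MAP[field_type]}
        # Anything else is skipped in this sketch.
    return {"properties": properties}


example_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "integer"},
        "created": {"type": "string", "format": "date-time"},
        "address": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
        },
    },
}

mapping = schema_to_mapping_sketch(example_schema)
```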
- get_elastic_mapping_from_model(name: str, model_class: type[BaseModel], schema_overrides: Dict[str, Any] | None = None) Dict[str, Any]
Constructs an Elasticsearch mapping from a ResourceModel
- Parameters:
name – The name of the resource
model_class – The ResourceModel class
schema_overrides – Overrides to be applied to the generated mapping
- Returns:
Elastic mapping for use with an Elasticsearch index