Elasticsearch

ElasticResources Instance

The ElasticResources instance provides access to the Elasticsearch clients. It is available as SuperdeskAsyncApp.elastic.

There are two types of clients you can use: one for standard synchronous connections and another for asynchronous connections. The functions ending in _async provide the asynchronous version.

For example:

from superdesk.core.app import get_current_async_app

def test():
    # Synchronous client for the registered "users" resource
    client = get_current_async_app().elastic.get_client("users")
    users = client.search(
        {"query": {"term": {"first_name": "monkey"}}}
    )

async def test_async():
    # Asynchronous client: its methods must be awaited
    client = get_current_async_app().elastic.get_client_async("users")
    users = await client.search(
        {"query": {"term": {"first_name": "monkey"}}}
    )

The get_client method returns an instance of ElasticResourceClient.

The get_client_async method returns an instance of ElasticResourceAsyncClient.

Elastic References

class ElasticResourceConfig(prefix: str = 'ELASTICSEARCH', default_sort: str | list[tuple[str, Literal[1, -1]]] | None = None, default_max_results: int = 25, filter: dict[str, Any] | None = None, filter_callback: Callable[[SearchRequest | None], dict[str, Any]] | None = None, aggregations: dict[str, Any] | None = None, highlight: Callable[[str], dict[str, Any] | None] | None = None, facets: dict[str, Any] | None = None, auto_create_index: bool = True)

Resource config for use with Elasticsearch, to be included with the ResourceConfig

prefix: str = 'ELASTICSEARCH'

Config prefix to be used

default_sort: str | list[tuple[str, Literal[1, -1]]] | None = None

The default sort

default_max_results: int = 25

The default maximum number of documents to be returned

filter: dict[str, Any] | None = None

An optional filter to be applied to all searches

filter_callback: Callable[[SearchRequest | None], dict[str, Any]] | None = None

An optional callback used to construct a filter dynamically, to be applied to all searches

aggregations: dict[str, Any] | None = None

An optional dictionary of field aggregations

highlight: Callable[[str], dict[str, Any] | None] | None = None

An optional callback that returns a dictionary of highlights to be applied, or None

facets: dict[str, Any] | None = None

An optional dictionary of facets to be applied (Will this be required in the new version?)

auto_create_index: bool = True

If False, the index is not auto-created for this resource (the Eve resource will be used instead)
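As a rough illustration of how these options fit together, the sketch below collects some of the keyword arguments listed above in a plain dictionary. The field names (`last_name`, `is_enabled`) and the callback `enabled_users_filter` are hypothetical, the filter shape is assumed to be a standard Elasticsearch `term` filter, and the meaning of `1` and `-1` in `default_sort` is assumed to be ascending and descending. The final construction step is left as a comment so the sketch runs without Superdesk installed.

```python
from typing import Any, Optional


def enabled_users_filter(req: Optional[Any]) -> dict[str, Any]:
    """Hypothetical filter_callback: build a filter applied to every search.

    `req` is the SearchRequest (or None); it is ignored in this sketch.
    """
    return {"term": {"is_enabled": True}}


# Keyword arguments matching the ElasticResourceConfig signature above.
elastic_config_kwargs: dict[str, Any] = {
    # Assumption: 1 sorts ascending and -1 sorts descending
    "default_sort": [("last_name", 1)],
    "default_max_results": 50,
    "filter_callback": enabled_users_filter,
    "auto_create_index": True,
}

# With Superdesk available: ElasticResourceConfig(**elastic_config_kwargs)
```

A callback is only needed when the filter depends on the request; a fixed filter can be passed through `filter` instead.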

class ElasticResources(app: SuperdeskAsyncApp)
app: SuperdeskAsyncApp

A reference back to the parent app, for configuration purposes

register_resource_config(resource_name: str, resource_config: ElasticResourceConfig)

Register a resource for use with Elasticsearch.

Parameters:
  • resource_name – The name of the resource to register.

  • resource_config – The config of the resource to register.

get_client(resource_name) ElasticResourceClient

Get a synchronous ElasticResourceClient for a registered resource

Parameters:

resource_name – The name of the registered resource to get the client for.

Returns:

A client used for managing resources

Raises:

KeyError – If the resource is not registered for use with Elasticsearch

get_client_async(resource_name) ElasticResourceAsyncClient

Get an asynchronous ElasticResourceAsyncClient for a registered resource

Parameters:

resource_name – The name of the registered resource to get the client for.

Returns:

A client used for managing resources

Raises:

KeyError – If the resource is not registered for use with Elasticsearch
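Because both `get_client` and `get_client_async` raise `KeyError` for unregistered resources, callers that cannot be sure a resource is registered may want to guard the lookup. A minimal sketch, where the helper name `get_client_or_none` is hypothetical and `elastic` stands for the ElasticResources instance:

```python
from typing import Any, Optional


def get_client_or_none(elastic: Any, resource_name: str) -> Optional[Any]:
    """Return the sync client for `resource_name`, or None if not registered."""
    try:
        return elastic.get_client(resource_name)
    except KeyError:
        # Raised when the resource was never registered for Elasticsearch
        return None
```

It could be called as `get_client_or_none(get_current_async_app().elastic, "users")`.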

async close_all_clients()

Close all connections to Elasticsearch

async stop()

Close all connections to Elasticsearch and clear all registrations

init_index(resource_name: str, raise_on_mapping_error: bool = False)

Init an Elasticsearch index for the provided resource

Parameters:
  • resource_name – The name of the registered resource to init

  • raise_on_mapping_error – If True, raises an exception when the mapping is invalid

Raises:

KeyError – If the resource is not registered for use with Elasticsearch

init_all_indexes(raise_on_mapping_error=False) List[str]

Init Elasticsearch indexes for all registered resources

Parameters:

raise_on_mapping_error – If True, raises an exception when the mapping is invalid

drop_indexes(index_prefix: str | None = None)

Drops the Elasticsearch indexes for all registered resources. If index_prefix is provided, only the indexes whose names contain that prefix are dropped.

get_settings(resource_client: ElasticResourceClient) Dict[str, Any]

Get the settings from Elasticsearch for a registered resource

Parameters:

resource_client – The ElasticResourceClient instance to get the settings for

Raises:

KeyError – If the resource is not registered for use with Elasticsearch

put_settings(resource_client: ElasticResourceClient)

Uploads the settings to Elasticsearch for a registered resource

Parameters:

resource_client – The ElasticResourceClient instance to upload the settings for

search(resource_names: List[str], query: Dict[str, Any]) Dict[str, Any]

Search Elasticsearch across multiple indexes

Parameters:
  • resource_names – A list of names of the registered resources to search in

  • query – The search query to filter items by

Returns:

A dictionary containing the results of the search

Raises:
  • ValueError – If no registered resources found

  • ValueError – If the search is destined for different Elasticsearch clusters

  • KeyError – If any provided resource name is not registered for use with Elasticsearch
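A cross-resource search can fail in the ways listed above, so a caller may want to handle those errors in one place. The sketch below is one way to do that; the helper name `search_resources` is hypothetical, `elastic` stands for the ElasticResources instance, and the returned dictionary is assumed to follow the standard Elasticsearch response layout (`hits.hits[]._source`).

```python
from typing import Any


def search_resources(elastic: Any, resource_names: list[str], text: str) -> list[dict[str, Any]]:
    """Search several registered resources and return the matching documents."""
    query = {"query": {"query_string": {"query": text}}}
    try:
        response = elastic.search(resource_names, query)
    except (KeyError, ValueError):
        # KeyError: a resource name is not registered
        # ValueError: no resources found, or they span different clusters
        return []
    # Assumption: standard Elasticsearch response layout
    return [hit["_source"] for hit in response.get("hits", {}).get("hits", [])]
```

Returning an empty list hides the reason for the failure; code that needs to tell the cases apart would catch the two exceptions separately.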

async search_async(resource_names: List[str], query: Dict[str, Any]) Dict[str, Any]

Search Elasticsearch across multiple indexes

Parameters:
  • resource_names – A list of names of the registered resources to search in

  • query – The search query to filter items by

Returns:

A dictionary containing the results of the search

Raises:
  • ValueError – If no registered resources found

  • ValueError – If the search is destined for different Elasticsearch clusters

  • KeyError – If any provided resource name is not registered for use with Elasticsearch

find_by_id(item_id: str, resource_names: List[str]) Dict[str, Any] | None

Find a document in multiple registered resources and Elasticsearch indexes

Parameters:
  • item_id – The id of the item to find

  • resource_names – A list of names of the registered resources to search in

Returns:

The document if found, else None

Raises:

KeyError – If any provided resource name is not registered for use with Elasticsearch

async find_by_id_async(item_id: str, resource_names: List[str]) Dict[str, Any] | None

Find a document in multiple registered resources and Elasticsearch indexes

Parameters:
  • item_id – The id of the item to find

  • resource_names – A list of names of the registered resources to search in

Returns:

The document if found, else None

Raises:

KeyError – If any provided resource name is not registered for use with Elasticsearch

reindex(resource_name: str, *, requests_per_second: int = 1000)

Reindex a registered resource in Elasticsearch

Parameters:
  • resource_name – The name of the registered resource to reindex

  • requests_per_second – The number of requests to reindex per second

Raises:

KeyError – If the resource is not registered for use with Elasticsearch

class ElasticResourceClient(resource_name: str, config: ElasticClientConfig, resource_config: ElasticResourceConfig)
insert(docs: List[Dict[str, Any]]) List[str]

Insert a list of documents into Elasticsearch.

Parameters:

docs – List of documents to insert.

Returns:

List of IDs for the inserted documents.

bulk_insert(docs: List[Dict[str, Any]]) Tuple[int, List[Dict[str, Any]]]

Insert a list of documents into Elasticsearch.

Parameters:

docs – List of documents to insert.

Returns:

Tuple containing the number of inserted documents and a list of dictionaries describing the failed inserts
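Since `bulk_insert` returns a tuple rather than raising on partial failure, the caller has to inspect the second element. A minimal sketch, assuming (from the `List[Dict[str, Any]]` return annotation) that each entry in that list describes one failed insert; the helper name `insert_in_bulk` is hypothetical:

```python
from typing import Any


def insert_in_bulk(client: Any, docs: list[dict[str, Any]]) -> int:
    """Bulk insert `docs` and raise if any of them failed."""
    inserted, failures = client.bulk_insert(docs)
    if failures:
        # Assumption: one entry per failed insert
        raise RuntimeError(f"{len(failures)} of {len(docs)} documents failed to insert")
    return inserted
```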

update(item_id: str, updates: Dict[str, Any]) Any

Update a document in Elasticsearch

Parameters:
  • item_id – ID of the document to update.

  • updates – Dictionary of updates to be applied.

Returns:

The response from Elasticsearch.

replace(item_id: str, updates: Dict[str, Any]) Any

Replace an entire document in Elasticsearch

Parameters:
  • item_id – ID of the document to replace.

  • updates – Document used to replace the item.

Returns:

The response from Elasticsearch.

remove(item_id: str) Any

Delete a document from Elasticsearch

Parameters:

item_id – ID of the document to delete.

Returns:

The response from Elasticsearch.

count(query: Dict[str, Any] | None = None) int

Get the number of documents in Elasticsearch that match the search query

Parameters:

query – The search query to filter items by.

Returns:

The number of documents in Elasticsearch that match the query.

is_empty() bool

Utility function used to see if an index has no documents

Returns:

True if the index is empty, False otherwise

search(query: Dict[str, Any], indexes: List[str] | None = None, projection: ProjectedFieldSources | list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) Any

Perform a raw search against the Elasticsearch index

Parameters:
  • query – The search query to filter items by

  • indexes – An optional list of indexes to search in

  • projection – The field projections to be applied

Returns:

The response from Elasticsearch
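The `projection` parameter accepts several shapes according to the signature above. The sketch below shows three of them and passes one to `search`. How each shape is interpreted (include for lists and `1` values, exclude for `0` values) is an assumption based on the type hints, as is the standard Elasticsearch response layout; the field names and the helper `search_first_names` are hypothetical.

```python
from typing import Any

# Projection shapes permitted by the signature (interpretation assumed)
INCLUDE_AS_LIST = ["first_name", "last_name"]
INCLUDE_AS_DICT = {"first_name": 1, "last_name": 1}
EXCLUDE_AS_DICT = {"password": 0}


def search_first_names(client: Any, first_name: str) -> list[dict[str, Any]]:
    """Run a term query and return only the projected document sources."""
    query = {"query": {"term": {"first_name": first_name}}}
    response = client.search(query, projection=INCLUDE_AS_LIST)
    # Assumption: raw Elasticsearch response with hits.hits[]._source
    return [hit["_source"] for hit in response["hits"]["hits"]]
```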

find_by_id(item_id: str, projection: ProjectedFieldSources | list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) dict[str, Any] | None

Find a single document in Elasticsearch based on its ID

Parameters:
  • item_id – ID of the document to find.

  • projection – The field projections to be applied

Returns:

The document found or None if no document was found.

find_one(req: SearchRequest) dict[str, Any] | None
find_one(req: dict) dict[str, Any] | None

Find a single document in Elasticsearch based on the provided search query

Parameters:

req – A SearchRequest instance or dictionary providing the filters used to search for an item

Returns:

The document found or None if no document was found.

find_list_of_ids(ids: List[str]) ElasticCursor

Find multiple documents in Elasticsearch based on their IDs

Parameters:

ids – The list of IDs used to search for

Returns:

An ElasticCursor instance with the search results

find(req: SearchRequest, sub_resource_lookup: Dict[str, Any] | None = None) Tuple[ElasticCursor, int]

Find documents in Elasticsearch

Parameters:
  • req – A SearchRequest instance with the search params to be used

  • sub_resource_lookup – Optional additional lookup filters

Returns:

A tuple containing an ElasticCursor instance and the number of documents found

class ElasticResourceAsyncClient(resource_name: str, config: ElasticClientConfig, resource_config: ElasticResourceConfig)
async insert(docs: List[Dict[str, Any]]) List[str]

Insert a list of documents into Elasticsearch.

Parameters:

docs – List of documents to insert.

Returns:

List of IDs for the inserted documents.

async bulk_insert(docs: List[Dict[str, Any]]) Tuple[int, List[Dict[str, Any]]]

Insert a list of documents into Elasticsearch.

Parameters:

docs – List of documents to insert.

Returns:

Tuple containing the number of inserted documents and a list of dictionaries describing the failed inserts

async update(item_id: str, updates: Dict[str, Any]) Any

Update a document in Elasticsearch

Parameters:
  • item_id – ID of the document to update.

  • updates – Dictionary of updates to be applied.

Returns:

The response from Elasticsearch.

async replace(item_id: str, updates: Dict[str, Any]) Any

Replace an entire document in Elasticsearch

Parameters:
  • item_id – ID of the document to replace.

  • updates – Document used to replace the item.

Returns:

The response from Elasticsearch.

async remove(item_id: str) Any

Delete a document from Elasticsearch

Parameters:

item_id – ID of the document to delete.

Returns:

The response from Elasticsearch.

async count(query: Dict[str, Any] | None = None) int

Get the number of documents in Elasticsearch that match the search query

Parameters:

query – The search query to filter items by.

Returns:

The number of documents in Elasticsearch that match the query.

async is_empty() bool

Utility function used to see if an index has no documents

Returns:

True if the index is empty, False otherwise

async search(query: Dict[str, Any], indexes: List[str] | None = None, projection: ProjectedFieldSources | list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) Any

Perform a raw search against the Elasticsearch index

Parameters:
  • query – The search query to filter items by.

  • indexes – An optional list of indexes to search in.

  • projection – The field projections to be applied

Returns:

The response from Elasticsearch.

async find_by_id(item_id: str, projection: ProjectedFieldSources | list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) dict[str, Any] | None

Find a single document in Elasticsearch based on its ID

Parameters:
  • item_id – ID of the document to find.

  • projection – The field projections to be applied

Returns:

The document found or None if no document was found.
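One benefit of the asynchronous client is that independent lookups can be awaited together. The sketch below fetches several documents concurrently with `asyncio.gather`; the helper name `find_many_by_id` is hypothetical, and it is assumed that a single client instance can serve concurrent requests.

```python
import asyncio
from typing import Any, Optional


async def find_many_by_id(client: Any, item_ids: list[str]) -> dict[str, Optional[dict[str, Any]]]:
    """Look up several IDs concurrently; missing documents map to None."""
    docs = await asyncio.gather(*(client.find_by_id(item_id) for item_id in item_ids))
    # gather preserves argument order, so IDs and results line up
    return dict(zip(item_ids, docs))
```

For larger batches, `find_list_of_ids` is likely the better fit, since it takes the whole list of IDs in one call.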

async find_one(req: SearchRequest) dict[str, Any] | None
async find_one(req: dict) dict[str, Any] | None

Find a single document in Elasticsearch based on the provided search query

Parameters:

req – A SearchRequest instance or dictionary providing the filters used to search for an item

Returns:

The document found or None if no document was found.

async find_list_of_ids(ids: List[str]) ElasticCursor

Find multiple documents in Elasticsearch based on their IDs

Parameters:

ids – The list of IDs used to search for

Returns:

An ElasticCursor instance with the search results

async find(req: SearchRequest, sub_resource_lookup: Dict[str, Any] | None = None)

Find documents in Elasticsearch

Parameters:
  • req – A SearchRequest instance with the search params to be used

  • sub_resource_lookup – Optional additional lookup filters

Returns:

A tuple containing an ElasticCursor instance and the number of documents found

json_schema_to_elastic_mapping(json_schema: Dict[str, Any]) Dict[str, Any]

Construct an Elasticsearch mapping from an OpenAPI JSON schema.

Parameters:

json_schema – OpenAPI JSON schema

Returns:

Elastic mapping for use with an Elasticsearch index
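To make the idea of a schema-to-mapping conversion concrete, here is a simplified sketch. It is not the library's implementation: the function `sketch_schema_to_mapping` and the type table are assumptions chosen for illustration, and only flat scalar fields and nested objects are handled.

```python
from typing import Any

# Assumed JSON schema type -> Elasticsearch field type table (illustrative only)
_TYPE_MAP = {
    "string": "text",
    "integer": "integer",
    "number": "double",
    "boolean": "boolean",
}


def sketch_schema_to_mapping(json_schema: dict[str, Any]) -> dict[str, Any]:
    """Convert a small subset of JSON schema into an Elasticsearch mapping."""
    properties: dict[str, Any] = {}
    for name, field in json_schema.get("properties", {}).items():
        field_type = field.get("type")
        if field_type == "object":
            # Nested objects become nested "properties" blocks
            properties[name] = sketch_schema_to_mapping(field)
        elif field_type in _TYPE_MAP:
            properties[name] = {"type": _TYPE_MAP[field_type]}
        # Unknown or missing types are skipped in this sketch
    return {"properties": properties}
```

The real function presumably covers more of the schema vocabulary, such as arrays, formats, and references.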

get_elastic_mapping_from_model(name: str, model_class: type[BaseModel], schema_overrides: Dict[str, Any] | None = None) Dict[str, Any]

Constructs an Elasticsearch mapping from a ResourceModel

Parameters:
  • name – The name of the resource

  • model_class – The ResourceModel class

  • schema_overrides – Overrides to be applied to the generated mapping

Returns:

Elastic mapping for use with an Elasticsearch index