Resource Management

Resource Services

The management of resources is performed using the AsyncResourceService class instances. This is similar to how it is done in Superdesk < v3.0, with some slight improvements.

One major difference is the need to directly import the resource service from the module, and not use get_resource_service. This gives us the full typed interface into the specifics of that resource.

For example:

from my_module.users import UsersService, User

async def test_users_service():
    service = UsersService()  # automatic singleton instance

    # Creat the user in both MongoDB and Elasticsearch
    await service.create([
        User(
            id="user_1",
            first_name="Monkey",
            last_name="Magic"
        )
    ])

    # Retrieve the user from the service
    my_user = await service.find_one(_id="user_1")
    assert my_user.first_name == "Monkey"

    # Update the user
    await service.update("user_1", {"last_name": "Mania"}

    # Find users
    cursor = service.search({
        "query": {
            "match": {
                "first_name": "Monkey"
            }
        }
    })
    assert (await cursor.count()) == 1
    async for user in cursor:
        print(user)

    # Iterate over all users, in batches
    async for user in service.get_all_batch():
        print(user)

    # Delete the user from both MongoDB and Elasticsearch
    await service.delete({"_id": "user_1"})

Search References

class AsyncResourceService[source]
property mongo

Return instance of MongoCollection for this resource

property mongo_async

Return instance of async MongoCollection for this resource

property elastic

Returns instance of ElasticResourceAsyncClient for this resource

Raises:

KeyError – If this resource is not configured for Elasticsearch

get_model_instance_from_dict(data: Dict[str, Any]) ResourceModelType

Converts a dictionary to an instance of ResourceModel for this resource

Parameters:

data – Dictionary to convert

Returns:

Instance of ResourceModel for this resource

async find_by_id(item_id: str | ObjectId, version: int | None = None, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) ResourceModelType | None

Find a resource by ID

Parameters:
  • item_id – ID of item to find

  • version – Optional version to get

  • projection – The field projections to be applied

Returns:

None if resource not found, otherwise an instance of ResourceModel for this resource

async find_by_id_raw(item_id: str | ObjectId, version: int | None = None, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) Dict[str, Any] | None

Find a resource by ID

Parameters:
  • item_id – ID of item to find

  • version – Optional version to get

  • projection – The field projections to be applied

Returns:

None if resource not found, otherwise a dictionary of the item

async find_by_ids(ids: Sequence[str | ObjectId], projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) list[ResourceModelType]

Find resources by IDs

Parameters:
  • ids – IDs of items to find

  • projection – The field projections to be applied

Returns:

List of ResourceModel instances found

async find_by_ids_raw(ids: list[str | ObjectId], projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) list[dict[str, Any]]

Find resources by IDs

Parameters:
  • ids – IDs of items to find

  • projection – The field projections to be applied

Returns:

List of dictionaries for the resources found

async search(lookup: Dict[str, Any], use_mongo=False, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) ResourceCursorAsync[ResourceModelType]

Search the resource using the provided lookup

Will use Elasticsearch if configured for this resource and use_mongo == False.

Parameters:
  • lookup – Dictionary to search

  • use_mongo – Force to use MongoDB instead of Elasticsearch

  • projection – Optional projection to apply to the search results

Returns:

A ResourceCursorAsync instance with the response

async on_create(docs: List[ResourceModelType]) None

Hook to run before creating new resource(s)

Parameters:

docs – List of resources to create

async validate_create(doc: ResourceModelType)

Validate the provided doc for creation

Runs the async validators

Parameters:

doc – Model instance to validate

Raises:

ValueError – If the item is not valid

async validate_update(updates: Dict[str, Any], original: ResourceModelType, etag: str | None) Dict[str, Any]

Validate the provided updates dict against the original model instance

Applies the updates to a copy of the original provided, and runs sync and async validators

Parameters:
  • updates – Dictionary of updates to be applied

  • original – Model instance of the original item to be updated

  • etag – Optional etag, if provided will check etag against original item

Raises:

ValueError – If the item is not valid

async create(docs: Sequence[ResourceModelType | dict]) List[ResourceModelType]

Creates a new resource

Will automatically create the resource(s) in both Elasticsearch (if configured for this resource) and MongoDB.

Parameters:

docs – List of resources or dictionaries to create the registries

Returns:

List of IDs for the created resources

Raises:

Pydantic.ValidationError – If any of the docs provided are not valid

async on_created(docs: List[ResourceModelType]) None

Hook to run after creating new resource(s)

Parameters:

docs – List of resources that were created

async on_update(updates: Dict[str, Any], original: ResourceModelType) None

Hook to run before updating a resource

Parameters:
  • item_id – ID of item to update

  • updates – Dictionary to update

  • original – Instance of ResourceModel for the original resource

async update(item_id: str | ObjectId, updates: Dict[str, Any], etag: str | None = None, original: ResourceModelType | None = None) ResourceModelType

Updates an existing resource

Will automatically update the resource in both Elasticsearch (if configured for this resource) and MongoDB.

Parameters:
  • item_id – ID of item to update

  • updates – Dictionary to update

  • etag – Optional etag, if provided will check etag against original item

  • original – Optional original item, if not provided the service will retrieve it

Raises:

SuperdeskApiError.notFoundError – If original item not found

async on_updated(updates: Dict[str, Any], original: ResourceModelType) None

Hook to run after a resource has been updated

Parameters:
  • updates – Dictionary to update

  • original – Instance of ResourceModel for the original resource

async on_delete(doc: ResourceModelType)

Hook to run before deleting a resource

Parameters:

doc – Instance of ResourceModel for the resource to delete

async delete(doc: ResourceModelType, etag: str | None = None)

Deletes a resource

Parameters:
  • doc – Instance of ResourceModel for the resource to delete

  • etag – Optional etag, if provided will check etag against original item

async delete_many(lookup: Dict[str, Any]) List[str]

Deletes resource(s) using a lookup

Parameters:

lookup – Dictionary for the lookup to find items to delete

Returns:

List of IDs for the deleted resources

async on_deleted(doc: ResourceModelType)

Hook to run after deleting a resource

Parameters:

doc – Instance of ResourceModel for the resource that was deleted

async get_all(lookup: dict | None = None) AsyncIterable[ResourceModelType]

Helper function to get all items from this resource

Returns:

An async iterable with ResourceModel instances

get_all_raw(lookup: dict | None = None) AsyncIOMotorCursor

Helper function to get all items for this resource, as a MongoDB cursor

Parameters:

lookup – Optional MongoDB filter to be applied

Returns:

A MongoDB cursor with the results, sorted by ID

async get_all_list(lookup: dict | None = None) list[ResourceModelType]

Helper function to get all items from this resource, as a list

Note: This is only to be used with small collection, DO NOT use on large collections, as the performance and resource use will be too high. Use get_all or get_all_batch instead.

Parameters:

lookup – Optional MongoDB filter to be applied

Returns:

A list of ResourceModel instances

async get_all_list_raw(lookup: dict | None = None) list[dict]

Helper function to get all items from this resource, as a list

Note: This is only to be used with small collection, DO NOT use on large collections, as the performance and resource use will be too high. Use get_all or get_all_batch instead.

Parameters:

lookup – Optional MongoDB filter to be applied

Returns:

A list of dictionaries

async get_all_map(lookup: dict | None = None) dict[str | ObjectId, ResourceModelType]

Helper function to get all items from this resource, as a dictionary where key is the item ID.

Note: This is only to be used with small collection, DO NOT use on large collections, as the performance and resource use will be too high. Use get_all or get_all_batch instead.

Parameters:

lookup – Optional MongoDB filter to be applied

Returns:

A dictionary of ResourceModel instances, where the key is the ID of an item

async get_all_map_raw(lookup: dict | None = None) dict[str | ObjectId, dict]

Helper function to get all items from this resource, as a dictionary where key is the item ID.

Note: This is only to be used with small collection, DO NOT use on large collections, as the performance and resource use will be too high. Use get_all or get_all_batch instead.

Parameters:

lookup – Optional MongoDB filter to be applied

Returns:

A dictionary of resource dictionary, where the key is the ID of an item

async get_all_batch(size=500, max_iterations=10000, lookup=None) AsyncIterable[ResourceModelType]

Helper function to get all items from this resource, in batches

Parameters:
  • size – Number of items to fetch on each iteration

  • max_iterations – Maximum number of iterations to run, before returning gracefully

  • lookup – Optional dictionary used to filter items for

Returns:

An async iterable with ResourceModel instances

async get_all_batch_elastic_raw(query: dict | None = None, size: int = 500, max_iterations: int = 10000, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) AsyncGenerator[dict, None]

Helper function to get all items from this resource, in batches

The default arguments allow iterating over 5 million documents. If more is needed, then consider increasing size and/or max_iterations

Parameters:
  • query – An optional elasticsearch query used to filter items for

  • size – The number of items to fetch on each iteration

  • max_iterations – Maximum number of iterations to run, before returning gracefully

  • projection – Optional projection to apply to the search results

Returns:

An async generator yielding a dictionary of the document

async get_all_batch_elastic(query: dict | None = None, size: int = 500, max_iterations: int = 10000, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) AsyncGenerator[ResourceModelType, None]

Helper function to get all items from this resource, in batches

The default arguments allow iterating over 5 million documents. If more is needed, then consider increasing size and/or max_iterations

Parameters:
  • query – An optional elasticsearch query used to filter items for

  • size – The number of items to fetch on each iteration

  • max_iterations – Maximum number of iterations to run, before returning gracefully

  • projection – Optional projection to apply to the search results

Returns:

An async generator with ResourceModel instances

async find(req: SearchRequest) ElasticsearchResourceCursorAsync[ResourceModelType] | MongoResourceCursorAsync[ResourceModelType]
async find(req: dict, page: int = 1, max_results: int = 25, sort: str | list[tuple[str, Literal[1, -1]]] | None = None, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None, use_mongo: bool = False) ElasticsearchResourceCursorAsync[ResourceModelType] | MongoResourceCursorAsync[ResourceModelType]

Find items from the resource using Elasticsearch

Parameters:
  • req – SearchRequest instance, or a lookup dictionary, for the search params to be used

  • page – The page number to retrieve (defaults to 1)

  • max_results – The maximum number of results to retrieve per page (defaults to 25)

  • sort – The sort order to use (defaults to resource default sort, or not sorting applied)

  • projection – The field projections to be applied

  • use_mongo – If True will force use mongo, else will attempt elastic first

Returns:

An async iterable with ResourceModel instances

Raises:

SuperdeskApiError.notFoundError – If Elasticsearch is not configured

async count(lookup: dict[str, Any] | None = None, use_mongo: bool = False) int

Get the number of items that match the lookup, or all items if lookup is not provided

Will use Elasticsearch if configured for this resource and use_mongo == False. This will not perform a search, but use the item count feature of the underlying data store

Parameters:
  • lookup – Dictionary to search

  • use_mongo – Force to use MongoDB instead of Elasticsearch

Returns:

The number of items that match the lookup

validate_etag(original: ResourceModelType, etag: str | None) None

Validate the provided etag against the original

If the provided etag argument is None, then validation will not occur

Parameters:
  • original – Instance of ResourceModel for the resource to validate etag against

  • etag – Etag string to validate

Raises:

SuperdeskApiError.preconditionFailedError – If the provided etag is invalid

async get_all_item_versions(item_id: str, max_results: int = 200, page: int = 1, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) tuple[list[dict], int]

Get all versions of an item, with pagination and projection supported.

Parameters:
  • item_id – The ID of the item to get all versions for

  • max_results – The maximum number of results to retrieve per page (defaults to 200)

  • projection – The field projections to be applied

Returns:

A tuple - list of dictionary items and the count of total items

Raises:

SuperdeskApiError.badRequestError – If versioning is not enabled on the resource

async get_item_version(item: dict, version: int) dict

Get a specific version of an item

Parameters:
  • item – The item dictionary to get a version for

  • version – The version to get

Returns:

A dictionary for the specific version of the item

Raises:

SuperdeskApiError.badRequestError – If versioning is not enabled on the resource

async system_update(item_id: ObjectId | str, updates: dict[str, Any], update_etag: bool = False) None

Update an item with the supplied updates, and do not update the etag or call any signals

Parameters:
  • item_id – The ID of the item to update

  • updates – A dictionary of values to update

  • update_etag – If True will update the item’s _etag

async bulk_update(ids: set[str | ObjectId], updates: dict) tuple[UpdateResult, tuple[int, list[dict]] | None]

Asynchronously applies updates to multiple items in both MongoDB and Elasticsearch.

This method updates items in MongoDB and, if enabled, Elasticsearch. It ensures that the updates performed on both databases match the provided IDs and logs any inconsistencies.

MongoDB and Elasticsearch updates are concurrently updated.

Errors are raised if the MongoDB operation is not acknowledged, while warnings are logged for any discrepancies between the number of IDs provided and the number of items updated.

Parameters:
  • ids – A set of IDs for the items to be updated. Each ID can either be a string or an ObjectId.

  • updates – A dictionary containing the fields and values to update.

Returns:

A tuple containing the result of the MongoDB update and Elasticsearch operations.

Raises:

SuperdeskApiError.badRequestError – If the MongoDB update operation is not acknowledged.

class ResourceCursorAsync(data_class: Type[ResourceModelType])[source]
get_model_instance(data: dict[str, Any])

Get a model instance from a dictionary of values

Parameters:

data – Dictionary containing values to get a model instance from

Return type:

ResourceModelType

Returns:

A model instance

class ElasticsearchResourceCursorAsync(data_class: Type[ResourceModelType], hits=None)[source]
extra(response: dict[str, Any])

Add extra info to response

class MongoResourceCursorAsync(data_class: Type[ResourceModelType], collection: AsyncIOMotorCollection, cursor: AsyncIOMotorCursor, lookup: dict[str, Any], collation: Collation | None = None)[source]
class AsyncCacheableService[source]

Handles caching for the resource, will invalidate on any changes to the resource.

Attributes:

resource_name (str): The name of the resource this service handles. cache_lookup (dict): A dictionary to specify custom lookup parameters for caching.

get_cache() Any

Retrieve the cached value from Flask’s g object if available.

set_cache(value: Any) None

Set the cached value in Flask’s g object.

async get_cached() List[Dict[str, Any]]

Retrieve cached data for the resource. If the cache is empty, fetches data from the database and sets it in the cache. The cache is automatically refreshed with a time-to-live (TTL).

Returns:

List[Dict[str, Any]]: A list of dictionaries containing the cached data.

async get_cached_by_id(_id: str)

Retrieve a specific resource by its ID from the cached data. If the resource is not found in the cache, fetches it directly from the database.

Args:

_id (string): The ID of the resource to retrieve.

Returns:

Optional[Dict[str, Any]]: A dictionary representing the resource if found, otherwise None.