Resource Management

Resource Services

Resources are managed through instances of the AsyncResourceService class. This is similar to how it was done in Superdesk < v3.0, with some slight improvements.

One major difference is that the resource service must be imported directly from its module, rather than obtained through get_resource_service. This gives a fully typed interface to the specifics of that resource.

For example:

from my_module.users import UsersService, User

async def test_users_service():
    service = UsersService()  # automatic singleton instance

    # Create the user in both MongoDB and Elasticsearch
    await service.create([
        User(
            id="user_1",
            first_name="Monkey",
            last_name="Magic"
        )
    ])

    # Retrieve the user from the service
    my_user = await service.find_one(_id="user_1")
    assert my_user.first_name == "Monkey"

    # Update the user
    await service.update("user_1", {"last_name": "Mania"})

    # Find users
    cursor = await service.search({
        "query": {
            "match": {
                "first_name": "Monkey"
            }
        }
    })
    assert (await cursor.count()) == 1
    async for user in cursor:
        print(user)

    # Iterate over all users, in batches
    async for user in service.get_all_batch():
        print(user)

    # Delete the user from both MongoDB and Elasticsearch
    # (delete_many takes a lookup, whereas delete expects a model instance)
    await service.delete_many({"_id": "user_1"})

Search References

class AsyncResourceService
property mongo

Return instance of MongoCollection for this resource

property mongo_async

Return instance of async MongoCollection for this resource

property elastic

Returns instance of ElasticResourceAsyncClient for this resource

Raises:

KeyError – If this resource is not configured for Elasticsearch

get_model_instance_from_dict(data: Dict[str, Any]) → ResourceModelType

Converts a dictionary to an instance of ResourceModel for this resource

Parameters:

data – Dictionary to convert

Returns:

Instance of ResourceModel for this resource

async find_by_id(item_id: str | ObjectId, version: int | None = None, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) → ResourceModelType | None

Find a resource by ID

Parameters:
  • item_id – ID of item to find

  • version – Optional version to get

  • projection – The field projections to be applied

Returns:

None if resource not found, otherwise an instance of ResourceModel for this resource
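
The projection argument accepts several shapes: a list or set of field names, or a dictionary mapping field names to 0/1 or True/False. As a rough illustration of how these shapes relate to one another, the sketch below normalises them into a single MongoDB-style dictionary. The normalize_projection helper is hypothetical and not part of the service; how the service handles projections internally is not covered by this reference.

```python
from typing import Optional, Union

# Hypothetical alias mirroring the shapes accepted by the ``projection`` argument
Projection = Union[list[str], set[str], dict[str, int], dict[str, bool]]


def normalize_projection(projection: Optional[Projection]) -> Optional[dict[str, int]]:
    """Convert any accepted projection shape into a ``{field: 0 | 1}`` dictionary."""
    if projection is None:
        return None  # no projection supplied: all fields are returned
    if isinstance(projection, dict):
        # Dictionary form: truthy values include the field, falsy values exclude it
        return {field: 1 if include else 0 for field, include in projection.items()}
    # List or set form: every named field is included
    return {field: 1 for field in projection}
```

Under this sketch, ["first_name", "last_name"] and {"first_name": True, "last_name": True} describe the same inclusion projection, while {"password": 0} describes an exclusion.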

async find_by_id_raw(item_id: str | ObjectId, version: int | None = None, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) → Dict[str, Any] | None

Find a resource by ID

Parameters:
  • item_id – ID of item to find

  • version – Optional version to get

  • projection – The field projections to be applied

Returns:

None if resource not found, otherwise a dictionary of the item

async find_by_ids(ids: Sequence[str | ObjectId], projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) → list[ResourceModelType]

Find resources by IDs

Parameters:
  • ids – IDs of items to find

  • projection – The field projections to be applied

Returns:

List of ResourceModel instances found

async find_by_ids_raw(ids: list[str | ObjectId], projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) → list[dict[str, Any]]

Find resources by IDs

Parameters:
  • ids – IDs of items to find

  • projection – The field projections to be applied

Returns:

List of dictionaries for the resources found

async search(lookup: Dict[str, Any], use_mongo=False) → ResourceCursorAsync[ResourceModelType]

Search the resource using the provided lookup

Will use Elasticsearch if configured for this resource and use_mongo == False.

Parameters:
  • lookup – Dictionary to search

  • use_mongo – Force the use of MongoDB instead of Elasticsearch

Returns:

A ResourceCursorAsync instance with the response
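
The backend rule described above (Elasticsearch when it is configured and use_mongo is False) can be written as a small decision function. This is only a sketch: select_backend is a hypothetical name, and falling back to MongoDB when Elasticsearch is not configured is an assumption rather than documented behaviour.

```python
def select_backend(elastic_configured: bool, use_mongo: bool = False) -> str:
    """Return the name of the data store a search would be routed to."""
    if elastic_configured and not use_mongo:
        return "elasticsearch"
    # Assumption: MongoDB is used whenever Elasticsearch is unavailable or bypassed
    return "mongodb"
```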

async on_create(docs: List[ResourceModelType]) → None

Hook to run before creating new resource(s)

Parameters:

docs – List of resources to create

async validate_create(doc: ResourceModelType)

Validate the provided doc for creation

Runs the async validators

Parameters:

doc – Model instance to validate

Raises:

ValueError – If the item is not valid

async validate_update(updates: Dict[str, Any], original: ResourceModelType, etag: str | None) → Dict[str, Any]

Validate the provided updates dict against the original model instance

Applies the updates to a copy of the original provided, and runs sync and async validators

Parameters:
  • updates – Dictionary of updates to be applied

  • original – Model instance of the original item to be updated

  • etag – Optional etag, if provided will check etag against original item

Raises:

ValueError – If the item is not valid
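
The validation flow described above (apply the updates to a copy of the original, then run sync and async validators) might look roughly like the following. This is a minimal sketch under stated assumptions: the User dataclass, both validator functions and the use of ValueError for an etag mismatch are all hypothetical stand-ins, not the service's real model or error types.

```python
import asyncio
from dataclasses import dataclass, replace
from typing import Any, Optional


@dataclass(frozen=True)
class User:
    """Hypothetical stand-in for a resource model."""

    id: str
    first_name: str
    last_name: str
    etag: str = "v1"


def check_names_not_empty(user: User) -> None:
    """Synchronous validator (illustrative only)."""
    if not user.first_name or not user.last_name:
        raise ValueError("first_name and last_name are required")


async def check_name_is_allowed(user: User) -> None:
    """Asynchronous validator (illustrative); a real one might query a database."""
    await asyncio.sleep(0)
    if user.first_name.lower() == "admin":
        raise ValueError("first_name 'admin' is reserved")


async def validate_update(updates: dict[str, Any], original: User, etag: Optional[str]) -> dict[str, Any]:
    # The etag is only compared when one was supplied
    if etag is not None and etag != original.etag:
        raise ValueError("etag does not match the original item")
    # Apply the updates to a copy, leaving the original untouched
    candidate = replace(original, **updates)
    check_names_not_empty(candidate)
    await check_name_is_allowed(candidate)
    return updates
```

Validating a copy means a failed update never leaves the original instance in a half-modified state.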

async create(docs: Sequence[ResourceModelType | dict[str, Any]]) → List[ResourceModelType]

Creates a new resource

Will automatically create the resource(s) in both Elasticsearch (if configured for this resource) and MongoDB.

Parameters:

docs – List of resource model instances or dictionaries to create

Returns:

List of IDs for the created resources

Raises:

pydantic.ValidationError – If any of the docs provided are not valid

async on_created(docs: List[ResourceModelType]) → None

Hook to run after creating new resource(s)

Parameters:

docs – List of resources that were created
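
To illustrate how the on_create and on_created hooks sit around the write, here is a small in-memory sketch. InMemoryService is a hypothetical stand-in that stores plain dictionaries; the exact call order inside the real service is assumed from the "before" and "after" wording of the hook descriptions.

```python
import asyncio
from typing import Any


class InMemoryService:
    """Hypothetical stand-in for a resource service, storing documents in a dict."""

    def __init__(self) -> None:
        self.store: dict[str, dict[str, Any]] = {}
        self.events: list[str] = []  # records the order in which steps ran

    async def on_create(self, docs: list[dict[str, Any]]) -> None:
        # Runs before the documents are stored
        self.events.append("on_create")

    async def on_created(self, docs: list[dict[str, Any]]) -> None:
        # Runs after the documents are stored
        self.events.append("on_created")

    async def create(self, docs: list[dict[str, Any]]) -> list[str]:
        await self.on_create(docs)
        for doc in docs:
            self.store[doc["_id"]] = doc
        self.events.append("stored")
        await self.on_created(docs)
        return [doc["_id"] for doc in docs]


class UsersService(InMemoryService):
    async def on_create(self, docs: list[dict[str, Any]]) -> None:
        await super().on_create(docs)
        # Fill in a default value before the documents are stored
        for doc in docs:
            doc.setdefault("is_active", True)
```

Overriding a hook in a subclass, as UsersService does, keeps resource-specific behaviour out of the generic create path.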

async on_update(updates: Dict[str, Any], original: ResourceModelType) → None

Hook to run before updating a resource

Parameters:
  • updates – Dictionary of updates to be applied

  • original – Instance of ResourceModel for the original resource

async update(item_id: str | ObjectId, updates: Dict[str, Any], etag: str | None = None, original: ResourceModelType | None = None) → ResourceModelType

Updates an existing resource

Will automatically update the resource in both Elasticsearch (if configured for this resource) and MongoDB.

Parameters:
  • item_id – ID of item to update

  • updates – Dictionary of updates to be applied

  • etag – Optional etag, if provided will check etag against original item

  • original – Optional original item, if not provided the service will retrieve it

Raises:

SuperdeskApiError.notFoundError – If original item not found

async on_updated(updates: Dict[str, Any], original: ResourceModelType) → None

Hook to run after a resource has been updated

Parameters:
  • updates – Dictionary of updates that were applied

  • original – Instance of ResourceModel for the original resource

async on_delete(doc: ResourceModelType)

Hook to run before deleting a resource

Parameters:

doc – Instance of ResourceModel for the resource to delete

async delete(doc: ResourceModelType, etag: str | None = None)

Deletes a resource

Parameters:
  • doc – Instance of ResourceModel for the resource to delete

  • etag – Optional etag, if provided will check etag against original item

async delete_many(lookup: Dict[str, Any]) → List[str]

Deletes resource(s) using a lookup

Parameters:

lookup – Dictionary for the lookup to find items to delete

Returns:

List of IDs for the deleted resources

async on_deleted(doc: ResourceModelType)

Hook to run after deleting a resource

Parameters:

doc – Instance of ResourceModel for the resource that was deleted

async get_all(lookup: dict | None = None) → AsyncIterable[ResourceModelType]

Helper function to get all items from this resource

Returns:

An async iterable with ResourceModel instances

get_all_raw(lookup: dict | None = None) → AsyncIOMotorCursor

Helper function to get all items for this resource, as a MongoDB cursor

Parameters:

lookup – Optional MongoDB filter to be applied

Returns:

A MongoDB cursor with the results, sorted by ID

async get_all_list(lookup: dict | None = None) → list[ResourceModelType]

Helper function to get all items from this resource, as a list

Note: This is only to be used with small collections. DO NOT use it on large collections, as the performance cost and resource use will be too high. Use get_all or get_all_batch instead.

Parameters:

lookup – Optional MongoDB filter to be applied

Returns:

A list of ResourceModel instances

async get_all_list_raw(lookup: dict | None = None) → list[dict]

Helper function to get all items from this resource, as a list

Note: This is only to be used with small collections. DO NOT use it on large collections, as the performance cost and resource use will be too high. Use get_all or get_all_batch instead.

Parameters:

lookup – Optional MongoDB filter to be applied

Returns:

A list of dictionaries

async get_all_map(lookup: dict | None = None) → dict[str | ObjectId, ResourceModelType]

Helper function to get all items from this resource, as a dictionary where key is the item ID.

Note: This is only to be used with small collections. DO NOT use it on large collections, as the performance cost and resource use will be too high. Use get_all or get_all_batch instead.

Parameters:

lookup – Optional MongoDB filter to be applied

Returns:

A dictionary of ResourceModel instances, where the key is the ID of an item

async get_all_map_raw(lookup: dict | None = None) → dict[str | ObjectId, dict]

Helper function to get all items from this resource, as a dictionary where key is the item ID.

Note: This is only to be used with small collections. DO NOT use it on large collections, as the performance cost and resource use will be too high. Use get_all or get_all_batch instead.

Parameters:

lookup – Optional MongoDB filter to be applied

Returns:

A dictionary of resource dictionaries, where the key is the ID of an item
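
The reason the list and map helpers are limited to small collections is that they materialise every item in memory at once. The sketch below shows the idea with a hypothetical async generator standing in for the resource; it does not reflect the service's actual implementation.

```python
import asyncio
from typing import Any, AsyncIterator


async def get_all() -> AsyncIterator[dict[str, Any]]:
    """Hypothetical stand-in that yields items one at a time."""
    items = (
        {"_id": "user_1", "first_name": "Monkey"},
        {"_id": "user_2", "first_name": "Pigsy"},
    )
    for item in items:
        await asyncio.sleep(0)  # simulate waiting on the database
        yield item


async def get_all_map() -> dict[str, dict[str, Any]]:
    # Every item is held in memory at the same time, keyed by its ID
    return {item["_id"]: item async for item in get_all()}
```

Iterating with async for over get_all (or get_all_batch) keeps only one item, or one batch, in memory at a time.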

async get_all_batch(size=500, max_iterations=10000, lookup=None) → AsyncIterable[ResourceModelType]

Helper function to get all items from this resource, in batches

Parameters:
  • size – Number of items to fetch on each iteration

  • max_iterations – Maximum number of iterations to run, before returning gracefully

  • lookup – Optional dictionary used to filter the items

Returns:

An async iterable with ResourceModel instances
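
The interplay of size and max_iterations can be sketched with an async generator over an in-memory list. Everything here is an assumption made for illustration: ITEMS and fetch_batch are hypothetical, and paging by "last seen ID" is one common batching technique, not necessarily the one the service uses.

```python
import asyncio
from typing import Any, AsyncIterator, Optional

# Hypothetical in-memory "collection", already sorted by ID
ITEMS = [{"_id": f"user_{n:03d}"} for n in range(7)]


async def fetch_batch(after_id: Optional[str], size: int) -> list[dict[str, Any]]:
    """Stand-in for a query returning up to ``size`` items with an ID above ``after_id``."""
    await asyncio.sleep(0)
    remaining = [item for item in ITEMS if after_id is None or item["_id"] > after_id]
    return remaining[:size]


async def get_all_batch(size: int = 500, max_iterations: int = 10000) -> AsyncIterator[dict[str, Any]]:
    last_id: Optional[str] = None
    for _ in range(max_iterations):
        batch = await fetch_batch(last_id, size)
        if not batch:
            return  # nothing left to fetch
        for item in batch:
            yield item
        last_id = batch[-1]["_id"]
    # Falling out of the loop means max_iterations was reached; stop without raising


async def collect(**kwargs: int) -> list[str]:
    """Gather the IDs yielded by get_all_batch into a list."""
    return [item["_id"] async for item in get_all_batch(**kwargs)]
```

With seven items, size=3 yields all of them across three batches, whereas size=2 with max_iterations=2 stops quietly after the first four.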

async find(req: SearchRequest) → ElasticsearchResourceCursorAsync[ResourceModelType] | MongoResourceCursorAsync[ResourceModelType]
async find(req: dict, page: int = 1, max_results: int = 25, sort: str | list[tuple[str, Literal[1, -1]]] | None = None, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None, use_mongo: bool = False) → ElasticsearchResourceCursorAsync[ResourceModelType] | MongoResourceCursorAsync[ResourceModelType]

Find items from the resource, using Elasticsearch unless use_mongo is True

Parameters:
  • req – SearchRequest instance, or a lookup dictionary, for the search params to be used

  • page – The page number to retrieve (defaults to 1)

  • max_results – The maximum number of results to retrieve per page (defaults to 25)

  • sort – The sort order to use (defaults to the resource's default sort, or no sorting if none is defined)

  • projection – The field projections to be applied

  • use_mongo – If True, forces the use of MongoDB; otherwise Elasticsearch is attempted first

Returns:

An async iterable with ResourceModel instances

Raises:

SuperdeskApiError.notFoundError – If Elasticsearch is not configured
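
As a rough guide to how page, max_results and a list-style sort relate to the results returned, the sketch below applies them to an in-memory list. The paginate and apply_sort helpers are hypothetical and only mirror the parameter semantics described above (1-based pages, sort given as (field, direction) tuples); the real service delegates this work to the data store.

```python
from typing import Any

ITEMS = [{"_id": n, "name": f"user_{n}"} for n in range(1, 61)]


def paginate(items: list[dict[str, Any]], page: int = 1, max_results: int = 25) -> list[dict[str, Any]]:
    """Return the slice of ``items`` that belongs to the 1-based ``page``."""
    skip = (page - 1) * max_results
    return items[skip : skip + max_results]


def apply_sort(items: list[dict[str, Any]], sort: list[tuple[str, int]]) -> list[dict[str, Any]]:
    """Sort by ``(field, direction)`` tuples, where direction is 1 or -1."""
    result = list(items)
    # Apply keys in reverse so the first tuple has the highest priority (sort is stable)
    for field, direction in reversed(sort):
        result.sort(key=lambda item: item[field], reverse=direction == -1)
    return result
```

With 60 items and the default max_results of 25, pages 1 and 2 are full and page 3 holds the remaining 10.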

async count(lookup: dict[str, Any] | None = None, use_mongo: bool = False) → int

Get the number of items that match the lookup, or all items if lookup is not provided

Will use Elasticsearch if configured for this resource and use_mongo == False. This will not perform a search, but use the item count feature of the underlying data store

Parameters:
  • lookup – Dictionary to search

  • use_mongo – Force the use of MongoDB instead of Elasticsearch

Returns:

The number of items that match the lookup

validate_etag(original: ResourceModelType, etag: str | None) → None

Validate the provided etag against the original

If the provided etag argument is None, then validation will not occur

Parameters:
  • original – Instance of ResourceModel for the resource to validate etag against

  • etag – Etag string to validate

Raises:

SuperdeskApiError.preconditionFailedError – If the provided etag is invalid
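
The etag rule above amounts to a short guard. In this sketch the PreconditionFailed exception is a hypothetical stand-in for SuperdeskApiError.preconditionFailedError, and the original item is reduced to its etag string for brevity.

```python
from typing import Optional


class PreconditionFailed(Exception):
    """Hypothetical stand-in for SuperdeskApiError.preconditionFailedError."""


def validate_etag(original_etag: str, etag: Optional[str]) -> None:
    if etag is None:
        return  # no etag supplied: validation is skipped
    if etag != original_etag:
        raise PreconditionFailed("The provided etag does not match the stored item")
```

Passing the etag a client last saw lets an update or delete fail fast if the item changed in the meantime.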

async get_all_item_versions(item_id: str, max_results: int = 200, page: int = 1, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) → tuple[list[dict], int]

Get all versions of an item, with pagination and projection supported.

Parameters:
  • item_id – The ID of the item to get all versions for

  • max_results – The maximum number of results to retrieve per page (defaults to 200)

  • page – The page number to retrieve (defaults to 1)

  • projection – The field projections to be applied

Returns:

A tuple containing the list of item dictionaries and the total item count

Raises:

SuperdeskApiError.badRequestError – If versioning is not enabled on the resource

async get_item_version(item: dict, version: int) → dict

Get a specific version of an item

Parameters:
  • item – The item dictionary to get a version for

  • version – The version to get

Returns:

A dictionary for the specific version of the item

Raises:

SuperdeskApiError.badRequestError – If versioning is not enabled on the resource

async system_update(item_id: ObjectId | str, updates: dict[str, Any], update_etag: bool = False) → None

Update an item with the supplied updates, without calling any signals. The etag is left unchanged unless update_etag is True

Parameters:
  • item_id – The ID of the item to update

  • updates – A dictionary of values to update

  • update_etag – If True will update the item’s _etag

class ResourceCursorAsync(data_class: Type[ResourceModelType])
get_model_instance(data: dict[str, Any])

Get a model instance from a dictionary of values

Parameters:

data – Dictionary containing values to get a model instance from

Return type:

ResourceModelType

Returns:

A model instance

class ElasticsearchResourceCursorAsync(data_class: Type[ResourceModelType], hits=None)
extra(response: dict[str, Any])

Add extra info to response

class MongoResourceCursorAsync(data_class: Type[ResourceModelType], collection: AsyncIOMotorCollection, cursor: AsyncIOMotorCursor, lookup: dict[str, Any], collation: Collation | None = None)

class AsyncCacheableService

Handles caching for the resource, will invalidate on any changes to the resource.

Attributes:

resource_name (str): The name of the resource this service handles.

cache_lookup (dict): A dictionary to specify custom lookup parameters for caching.

get_cache() → Any

Retrieve the cached value from Flask’s g object if available.

set_cache(value: Any) → None

Set the cached value in Flask’s g object.

async get_cached() → List[Dict[str, Any]]

Retrieve cached data for the resource. If the cache is empty, fetches data from the database and sets it in the cache. The cache is automatically refreshed with a time-to-live (TTL).

Returns:

A list of dictionaries containing the cached data

async get_cached_by_id(_id: str)

Retrieve a specific resource by its ID from the cached data. If the resource is not found in the cache, fetches it directly from the database.

Parameters:

_id – The ID of the resource to retrieve

Returns:

A dictionary representing the resource if found, otherwise None
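
The caching behaviour described for get_cached and get_cached_by_id (fill on first use, refresh after a TTL, fall back to the database on a miss) can be sketched as follows. The CachedUsers class, its in-memory "database" and the db_reads counter are hypothetical, and the cache is kept on the instance here rather than on Flask's g object.

```python
import asyncio
import time
from typing import Any, Optional


class CachedUsers:
    """Hypothetical cache wrapper used only to illustrate the behaviour."""

    def __init__(self, ttl_seconds: float = 60.0) -> None:
        self.ttl_seconds = ttl_seconds
        self.db_reads = 0  # counts simulated database round trips
        self._database = {"user_1": {"_id": "user_1", "first_name": "Monkey"}}
        self._cache: Optional[list[dict[str, Any]]] = None
        self._expires_at = 0.0

    async def _fetch_all(self) -> list[dict[str, Any]]:
        self.db_reads += 1
        await asyncio.sleep(0)  # simulate waiting on the database
        return list(self._database.values())

    async def get_cached(self) -> list[dict[str, Any]]:
        # Refill the cache when it is empty or its TTL has elapsed
        if self._cache is None or time.monotonic() >= self._expires_at:
            self._cache = await self._fetch_all()
            self._expires_at = time.monotonic() + self.ttl_seconds
        return self._cache

    async def get_cached_by_id(self, _id: str) -> Optional[dict[str, Any]]:
        for item in await self.get_cached():
            if item["_id"] == _id:
                return item
        # Not in the cache: fall back to a direct database lookup
        self.db_reads += 1
        return self._database.get(_id)

    def invalidate(self) -> None:
        """Drop the cached value, for example after the resource changes."""
        self._cache = None
```

Repeated calls to get_cached within the TTL reuse the stored list, so only the first call reaches the database.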