Resource Management¶
Resource Services¶
The management of resources is performed using the AsyncResourceService class
instances. This is similar to how it is done in Superdesk < v3.0, with some slight improvements.
One major difference is the need to directly import the resource service from the module, and not use get_resource_service. This gives us the full typed interface into the specifics of that resource.
For example:
from my_module.users import UsersService, User
async def test_users_service():
service = UsersService() # automatic singleton instance
# Creat the user in both MongoDB and Elasticsearch
await service.create([
User(
id="user_1",
first_name="Monkey",
last_name="Magic"
)
])
# Retrieve the user from the service
my_user = await service.find_one(_id="user_1")
assert my_user.first_name == "Monkey"
# Update the user
await service.update("user_1", {"last_name": "Mania"}
# Find users
cursor = service.search({
"query": {
"match": {
"first_name": "Monkey"
}
}
})
assert (await cursor.count()) == 1
async for user in cursor:
print(user)
# Iterate over all users, in batches
async for user in service.get_all_batch():
print(user)
# Delete the user from both MongoDB and Elasticsearch
await service.delete({"_id": "user_1"})
Search References¶
- class AsyncResourceService[source]¶
- property mongo¶
Return instance of MongoCollection for this resource
- property mongo_async¶
Return instance of async MongoCollection for this resource
- property elastic¶
Returns instance of
ElasticResourceAsyncClientfor this resource- Raises:
KeyError – If this resource is not configured for Elasticsearch
- get_model_instance_from_dict(data: Dict[str, Any]) ResourceModelType¶
Converts a dictionary to an instance of
ResourceModelfor this resource- Parameters:
data – Dictionary to convert
- Returns:
Instance of
ResourceModelfor this resource
- async find_by_id(item_id: str | ObjectId, version: int | None = None, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) ResourceModelType | None¶
Find a resource by ID
- Parameters:
item_id – ID of item to find
version – Optional version to get
projection – The field projections to be applied
- Returns:
Noneif resource not found, otherwise an instance ofResourceModelfor this resource
- async find_by_id_raw(item_id: str | ObjectId, version: int | None = None, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) Dict[str, Any] | None¶
Find a resource by ID
- Parameters:
item_id – ID of item to find
version – Optional version to get
projection – The field projections to be applied
- Returns:
Noneif resource not found, otherwise a dictionary of the item
- async find_by_ids(ids: Sequence[str | ObjectId], projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) list[ResourceModelType]¶
Find resources by IDs
- Parameters:
ids – IDs of items to find
projection – The field projections to be applied
- Returns:
List of ResourceModel instances found
- async find_by_ids_raw(ids: list[str | ObjectId], projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) list[dict[str, Any]]¶
Find resources by IDs
- Parameters:
ids – IDs of items to find
projection – The field projections to be applied
- Returns:
List of dictionaries for the resources found
- async search(lookup: Dict[str, Any], use_mongo=False, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) ResourceCursorAsync[ResourceModelType]¶
Search the resource using the provided
lookupWill use Elasticsearch if configured for this resource and
use_mongo == False.- Parameters:
lookup – Dictionary to search
use_mongo – Force to use MongoDB instead of Elasticsearch
projection – Optional projection to apply to the search results
- Returns:
A
ResourceCursorAsyncinstance with the response
- async on_create(docs: List[ResourceModelType]) None¶
Hook to run before creating new resource(s)
- Parameters:
docs – List of resources to create
- async validate_create(doc: ResourceModelType)¶
Validate the provided doc for creation
Runs the async validators
- Parameters:
doc – Model instance to validate
- Raises:
ValueError – If the item is not valid
- async validate_update(updates: Dict[str, Any], original: ResourceModelType, etag: str | None) Dict[str, Any]¶
Validate the provided updates dict against the original model instance
Applies the updates to a copy of the original provided, and runs sync and async validators
- Parameters:
updates – Dictionary of updates to be applied
original – Model instance of the original item to be updated
etag – Optional etag, if provided will check etag against original item
- Raises:
ValueError – If the item is not valid
- async create(docs: Sequence[ResourceModelType | dict]) List[ResourceModelType]¶
Creates a new resource
Will automatically create the resource(s) in both Elasticsearch (if configured for this resource) and MongoDB.
- Parameters:
docs – List of resources or dictionaries to create the registries
- Returns:
List of IDs for the created resources
- Raises:
Pydantic.ValidationError – If any of the docs provided are not valid
- async on_created(docs: List[ResourceModelType]) None¶
Hook to run after creating new resource(s)
- Parameters:
docs – List of resources that were created
- async on_update(updates: Dict[str, Any], original: ResourceModelType) None¶
Hook to run before updating a resource
- Parameters:
item_id – ID of item to update
updates – Dictionary to update
original – Instance of
ResourceModelfor the original resource
- async update(item_id: str | ObjectId, updates: Dict[str, Any], etag: str | None = None, original: ResourceModelType | None = None) ResourceModelType¶
Updates an existing resource
Will automatically update the resource in both Elasticsearch (if configured for this resource) and MongoDB.
- Parameters:
item_id – ID of item to update
updates – Dictionary to update
etag – Optional etag, if provided will check etag against original item
original – Optional original item, if not provided the service will retrieve it
- Raises:
SuperdeskApiError.notFoundError – If original item not found
- async on_updated(updates: Dict[str, Any], original: ResourceModelType) None¶
Hook to run after a resource has been updated
- Parameters:
updates – Dictionary to update
original – Instance of
ResourceModelfor the original resource
- async on_delete(doc: ResourceModelType)¶
Hook to run before deleting a resource
- Parameters:
doc – Instance of
ResourceModelfor the resource to delete
- async delete(doc: ResourceModelType, etag: str | None = None)¶
Deletes a resource
- Parameters:
doc – Instance of
ResourceModelfor the resource to deleteetag – Optional etag, if provided will check etag against original item
- async delete_many(lookup: Dict[str, Any]) List[str]¶
Deletes resource(s) using a lookup
- Parameters:
lookup – Dictionary for the lookup to find items to delete
- Returns:
List of IDs for the deleted resources
- async on_deleted(doc: ResourceModelType)¶
Hook to run after deleting a resource
- Parameters:
doc – Instance of
ResourceModelfor the resource that was deleted
- async get_all(lookup: dict | None = None) AsyncIterable[ResourceModelType]¶
Helper function to get all items from this resource
- Returns:
An async iterable with
ResourceModelinstances
- get_all_raw(lookup: dict | None = None) AsyncIOMotorCursor¶
Helper function to get all items for this resource, as a MongoDB cursor
- Parameters:
lookup – Optional MongoDB filter to be applied
- Returns:
A MongoDB cursor with the results, sorted by ID
- async get_all_list(lookup: dict | None = None) list[ResourceModelType]¶
Helper function to get all items from this resource, as a list
Note: This is only to be used with small collection, DO NOT use on large collections, as the performance and resource use will be too high. Use
get_allorget_all_batchinstead.- Parameters:
lookup – Optional MongoDB filter to be applied
- Returns:
A list of
ResourceModelinstances
- async get_all_list_raw(lookup: dict | None = None) list[dict]¶
Helper function to get all items from this resource, as a list
Note: This is only to be used with small collection, DO NOT use on large collections, as the performance and resource use will be too high. Use
get_allorget_all_batchinstead.- Parameters:
lookup – Optional MongoDB filter to be applied
- Returns:
A list of dictionaries
- async get_all_map(lookup: dict | None = None) dict[str | ObjectId, ResourceModelType]¶
Helper function to get all items from this resource, as a dictionary where key is the item ID.
Note: This is only to be used with small collection, DO NOT use on large collections, as the performance and resource use will be too high. Use
get_allorget_all_batchinstead.- Parameters:
lookup – Optional MongoDB filter to be applied
- Returns:
A dictionary of
ResourceModelinstances, where the key is the ID of an item
- async get_all_map_raw(lookup: dict | None = None) dict[str | ObjectId, dict]¶
Helper function to get all items from this resource, as a dictionary where key is the item ID.
Note: This is only to be used with small collection, DO NOT use on large collections, as the performance and resource use will be too high. Use
get_allorget_all_batchinstead.- Parameters:
lookup – Optional MongoDB filter to be applied
- Returns:
A dictionary of resource dictionary, where the key is the ID of an item
- async get_all_batch(size=500, max_iterations=10000, lookup=None) AsyncIterable[ResourceModelType]¶
Helper function to get all items from this resource, in batches
- Parameters:
size – Number of items to fetch on each iteration
max_iterations – Maximum number of iterations to run, before returning gracefully
lookup – Optional dictionary used to filter items for
- Returns:
An async iterable with
ResourceModelinstances
- async get_all_batch_elastic_raw(query: dict | None = None, size: int = 500, max_iterations: int = 10000, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) AsyncGenerator[dict, None]¶
Helper function to get all items from this resource, in batches
The default arguments allow iterating over 5 million documents. If more is needed, then consider increasing
sizeand/ormax_iterations- Parameters:
query – An optional elasticsearch query used to filter items for
size – The number of items to fetch on each iteration
max_iterations – Maximum number of iterations to run, before returning gracefully
projection – Optional projection to apply to the search results
- Returns:
An async generator yielding a dictionary of the document
- async get_all_batch_elastic(query: dict | None = None, size: int = 500, max_iterations: int = 10000, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) AsyncGenerator[ResourceModelType, None]¶
Helper function to get all items from this resource, in batches
The default arguments allow iterating over 5 million documents. If more is needed, then consider increasing
sizeand/ormax_iterations- Parameters:
query – An optional elasticsearch query used to filter items for
size – The number of items to fetch on each iteration
max_iterations – Maximum number of iterations to run, before returning gracefully
projection – Optional projection to apply to the search results
- Returns:
An async generator with
ResourceModelinstances
- async find(req: SearchRequest) ElasticsearchResourceCursorAsync[ResourceModelType] | MongoResourceCursorAsync[ResourceModelType]¶
- async find(req: dict, page: int = 1, max_results: int = 25, sort: str | list[tuple[str, Literal[1, -1]]] | None = None, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None, use_mongo: bool = False) ElasticsearchResourceCursorAsync[ResourceModelType] | MongoResourceCursorAsync[ResourceModelType]
Find items from the resource using Elasticsearch
- Parameters:
req – SearchRequest instance, or a lookup dictionary, for the search params to be used
page – The page number to retrieve (defaults to 1)
max_results – The maximum number of results to retrieve per page (defaults to 25)
sort – The sort order to use (defaults to resource default sort, or not sorting applied)
projection – The field projections to be applied
use_mongo – If
Truewill force use mongo, else will attempt elastic first
- Returns:
An async iterable with
ResourceModelinstances- Raises:
SuperdeskApiError.notFoundError – If Elasticsearch is not configured
- async count(lookup: dict[str, Any] | None = None, use_mongo: bool = False) int¶
Get the number of items that match the lookup, or all items if lookup is not provided
Will use Elasticsearch if configured for this resource and
use_mongo == False. This will not perform a search, but use the item count feature of the underlying data store- Parameters:
lookup – Dictionary to search
use_mongo – Force to use MongoDB instead of Elasticsearch
- Returns:
The number of items that match the lookup
- validate_etag(original: ResourceModelType, etag: str | None) None¶
Validate the provided etag against the original
If the provided
etagargument isNone, then validation will not occur- Parameters:
original – Instance of
ResourceModelfor the resource to validate etag againstetag – Etag string to validate
- Raises:
SuperdeskApiError.preconditionFailedError – If the provided etag is invalid
- async get_all_item_versions(item_id: str, max_results: int = 200, page: int = 1, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) tuple[list[dict], int]¶
Get all versions of an item, with pagination and projection supported.
- Parameters:
item_id – The ID of the item to get all versions for
max_results – The maximum number of results to retrieve per page (defaults to 200)
projection – The field projections to be applied
- Returns:
A tuple - list of dictionary items and the count of total items
- Raises:
SuperdeskApiError.badRequestError – If versioning is not enabled on the resource
- async get_item_version(item: dict, version: int) dict¶
Get a specific version of an item
- Parameters:
item – The item dictionary to get a version for
version – The version to get
- Returns:
A dictionary for the specific version of the item
- Raises:
SuperdeskApiError.badRequestError – If versioning is not enabled on the resource
- async system_update(item_id: ObjectId | str, updates: dict[str, Any], update_etag: bool = False) None¶
Update an item with the supplied updates, and do not update the etag or call any signals
- Parameters:
item_id – The ID of the item to update
updates – A dictionary of values to update
update_etag – If
Truewill update the item’s_etag
- async bulk_update(ids: set[str | ObjectId], updates: dict) tuple[UpdateResult, tuple[int, list[dict]] | None]¶
Asynchronously applies updates to multiple items in both MongoDB and Elasticsearch.
This method updates items in MongoDB and, if enabled, Elasticsearch. It ensures that the updates performed on both databases match the provided IDs and logs any inconsistencies.
MongoDB and Elasticsearch updates are concurrently updated.
Errors are raised if the MongoDB operation is not acknowledged, while warnings are logged for any discrepancies between the number of IDs provided and the number of items updated.
- Parameters:
ids – A set of IDs for the items to be updated. Each ID can either be a string or an ObjectId.
updates – A dictionary containing the fields and values to update.
- Returns:
A tuple containing the result of the MongoDB update and Elasticsearch operations.
- Raises:
SuperdeskApiError.badRequestError – If the MongoDB update operation is not acknowledged.
- class ResourceCursorAsync(data_class: Type[ResourceModelType])[source]¶
- get_model_instance(data: dict[str, Any])¶
Get a model instance from a dictionary of values
- Parameters:
data – Dictionary containing values to get a model instance from
- Return type:
ResourceModelType
- Returns:
A model instance
- class ElasticsearchResourceCursorAsync(data_class: Type[ResourceModelType], hits=None)[source]¶
- extra(response: dict[str, Any])¶
Add extra info to response
- class MongoResourceCursorAsync(data_class: Type[ResourceModelType], collection: AsyncIOMotorCollection, cursor: AsyncIOMotorCursor, lookup: dict[str, Any], collation: Collation | None = None)[source]¶
- class AsyncCacheableService[source]¶
Handles caching for the resource, will invalidate on any changes to the resource.
- Attributes:
resource_name (str): The name of the resource this service handles. cache_lookup (dict): A dictionary to specify custom lookup parameters for caching.
- get_cache() Any¶
Retrieve the cached value from Flask’s g object if available.
- set_cache(value: Any) None¶
Set the cached value in Flask’s g object.
- async get_cached() List[Dict[str, Any]]¶
Retrieve cached data for the resource. If the cache is empty, fetches data from the database and sets it in the cache. The cache is automatically refreshed with a time-to-live (TTL).
- Returns:
List[Dict[str, Any]]: A list of dictionaries containing the cached data.
- async get_cached_by_id(_id: str)¶
Retrieve a specific resource by its ID from the cached data. If the resource is not found in the cache, fetches it directly from the database.
- Args:
_id (string): The ID of the resource to retrieve.
- Returns:
Optional[Dict[str, Any]]: A dictionary representing the resource if found, otherwise None.