Resource Management¶
Resource Services¶
The management of resources is performed using the AsyncResourceService class
instances. This is similar to how it is done in Superdesk < v3.0, with some slight improvements.
One major difference is the need to directly import the resource service from the module, and not use get_resource_service. This gives us the full typed interface into the specifics of that resource.
For example:
from my_module.users import UsersService, User
async def test_users_service():
service = UsersService() # automatic singleton instance
# Creat the user in both MongoDB and Elasticsearch
await service.create([
User(
id="user_1",
first_name="Monkey",
last_name="Magic"
)
])
# Retrieve the user from the service
my_user = await service.find_one(_id="user_1")
assert my_user.first_name == "Monkey"
# Update the user
await service.update("user_1", {"last_name": "Mania"}
# Find users
cursor = service.search({
"query": {
"match": {
"first_name": "Monkey"
}
}
})
assert (await cursor.count()) == 1
async for user in cursor:
print(user)
# Iterate over all users, in batches
async for user in service.get_all_batch():
print(user)
# Delete the user from both MongoDB and Elasticsearch
await service.delete({"_id": "user_1"})
Search References¶
- class AsyncResourceService[source]¶
- property mongo¶
Return instance of MongoCollection for this resource
- property mongo_async¶
Return instance of async MongoCollection for this resource
- property elastic¶
Returns instance of
ElasticResourceAsyncClientfor this resource- Raises:
KeyError – If this resource is not configured for Elasticsearch
- get_model_instance_from_dict(data: Dict[str, Any]) ResourceModelType¶
Converts a dictionary to an instance of
ResourceModelfor this resource- Parameters:
data – Dictionary to convert
- Returns:
Instance of
ResourceModelfor this resource
- async find_by_id(item_id: str | ObjectId, version: int | None = None, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) ResourceModelType | None¶
Find a resource by ID
- Parameters:
item_id – ID of item to find
version – Optional version to get
projection – The field projections to be applied
- Returns:
Noneif resource not found, otherwise an instance ofResourceModelfor this resource
- async find_by_id_raw(item_id: str | ObjectId, version: int | None = None, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) Dict[str, Any] | None¶
Find a resource by ID
- Parameters:
item_id – ID of item to find
version – Optional version to get
projection – The field projections to be applied
- Returns:
Noneif resource not found, otherwise a dictionary of the item
- async find_by_ids(ids: Sequence[str | ObjectId], projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) list[ResourceModelType]¶
Find resources by IDs
- Parameters:
ids – IDs of items to find
projection – The field projections to be applied
- Returns:
List of ResourceModel instances found
- async find_by_ids_raw(ids: list[str | ObjectId], projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) list[dict[str, Any]]¶
Find resources by IDs
- Parameters:
ids – IDs of items to find
projection – The field projections to be applied
- Returns:
List of dictionaries for the resources found
- async search(lookup: Dict[str, Any], use_mongo=False) ResourceCursorAsync[ResourceModelType]¶
Search the resource using the provided
lookupWill use Elasticsearch if configured for this resource and
use_mongo == False.- Parameters:
lookup – Dictionary to search
use_mongo – Force to use MongoDB instead of Elasticsearch
- Returns:
A
ResourceCursorAsyncinstance with the response
- async on_create(docs: List[ResourceModelType]) None¶
Hook to run before creating new resource(s)
- Parameters:
docs – List of resources to create
- async validate_create(doc: ResourceModelType)¶
Validate the provided doc for creation
Runs the async validators
- Parameters:
doc – Model instance to validate
- Raises:
ValueError – If the item is not valid
- async validate_update(updates: Dict[str, Any], original: ResourceModelType, etag: str | None) Dict[str, Any]¶
Validate the provided updates dict against the original model instance
Applies the updates to a copy of the original provided, and runs sync and async validators
- Parameters:
updates – Dictionary of updates to be applied
original – Model instance of the original item to be updated
etag – Optional etag, if provided will check etag against original item
- Raises:
ValueError – If the item is not valid
- async create(docs: Sequence[ResourceModelType | dict[str, Any]]) List[ResourceModelType]¶
Creates a new resource
Will automatically create the resource(s) in both Elasticsearch (if configured for this resource) and MongoDB.
- Parameters:
docs – List of resources or dictionaries to create the registries
- Returns:
List of IDs for the created resources
- Raises:
Pydantic.ValidationError – If any of the docs provided are not valid
- async on_created(docs: List[ResourceModelType]) None¶
Hook to run after creating new resource(s)
- Parameters:
docs – List of resources that were created
- async on_update(updates: Dict[str, Any], original: ResourceModelType) None¶
Hook to run before updating a resource
- Parameters:
item_id – ID of item to update
updates – Dictionary to update
original – Instance of
ResourceModelfor the original resource
- async update(item_id: str | ObjectId, updates: Dict[str, Any], etag: str | None = None, original: ResourceModelType | None = None) ResourceModelType¶
Updates an existing resource
Will automatically update the resource in both Elasticsearch (if configured for this resource) and MongoDB.
- Parameters:
item_id – ID of item to update
updates – Dictionary to update
etag – Optional etag, if provided will check etag against original item
original – Optional original item, if not provided the service will retrieve it
- Raises:
SuperdeskApiError.notFoundError – If original item not found
- async on_updated(updates: Dict[str, Any], original: ResourceModelType) None¶
Hook to run after a resource has been updated
- Parameters:
updates – Dictionary to update
original – Instance of
ResourceModelfor the original resource
- async on_delete(doc: ResourceModelType)¶
Hook to run before deleting a resource
- Parameters:
doc – Instance of
ResourceModelfor the resource to delete
- async delete(doc: ResourceModelType, etag: str | None = None)¶
Deletes a resource
- Parameters:
doc – Instance of
ResourceModelfor the resource to deleteetag – Optional etag, if provided will check etag against original item
- async delete_many(lookup: Dict[str, Any]) List[str]¶
Deletes resource(s) using a lookup
- Parameters:
lookup – Dictionary for the lookup to find items to delete
- Returns:
List of IDs for the deleted resources
- async on_deleted(doc: ResourceModelType)¶
Hook to run after deleting a resource
- Parameters:
doc – Instance of
ResourceModelfor the resource that was deleted
- async get_all(lookup: dict | None = None) AsyncIterable[ResourceModelType]¶
Helper function to get all items from this resource
- Returns:
An async iterable with
ResourceModelinstances
- get_all_raw(lookup: dict | None = None) AsyncIOMotorCursor¶
Helper function to get all items for this resource, as a MongoDB cursor
- Parameters:
lookup – Optional MongoDB filter to be applied
- Returns:
A MongoDB cursor with the results, sorted by ID
- async get_all_list(lookup: dict | None = None) list[ResourceModelType]¶
Helper function to get all items from this resource, as a list
Note: This is only to be used with small collection, DO NOT use on large collections, as the performance and resource use will be too high. Use
get_allorget_all_batchinstead.- Parameters:
lookup – Optional MongoDB filter to be applied
- Returns:
A list of
ResourceModelinstances
- async get_all_list_raw(lookup: dict | None = None) list[dict]¶
Helper function to get all items from this resource, as a list
Note: This is only to be used with small collection, DO NOT use on large collections, as the performance and resource use will be too high. Use
get_allorget_all_batchinstead.- Parameters:
lookup – Optional MongoDB filter to be applied
- Returns:
A list of dictionaries
- async get_all_map(lookup: dict | None = None) dict[str | ObjectId, ResourceModelType]¶
Helper function to get all items from this resource, as a dictionary where key is the item ID.
Note: This is only to be used with small collection, DO NOT use on large collections, as the performance and resource use will be too high. Use
get_allorget_all_batchinstead.- Parameters:
lookup – Optional MongoDB filter to be applied
- Returns:
A dictionary of
ResourceModelinstances, where the key is the ID of an item
- async get_all_map_raw(lookup: dict | None = None) dict[str | ObjectId, dict]¶
Helper function to get all items from this resource, as a dictionary where key is the item ID.
Note: This is only to be used with small collection, DO NOT use on large collections, as the performance and resource use will be too high. Use
get_allorget_all_batchinstead.- Parameters:
lookup – Optional MongoDB filter to be applied
- Returns:
A dictionary of resource dictionary, where the key is the ID of an item
- async get_all_batch(size=500, max_iterations=10000, lookup=None) AsyncIterable[ResourceModelType]¶
Helper function to get all items from this resource, in batches
- Parameters:
size – Number of items to fetch on each iteration
max_iterations – Maximum number of iterations to run, before returning gracefully
lookup – Optional dictionary used to filter items for
- Returns:
An async iterable with
ResourceModelinstances
- async find(req: SearchRequest) ElasticsearchResourceCursorAsync[ResourceModelType] | MongoResourceCursorAsync[ResourceModelType]¶
- async find(req: dict, page: int = 1, max_results: int = 25, sort: str | list[tuple[str, Literal[1, -1]]] | None = None, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None, use_mongo: bool = False) ElasticsearchResourceCursorAsync[ResourceModelType] | MongoResourceCursorAsync[ResourceModelType]
Find items from the resource using Elasticsearch
- Parameters:
req – SearchRequest instance, or a lookup dictionary, for the search params to be used
page – The page number to retrieve (defaults to 1)
max_results – The maximum number of results to retrieve per page (defaults to 25)
sort – The sort order to use (defaults to resource default sort, or not sorting applied)
projection – The field projections to be applied
use_mongo – If
Truewill force use mongo, else will attempt elastic first
- Returns:
An async iterable with
ResourceModelinstances- Raises:
SuperdeskApiError.notFoundError – If Elasticsearch is not configured
- async count(lookup: dict[str, Any] | None = None, use_mongo: bool = False) int¶
Get the number of items that match the lookup, or all items if lookup is not provided
Will use Elasticsearch if configured for this resource and
use_mongo == False. This will not perform a search, but use the item count feature of the underlying data store- Parameters:
lookup – Dictionary to search
use_mongo – Force to use MongoDB instead of Elasticsearch
- Returns:
The number of items that match the lookup
- validate_etag(original: ResourceModelType, etag: str | None) None¶
Validate the provided etag against the original
If the provided
etagargument isNone, then validation will not occur- Parameters:
original – Instance of
ResourceModelfor the resource to validate etag againstetag – Etag string to validate
- Raises:
SuperdeskApiError.preconditionFailedError – If the provided etag is invalid
- async get_all_item_versions(item_id: str, max_results: int = 200, page: int = 1, projection: list[str] | set[str] | dict[str, Literal[0]] | dict[str, Literal[1]] | dict[str, Literal[True]] | dict[str, Literal[False]] | None = None) tuple[list[dict], int]¶
Get all versions of an item, with pagination and projection supported.
- Parameters:
item_id – The ID of the item to get all versions for
max_results – The maximum number of results to retrieve per page (defaults to 200)
projection – The field projections to be applied
- Returns:
A tuple - list of dictionary items and the count of total items
- Raises:
SuperdeskApiError.badRequestError – If versioning is not enabled on the resource
- async get_item_version(item: dict, version: int) dict¶
Get a specific version of an item
- Parameters:
item – The item dictionary to get a version for
version – The version to get
- Returns:
A dictionary for the specific version of the item
- Raises:
SuperdeskApiError.badRequestError – If versioning is not enabled on the resource
- async system_update(item_id: ObjectId | str, updates: dict[str, Any], update_etag: bool = False) None¶
Update an item with the supplied updates, and do not update the etag or call any signals
- Parameters:
item_id – The ID of the item to update
updates – A dictionary of values to update
update_etag – If
Truewill update the item’s_etag
- class ResourceCursorAsync(data_class: Type[ResourceModelType])[source]¶
- get_model_instance(data: dict[str, Any])¶
Get a model instance from a dictionary of values
- Parameters:
data – Dictionary containing values to get a model instance from
- Return type:
ResourceModelType
- Returns:
A model instance
- class ElasticsearchResourceCursorAsync(data_class: Type[ResourceModelType], hits=None)[source]¶
- extra(response: dict[str, Any])¶
Add extra info to response
- class MongoResourceCursorAsync(data_class: Type[ResourceModelType], collection: AsyncIOMotorCollection, cursor: AsyncIOMotorCursor, lookup: dict[str, Any], collation: Collation | None = None)[source]¶
- class AsyncCacheableService[source]¶
Handles caching for the resource, will invalidate on any changes to the resource.
- Attributes:
resource_name (str): The name of the resource this service handles. cache_lookup (dict): A dictionary to specify custom lookup parameters for caching.
- get_cache() Any¶
Retrieve the cached value from Flask’s g object if available.
- set_cache(value: Any) None¶
Set the cached value in Flask’s g object.
- async get_cached() List[Dict[str, Any]]¶
Retrieve cached data for the resource. If the cache is empty, fetches data from the database and sets it in the cache. The cache is automatically refreshed with a time-to-live (TTL).
- Returns:
List[Dict[str, Any]]: A list of dictionaries containing the cached data.
- async get_cached_by_id(_id: str)¶
Retrieve a specific resource by its ID from the cached data. If the resource is not found in the cache, fetches it directly from the database.
- Args:
_id (string): The ID of the resource to retrieve.
- Returns:
Optional[Dict[str, Any]]: A dictionary representing the resource if found, otherwise None.