SqlRequestQueueClient
Hierarchy
- SqlClientMixin
- RequestQueueClient
- SqlRequestQueueClient
Methods
__init__
Initialize a new instance.
Preferably use the SqlRequestQueueClient.open class method to create a new instance.
Parameters
keyword-only id: str
keyword-only storage_client: SqlStorageClient
Returns None
add_batch_of_requests
Add batch of requests to the queue.
This method adds a batch of requests to the queue. Each request is processed based on its uniqueness (determined by unique_key). Duplicates are identified but not re-added to the queue.
Parameters
requests: Sequence[Request]
The collection of requests to add to the queue.
optional keyword-only forefront: bool = False
Whether to put the added requests at the beginning (True) or the end (False) of the queue. When True, the requests will be processed sooner than previously added requests.
Returns AddRequestsResponse
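The deduplication and forefront behavior described above can be illustrated with a small in-memory sketch. This is not the SQL client's implementation: FakeRequest, add_batch, and the deque-backed queue are hypothetical stand-ins, and the choice to keep a forefront batch in its own order at the head is an assumption of this sketch.

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeRequest:
    """Hypothetical stand-in for Request; only the fields used here."""

    url: str
    unique_key: str


def add_batch(queue: deque, seen: set, requests, *, forefront: bool = False) -> list:
    """Add requests, skipping unique keys that are already known.

    Returns one (unique_key, was_already_present) tuple per input request.
    """
    results = []
    new_requests = []
    for request in requests:
        already_present = request.unique_key in seen
        results.append((request.unique_key, already_present))
        if not already_present:
            seen.add(request.unique_key)
            new_requests.append(request)
    if forefront:
        # Prepend in reverse so the batch keeps its own order at the head
        # (an assumption of this sketch, not a documented guarantee).
        for request in reversed(new_requests):
            queue.appendleft(request)
    else:
        queue.extend(new_requests)
    return results


queue, seen = deque(), set()
add_batch(queue, seen, [FakeRequest("https://a.example", "a"), FakeRequest("https://b.example", "b")])
results = add_batch(
    queue,
    seen,
    [FakeRequest("https://a.example", "a"), FakeRequest("https://c.example", "c")],
    forefront=True,
)
order = [request.unique_key for request in queue]
```

In the second call, the request with key "a" is reported as a duplicate and left where it was, while "c" is placed ahead of the previously added requests.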
drop
Delete this request queue and all its records from the database.
This operation is irreversible. Uses CASCADE deletion to remove all related records.
Returns None
fetch_next_request
Return the next request in the queue to be processed.
Once you have successfully finished processing the request, call RequestQueue.mark_request_as_handled to mark it as handled in the queue. If processing failed, call RequestQueue.reclaim_request instead, so that the queue gives the request to another consumer in a later call to the fetch_next_request method.
Note that a None return value does not mean that queue processing has finished; it means there are currently no pending requests. To check whether all requests in the queue have been handled, use RequestQueue.is_finished instead.
Returns Request | None
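The fetch, handle, and reclaim cycle described above might look roughly like the following consumer loop. InMemoryQueue is a hypothetical stand-in that only mimics the four method names mentioned in the text (its signatures and the async style are assumptions), so the sketch runs without a database.

```python
import asyncio


class InMemoryQueue:
    """Hypothetical stand-in exposing the four methods named above."""

    def __init__(self, urls):
        self._pending = list(urls)
        self._in_progress = set()
        self.handled = []

    async def fetch_next_request(self):
        if not self._pending:
            return None
        request = self._pending.pop(0)
        self._in_progress.add(request)
        return request

    async def mark_request_as_handled(self, request):
        self._in_progress.discard(request)
        self.handled.append(request)

    async def reclaim_request(self, request):
        self._in_progress.discard(request)
        self._pending.append(request)

    async def is_finished(self):
        return not self._pending and not self._in_progress


async def consume(queue, process):
    # Loop on is_finished, not on a None result from fetch_next_request.
    while not await queue.is_finished():
        request = await queue.fetch_next_request()
        if request is None:
            # Nothing pending right now; other consumers may still hold requests.
            await asyncio.sleep(0.01)
            continue
        try:
            await process(request)
        except Exception:
            await queue.reclaim_request(request)
        else:
            await queue.mark_request_as_handled(request)


failed_once = set()


async def flaky_process(url):
    # Fail the first attempt for one URL to exercise the reclaim path.
    if url.endswith("/b") and url not in failed_once:
        failed_once.add(url)
        raise RuntimeError("temporary failure")


queue = InMemoryQueue(["https://example.com/a", "https://example.com/b"])
asyncio.run(consume(queue, flaky_process))
handled = queue.handled
```

The loop terminates on is_finished rather than on the first None, which matches the note that None only signals that nothing is pending at the moment.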
get_metadata
Get the metadata of the request queue.
Returns RequestQueueMetadata
get_request
Retrieve a request from the queue.
Parameters
unique_key: str
Unique key of the request to retrieve.
Returns Request | None
get_session
Create a new SQLAlchemy session for this storage.
Parameters
optional keyword-only with_simple_commit: bool = False
Returns AsyncIterator[AsyncSession]
is_empty
Check if the request queue is empty.
Returns bool
mark_request_as_handled
Mark a request as handled after successful processing.
Handled requests will never again be returned by the RequestQueue.fetch_next_request method.
Parameters
request: Request
The request to mark as handled.
Returns ProcessedRequest | None
open
Open an existing request queue or create a new one.
This method first tries to find an existing queue by ID or name. If found, it returns a client for that queue. If not found, it creates a new queue with the specified parameters.
Parameters
keyword-only id: str | None
The ID of the request queue to open. Takes precedence over name.
keyword-only name: str | None
The name of the request queue for named (global scope) storages.
keyword-only alias: str | None
The alias of the request queue for unnamed (run scope) storages.
keyword-only storage_client: SqlStorageClient
The SQL storage client used to access the database.
Returns SqlRequestQueueClient
purge
Remove all requests from this request queue while keeping the queue itself.
Resets pending_request_count and handled_request_count to 0 and deletes all records from the request_queue_records table.
Returns None
reclaim_request
Reclaim a failed request back to the queue.
The request will be returned for processing again by a later call to RequestQueue.fetch_next_request.
Parameters
request: Request
The request to return to the queue.
optional keyword-only forefront: bool = False
Whether to add the request to the head or the end of the queue.
Returns ProcessedRequest | None
SQL implementation of the request queue client.
This client persists requests to a SQL database with transaction handling and concurrent access safety. Requests are stored with sequence-based ordering and efficient querying capabilities.
The implementation uses negative sequence numbers for forefront (high-priority) requests and positive sequence numbers for regular requests, allowing for efficient single-query ordering. A cache mechanism reduces database queries.
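To illustrate why signed sequence numbers allow single-query ordering, here is a minimal sketch using the standard-library sqlite3 module. The table name, column names, and the add helper are hypothetical and do not reflect the client's actual schema; the sketch only assumes that forefront requests receive decreasing negative numbers and regular requests increasing positive ones.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE records ("
    "unique_key TEXT PRIMARY KEY, "
    "sequence_number INTEGER NOT NULL, "
    "is_handled INTEGER NOT NULL DEFAULT 0)"
)

# Two counters: one grows upward for regular requests,
# the other grows downward for forefront requests.
counters = {"regular": 1, "forefront": -1}


def add(unique_key, *, forefront=False):
    """Insert a record with the next sequence number for its kind (hypothetical helper)."""
    kind = "forefront" if forefront else "regular"
    sequence_number = counters[kind]
    counters[kind] += -1 if forefront else 1
    conn.execute(
        "INSERT INTO records (unique_key, sequence_number) VALUES (?, ?)",
        (unique_key, sequence_number),
    )


add("first")
add("second")
add("urgent", forefront=True)
add("more-urgent", forefront=True)

# A single ascending sort yields forefront requests (newest first),
# followed by regular requests in insertion order.
rows = conn.execute(
    "SELECT unique_key FROM records WHERE is_handled = 0 ORDER BY sequence_number ASC"
).fetchall()
order = [row[0] for row in rows]
```

Because every forefront number is smaller than every regular number, one ORDER BY on a single indexed column is enough, with no separate priority column or second query.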
The request queue data is stored in SQL database tables following the pattern:
- request_queues table: Contains queue metadata (id, name, timestamps, request counts, multi-client flag).
- request_queue_records table: Contains individual requests with JSON data, unique keys for deduplication, sequence numbers for ordering, and processing status flags.
- request_queue_state table: Maintains counters for sequence numbers to ensure proper ordering of requests.
Requests are serialized to JSON for storage and maintain proper ordering through sequence numbers. The implementation provides concurrent access safety through transaction handling, locking mechanisms, and optimized database indexes for efficient querying.