
SqlRequestQueueClient

SQL implementation of the request queue client.

This client persists requests to a SQL database with transaction handling and concurrent access safety. Requests are stored with sequence-based ordering and efficient querying capabilities.

The implementation uses negative sequence numbers for forefront (high-priority) requests and positive sequence numbers for regular requests, allowing for efficient single-query ordering. A cache mechanism reduces database queries.
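
The single-query ordering described above can be illustrated with a small, self-contained sketch. It uses the standard-library `sqlite3` module and a hypothetical `records` table with a hypothetical `add` helper; none of these names come from the library, and the real client may assign sequence numbers differently.

```python
import sqlite3

# Hypothetical, simplified table; not the library's actual schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE records ("
    " unique_key TEXT PRIMARY KEY,"
    " sequence_number INTEGER NOT NULL)"
)

# Assumed counters: regular requests count up from 1,
# forefront requests count down from -1.
counters = {"regular": 1, "forefront": -1}


def add(unique_key: str, *, forefront: bool = False) -> None:
    """Store a request with a sequence number on the matching side of zero."""
    if forefront:
        sequence_number = counters["forefront"]
        counters["forefront"] -= 1
    else:
        sequence_number = counters["regular"]
        counters["regular"] += 1
    conn.execute(
        "INSERT INTO records (unique_key, sequence_number) VALUES (?, ?)",
        (unique_key, sequence_number),
    )


add("a")
add("b")
add("urgent-1", forefront=True)
add("urgent-2", forefront=True)

# One ascending sort returns forefront requests first, then regular ones.
order = [
    row[0]
    for row in conn.execute(
        "SELECT unique_key FROM records ORDER BY sequence_number ASC"
    )
]
```

In this sketch the most recently added forefront request sorts first, because its sequence number is the most negative.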

The request queue data is stored in SQL database tables following the pattern:

  • request_queues table: Contains queue metadata (id, name, timestamps, request counts, multi-client flag)
  • request_queue_records table: Contains individual requests with JSON data, unique keys for deduplication, sequence numbers for ordering, and processing status flags
  • request_queue_state table: Maintains counters for sequence numbers to ensure proper ordering of requests

Requests are serialized to JSON for storage and maintain proper ordering through sequence numbers. The implementation provides concurrent access safety through transaction handling, locking mechanisms, and optimized database indexes for efficient querying.
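
To make the three-table layout more concrete, here is a rough schema sketch in SQLite syntax, created through Python's `sqlite3` module. Only the table names and the `pending_request_count` and `handled_request_count` columns appear in this page; every other column name, the index, and the foreign keys are assumptions made for illustration, and the real schema may differ.

```python
import sqlite3

# Illustrative schema only. Column names other than pending_request_count and
# handled_request_count are hypothetical.
SCHEMA = """
CREATE TABLE request_queues (
    id TEXT PRIMARY KEY,
    name TEXT UNIQUE,
    created_at TEXT NOT NULL,
    modified_at TEXT NOT NULL,
    pending_request_count INTEGER NOT NULL DEFAULT 0,
    handled_request_count INTEGER NOT NULL DEFAULT 0,
    had_multiple_clients INTEGER NOT NULL DEFAULT 0
);

CREATE TABLE request_queue_records (
    request_queue_id TEXT NOT NULL
        REFERENCES request_queues(id) ON DELETE CASCADE,
    unique_key TEXT NOT NULL,           -- deduplication key
    sequence_number INTEGER NOT NULL,   -- ordering within the queue
    data TEXT NOT NULL,                 -- request serialized as JSON
    is_handled INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (request_queue_id, unique_key)
);

-- Assumed index supporting "next unhandled request in order" lookups.
CREATE INDEX idx_fetch_order
    ON request_queue_records (request_queue_id, is_handled, sequence_number);

CREATE TABLE request_queue_state (
    request_queue_id TEXT PRIMARY KEY
        REFERENCES request_queues(id) ON DELETE CASCADE,
    sequence_counter INTEGER NOT NULL DEFAULT 1,
    forefront_sequence_counter INTEGER NOT NULL DEFAULT -1
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)

tables = sorted(
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
```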

Methods

__init__

  • __init__(*, id, storage_client): None
  • Initialize a new instance.

    Preferably use the SqlRequestQueueClient.open class method to create a new instance.


    Parameters

    Returns None

add_batch_of_requests

  • Add a batch of requests to the queue.

    The uniqueness of each request is determined by its unique_key. Duplicates are identified but not added to the queue again.


    Parameters

    • requests: Sequence[Request]

      The collection of requests to add to the queue.

    • forefront: bool = False (optional, keyword-only)

      Whether to put the added requests at the beginning (True) or the end (False) of the queue. When True, the requests will be processed sooner than previously added requests.

    Returns AddRequestsResponse
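
The deduplication behavior can be approximated with a unique constraint on the key column. The following is a minimal sketch using `sqlite3`, not the client's actual logic: the `records` table, the `add_batch` helper, and the shape of the returned dictionaries are all hypothetical stand-ins.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE records (unique_key TEXT PRIMARY KEY, data TEXT NOT NULL)"
)


def add_batch(requests: list) -> list:
    """Insert requests, skipping any unique_key that is already stored."""
    results = []
    with conn:  # the whole batch is committed as one transaction
        for request in requests:
            cursor = conn.execute(
                "INSERT OR IGNORE INTO records (unique_key, data) VALUES (?, ?)",
                (request["unique_key"], json.dumps(request)),
            )
            # rowcount is 0 when the row was ignored as a duplicate.
            results.append(
                {
                    "unique_key": request["unique_key"],
                    "was_already_present": cursor.rowcount == 0,
                }
            )
    return results


batch_result = add_batch(
    [
        {"unique_key": "a", "url": "https://example.com/a"},
        {"unique_key": "b", "url": "https://example.com/b"},
        {"unique_key": "a", "url": "https://example.com/a"},
    ]
)
stored_count = conn.execute("SELECT COUNT(*) FROM records").fetchone()[0]
```

The third request shares a key with the first, so it is reported as already present and only two rows are stored.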

drop

  • async drop(): None
  • Delete this request queue and all its records from the database.

    This operation is irreversible. It uses CASCADE deletion to remove all related records.


    Returns None
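
As an illustration of CASCADE deletion in general, the sketch below deletes a parent row and lets the database remove its dependent rows. The `queues` and `records` tables are hypothetical, and the example uses SQLite, where foreign-key enforcement has to be switched on per connection.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# SQLite enforces foreign keys only when this pragma is enabled.
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("CREATE TABLE queues (id TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE records ("
    " queue_id TEXT NOT NULL REFERENCES queues(id) ON DELETE CASCADE,"
    " unique_key TEXT NOT NULL)"
)
conn.execute("INSERT INTO queues VALUES ('q1')")
conn.executemany("INSERT INTO records VALUES ('q1', ?)", [("a",), ("b",)])
conn.commit()

# Deleting the queue row removes its records through the cascade.
with conn:
    conn.execute("DELETE FROM queues WHERE id = 'q1'")

remaining_records = conn.execute("SELECT COUNT(*) FROM records").fetchone()[0]
```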

fetch_next_request

  • async fetch_next_request(): Request | None
  • Return the next request in the queue to be processed.

    Once you have successfully processed the request, call RequestQueue.mark_request_as_handled to mark it as handled in the queue. If processing fails, call RequestQueue.reclaim_request instead, so that the queue can hand the request to another consumer in a later call to fetch_next_request.

    Note that a None return value does not mean that queue processing has finished; it only means that there are currently no pending requests. To check whether all requests in the queue have been handled, use RequestQueue.is_finished instead.


    Returns Request | None
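
The fetch, handle, and reclaim cycle can be sketched as a consumer loop. `ToyQueue` below is a hypothetical in-memory stand-in that only borrows the method names mentioned above; it is not the real RequestQueue, and the `consume` and `flaky` helpers are likewise illustrative.

```python
import asyncio
from collections import deque


class ToyQueue:
    """Hypothetical in-memory queue; mirrors only the method names in the text."""

    def __init__(self, items):
        self._pending = deque(items)
        self._in_progress = set()
        self.handled = []

    async def fetch_next_request(self):
        if not self._pending:
            return None  # nothing pending now; requests may still be in progress
        item = self._pending.popleft()
        self._in_progress.add(item)
        return item

    async def mark_request_as_handled(self, item):
        self._in_progress.discard(item)
        self.handled.append(item)

    async def reclaim_request(self, item):
        self._in_progress.discard(item)
        self._pending.append(item)

    async def is_finished(self):
        return not self._pending and not self._in_progress


async def consume(queue, process):
    """Process requests until the queue reports that it is finished."""
    while not await queue.is_finished():
        request = await queue.fetch_next_request()
        if request is None:
            await asyncio.sleep(0.01)  # a request may still be reclaimed elsewhere
            continue
        try:
            await process(request)
        except Exception:
            await queue.reclaim_request(request)
        else:
            await queue.mark_request_as_handled(request)


attempts = {}


async def flaky(request):
    """Fail the first attempt at "b" to exercise the reclaim path."""
    attempts[request] = attempts.get(request, 0) + 1
    if request == "b" and attempts[request] == 1:
        raise RuntimeError("transient failure")


queue = ToyQueue(["a", "b"])
asyncio.run(consume(queue, flaky))
```

The loop exits on `is_finished`, not on a None result, which is the distinction the note above draws.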

get_metadata

get_request

  • async get_request(unique_key): Request | None

get_session

  • async get_session(*, with_simple_commit): AsyncIterator[AsyncSession]
  • Create a new SQLAlchemy session for this storage.


    Parameters

    • with_simple_commit: bool = False (optional, keyword-only)

    Returns AsyncIterator[AsyncSession]

is_empty

  • async is_empty(): bool

mark_request_as_handled

open

  • Open an existing request queue or create a new one.

    This method first tries to find an existing queue by ID or name. If found, it returns a client for that queue. If not found, it creates a new queue with the specified parameters.


    Parameters

    • id: str | None (keyword-only)

      The ID of the request queue to open. Takes precedence over name.

    • name: str | None (keyword-only)

      The name of the request queue for named (global scope) storages.

    • alias: str | None (keyword-only)

      The alias of the request queue for unnamed (run scope) storages.

    • storage_client: SqlStorageClient (keyword-only)

      The SQL storage client used to access the database.

    Returns SqlRequestQueueClient

purge

  • async purge(): None
  • Remove all requests from this request queue while keeping the queue itself.

    Resets pending_request_count and handled_request_count to 0 and deletes all records from the request_queue_records table.


    Returns None
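
A purge of this kind amounts to two statements run in one transaction. The sketch below uses `sqlite3` with a reduced, partly hypothetical schema (the `id`, `request_queue_id`, and `unique_key` columns and the `purge` helper are assumptions) to show that the queue row survives while its records and counters are cleared.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE request_queues (
        id TEXT PRIMARY KEY,
        pending_request_count INTEGER NOT NULL,
        handled_request_count INTEGER NOT NULL
    );
    CREATE TABLE request_queue_records (
        request_queue_id TEXT NOT NULL,
        unique_key TEXT NOT NULL
    );
    INSERT INTO request_queues VALUES ('q1', 2, 1);
    INSERT INTO request_queue_records VALUES ('q1', 'a'), ('q1', 'b'), ('q1', 'c');
    """
)


def purge(queue_id: str) -> None:
    """Delete the queue's records and reset its counters, keeping the queue row."""
    with conn:  # both statements commit together or not at all
        conn.execute(
            "DELETE FROM request_queue_records WHERE request_queue_id = ?",
            (queue_id,),
        )
        conn.execute(
            "UPDATE request_queues"
            " SET pending_request_count = 0, handled_request_count = 0"
            " WHERE id = ?",
            (queue_id,),
        )


purge("q1")
queue_row = conn.execute("SELECT * FROM request_queues").fetchone()
record_count = conn.execute(
    "SELECT COUNT(*) FROM request_queue_records"
).fetchone()[0]
```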

reclaim_request

  • Reclaim a failed request back to the queue.

    The request will be returned for processing again by a later call to RequestQueue.fetch_next_request.


    Parameters

    • request: Request

      The request to return to the queue.

    • forefront: bool = False (optional, keyword-only)

      Whether to return the request to the head (True) or the end (False) of the queue.

    Returns ProcessedRequest | None