Skip to main content

RedisRequestQueueClient

Redis implementation of the request queue client.

This client persists requests to Redis using multiple data structures for efficient queue operations, deduplication, and concurrent access safety. Requests are stored with FIFO ordering and support both regular and forefront (high-priority) insertion modes.

The implementation uses Bloom filters for efficient request deduplication and Redis lists for queue operations. Request blocking and client coordination is handled through Redis hashes with timestamp-based expiration for stale request recovery.

The request queue data is stored in Redis using the following key patterns:

  • request_queues:{name}:queue - Redis list for FIFO request ordering
  • request_queues:{name}:data - Redis hash storing serialized Request objects by unique_key
  • request_queues:{name}:in_progress - Redis hash tracking requests currently being processed
  • request_queues:{name}:added_bloom_filter - Bloom filter for added request deduplication (bloom dedup_strategy)
  • request_queues:{name}:handled_bloom_filter - Bloom filter for completed request tracking (bloom dedup_strategy)
  • request_queues:{name}:pending_set - Redis set for added request deduplication (default dedup_strategy)
  • request_queues:{name}:handled_set - Redis set for completed request tracking (default dedup_strategy)
  • request_queues:{name}:metadata - Redis JSON object containing queue metadata

Requests are serialized to JSON for storage and maintain proper FIFO ordering through Redis list operations. The implementation provides concurrent access safety through atomic Lua scripts, Bloom filter operations, and Redis's built-in atomicity guarantees for individual operations.

Hierarchy

Index

Methods

__init__

  • __init__(storage_name, storage_id, redis, dedup_strategy, bloom_error_rate): None
  • Initialize a new instance.

    Preferably use the RedisRequestQueueClient.open class method to create a new instance.


    Parameters

    • storage_name: str
    • storage_id: str
    • redis: Redis
    • optionaldedup_strategy: Literal[default, bloom] = 'default'
    • optionalbloom_error_rate: float = 1e-7

    Returns None

add_batch_of_requests

  • Add batch of requests to the queue.

    This method adds a batch of requests to the queue. Each request is processed based on its uniqueness (determined by unique_key). Duplicates will be identified but not re-added to the queue.


    Parameters

    • requests: Sequence[Request]

      The collection of requests to add to the queue.

    • optionalkeyword-onlyforefront: bool = False

      Whether to put the added requests at the beginning (True) or the end (False) of the queue. When True, the requests will be processed sooner than previously added requests.

    Returns AddRequestsResponse

drop

  • async drop(): None
  • Drop the whole request queue and remove all its values.

    The backend method for the RequestQueue.drop call.


    Returns None

fetch_next_request

  • async fetch_next_request(): Request | None
  • Return the next request in the queue to be processed.

    Once you successfully finish processing of the request, you need to call RequestQueue.mark_request_as_handled to mark the request as handled in the queue. If there was some error in processing the request, call RequestQueue.reclaim_request instead, so that the queue will give the request to some other consumer in another call to the fetch_next_request method.

    Note that the None return value does not mean the queue processing finished, it means there are currently no pending requests. To check whether all requests in queue were finished, use RequestQueue.is_finished instead.


    Returns Request | None

get_metadata

get_request

  • async get_request(unique_key): Request | None

is_empty

  • async is_empty(): bool

mark_request_as_handled

open

  • Open or create a new Redis request queue client.

    This method attempts to open an existing request queue from the Redis database. If a queue with the specified ID or name exists, it loads the metadata from the database. If no existing queue is found, a new one is created.


    Parameters

    • keyword-onlyid: str | None

      The ID of the request queue. If not provided, a random ID will be generated.

    • keyword-onlyname: str | None

      The name of the dataset for named (global scope) storages.

    • keyword-onlyalias: str | None

      The alias of the dataset for unnamed (run scope) storages.

    • keyword-onlyredis: Redis

      Redis client instance.

    • optionalkeyword-onlydedup_strategy: Literal[default, bloom] = 'default'

      Strategy for request queue deduplication. Options are:

      • 'default': Uses Redis sets for exact deduplication.
      • 'bloom': Uses Redis Bloom filters for probabilistic deduplication with lower memory usage. When using this approach, there is a possibility 1e-7 that requests will be skipped in the queue.
    • optionalkeyword-onlybloom_error_rate: float = 1e-7

      Desired false positive rate for Bloom filter deduplication. Only relevant if dedup_strategy is set to 'bloom'.

    Returns RedisRequestQueueClient

purge

  • async purge(): None
  • Purge all items from the request queue.

    The backend method for the RequestQueue.purge call.


    Returns None

reclaim_request

  • Reclaim a failed request back to the queue.

    The request will be returned for processing later again by another call to RequestQueue.fetch_next_request.


    Parameters

    • request: Request

      The request to return to the queue.

    • optionalkeyword-onlyforefront: bool = False

      Whether to add the request to the head or the end of the queue.

    Returns ProcessedRequest | None

Properties

metadata_key

metadata_key: str

Return the Redis key for the metadata of this storage.

redis

redis: Redis

Return the Redis client instance.