
Query Batching

Number: 2806

Difficulty: Hard

Paid? Yes

Companies: N/A


Problem Description

Design a QueryBatcher class that batches small key queries into larger ones using an asynchronous function queryMultiple. The constructor takes two parameters: the async function queryMultiple(keys) (which returns an array of values corresponding to the queried keys) and a throttle time t in milliseconds. The getValue(key) method should return a promise that resolves with the value for that key. The very first call of a new “batch cycle” should immediately call queryMultiple with that single key, whereas subsequent calls occurring within t ms should be queued and then processed together (in a separate queryMultiple call after t ms). Every key is unique.


Key Insights

  • Use a timer to enforce a throttle window of t milliseconds between queryMultiple calls.
  • The first call in a new cycle is handled immediately—its promise is resolved as soon as queryMultiple returns.
  • Any subsequent getValue calls within the current window are batched. When the timer expires, batch all queued keys into a single queryMultiple call and resolve their pending promises.
  • Maintain a queue (or mapping) pairing each pending key with its promise’s resolve function.
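The key-to-resolver pairing from the last point can be sketched with asyncio futures. This is a minimal standalone illustration, not the full solution; the keys and resolved values below are hypothetical:

```python
import asyncio

async def main():
    # Map each queued key to a future that the batch flush will resolve later.
    pending = {key: asyncio.get_running_loop().create_future()
               for key in ("b", "c")}

    # Later, when the batched queryMultiple result arrives, pair results
    # back to their futures (result order matches the queued key order).
    for key, result in zip(pending, ["b!", "c!"]):
        pending[key].set_result(result)

    # Each caller awaiting its future now receives its own value.
    return [await fut for fut in pending.values()]

values = asyncio.run(main())
print(values)  # ['b!', 'c!']
```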

Space and Time Complexity

Time Complexity: O(n) per batch call, where n is the number of keys batched.

Space Complexity: O(n) for storing deferred promises and keys in each batch.


Solution

The solution maintains two internal state components:

  1. A timer (or flag) indicating an active throttle window.
  2. A pendingBatch list storing information (key and promise resolution functions) for all getValue calls made during the throttle window (after the immediate call).

When getValue(key) is invoked:

  • If no timer is active (i.e. we are starting a new batch cycle):
      • Immediately call queryMultiple with [key] and return its promise.
      • Start a timer for t ms; any subsequent getValue calls during this window are added to the pendingBatch.
  • If a timer is already running, push the key into the pending batch along with a deferred promise resolver.

When the timer expires:

  • If pendingBatch is non-empty, gather all keys from the batch and call queryMultiple with that list.
  • Distribute the returned results to their respective promises.
  • Clear the pendingBatch and reset the timer so that a new cycle may begin when a later getValue call occurs.

This approach keeps consecutive queryMultiple calls at least t ms apart and guarantees the correct query batching behavior.
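As a minimal standalone sketch (not the full solution), the timer mechanism behind this spacing can be checked with the event loop's call_later, here with a hypothetical window of t = 100 ms:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    call_times = []

    def flush():
        # Simulates processing the queued batch when the window closes.
        call_times.append(loop.time())

    call_times.append(loop.time())   # immediate call for the first key
    loop.call_later(0.1, flush)      # throttle window: t = 100 ms
    await asyncio.sleep(0.15)        # let the timer fire
    return call_times[1] - call_times[0]

gap = asyncio.run(main())
print(gap >= 0.1)  # consecutive calls are at least t apart → True
```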


Code Solutions

Below is a Python (asyncio) implementation, provided with detailed line-by-line comments.

import asyncio

class QueryBatcher:
    def __init__(self, queryMultiple, t):
        # Store the provided queryMultiple function and throttle time (in seconds for asyncio)
        self.queryMultiple = queryMultiple
        self.throttle_time = t / 1000.0  # convert milliseconds to seconds
        # List to store pending batch items: each item is (key, future)
        self.pending_batch = []
        # Timer handle for the batch; initially None
        self.batch_timer = None
        # Lock for concurrency handling
        self.lock = asyncio.Lock()
    
    async def getValue(self, key):
        async with self.lock:
            # If no batch timer is active, this is a new cycle.
            if self.batch_timer is None:
                # Immediately call queryMultiple for the current key.
                immediate_task = asyncio.create_task(self.queryMultiple([key]))
                # Start the throttle timer for collecting subsequent keys.
                self.batch_timer = asyncio.get_running_loop().call_later(
                    self.throttle_time,
                    lambda: asyncio.create_task(self._process_batch()))
                fut = None
            else:
                # A timer is already running: queue the key with a deferred future.
                fut = asyncio.get_running_loop().create_future()
                self.pending_batch.append((key, fut))
        # Await outside the lock so later getValue calls can still queue up.
        if fut is None:
            result = await immediate_task  # result is a one-element list
            return result[0]
        return await fut

    async def _process_batch(self):
        async with self.lock:
            # Reset timer immediately
            self.batch_timer = None
            if not self.pending_batch:
                # No keys were added during the throttle window.
                return
            # Extract keys and keep track of futures.
            keys = [item[0] for item in self.pending_batch]
            futures = [item[1] for item in self.pending_batch]
            # Clear the pending batch for next cycle.
            self.pending_batch = []
        # Call queryMultiple outside the lock.
        results = await self.queryMultiple(keys)
        # Resolve each future with its corresponding result.
        for fut, res in zip(futures, results):
            if not fut.done():
                fut.set_result(res)
                
# Example usage:
# async def queryMultiple(keys):
#     # Simulate async processing delay if needed.
#     await asyncio.sleep(0.1)
#     return [key + "!" for key in keys]
#
# async def main():
#     batcher = QueryBatcher(queryMultiple, 100)
#     task_a = asyncio.create_task(batcher.getValue("a"))  # immediate queryMultiple(["a"])
#     task_b = asyncio.create_task(batcher.getValue("b"))  # queued for the batch
#     task_c = asyncio.create_task(batcher.getValue("c"))  # queued for the batch
#     print(await task_a)  # "a!"
#     print(await task_b)  # "b!" (batched with "c" after the throttle window)
#     print(await task_c)  # "c!"
#
# asyncio.run(main())