summaryrefslogtreecommitdiff
path: root/server/continuedev/libs/util/queue.py
blob: e1f98cc64aa37e3dccd88070224ab76faaaa67d3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
from typing import Any, Dict


class AsyncSubscriptionQueue:
    """Hand posted messages to awaiting consumers, keyed by message type.

    Every message type owns one unbounded ``asyncio.Queue`` that is created
    lazily the first time that type is posted or awaited. Messages of the
    same type are delivered in the order they were posted.
    """

    # The correct way to do this is probably to keep request IDs
    queues: Dict[str, asyncio.Queue]

    def __init__(self) -> None:
        # The mapping must live on the instance: a dict assigned in the class
        # body is shared by every instance, so one subscriber could consume
        # messages that were posted for another.
        self.queues = {}

    def _queue_for(self, message_type: str) -> asyncio.Queue:
        """Return the queue for *message_type*, creating it on first use."""
        try:
            return self.queues[message_type]
        except KeyError:
            created = self.queues[message_type] = asyncio.Queue()
            return created

    def post(self, messageType: str, data: Any):
        """Enqueue *data* for consumers of *messageType* without blocking.

        Args:
            messageType: Key identifying which queue receives the message.
            data: Payload to deliver; stored as-is.
        """
        self._queue_for(messageType).put_nowait(data)

    async def get(self, message_type: str) -> Any:
        """Wait for and return the next message posted for *message_type*.

        Args:
            message_type: Key identifying which queue to read from.

        Returns:
            The oldest payload not yet consumed for that message type.
        """
        return await self._queue_for(message_type).get()