Skip to content
This repository was archived by the owner on Mar 24, 2021. It is now read-only.
This repository was archived by the owner on Mar 24, 2021. It is now read-only.

Last two messages are not written to the Topic #994

@ijbo

Description

@ijbo

Below is my code :
Problem : Producer not writing the last two messages to the Topic.

`import os
import asyncio
import websockets
from pykafka import KafkaClient
from websockets.extensions import permessage_deflate


class Server:
    client = None
    kafka_client = None
    topic_name = "tpa_19"
    topic = None
    producer = None

    def get_port(self):
        return os.getenv('WS_PORT', '10015')

    def connect_kafka_client(self):
        client = KafkaClient(hosts="localhost:9092", use_greenlets=True)
        self.client = client
        self.set_topic()
        print("Connection Done with Kafka")

    def set_topic(self):
        self.topic = self.client.topics[self.topic_name]
        self.producer = self.topic.get_producer(min_queued_messages=1,max_queued_messages=0,
                                                linger_ms=500)
 
    def get_host(self):
        return os.getenv('WS_HOST', 'localhost')

    def start(self):
        return websockets.serve(self.handler, self.get_host(), self.get_port(), ping_interval=None, max_size=None,
                                max_queue=None,close_timeout=None,extensions=[
        permessage_deflate.ServerPerMessageDeflateFactory(
            server_max_window_bits=11,
            client_max_window_bits=11,
            compress_settings={'memLevel': 4},
        ),
    ])

    async def send_message_to_kafka(self, producer, row):
        try:
            # print(row)
            producer.produce(row.encode())
        except Exception as ex:
            print(ex)

    async def handler(self, websocket, path):
        async for row in websocket:
             await self.send_message_to_kafka(self.producer, row)
 

if __name__ == '__main__':
    ws = Server()
    ws.connect_kafka_client()
    asyncio.get_event_loop().run_until_complete(ws.start())
    asyncio.get_event_loop().run_forever()
`

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions