[HZ-5408] ReliableTopic for Asyncio#802

Open
yuce wants to merge 13 commits into hazelcast:master from yuce:asyncio-reliabletopic

Conversation

Contributor

yuce commented Apr 8, 2026

To resolve a circular import, I had to put create_reliable_topic_proxy in hazelcast/internal/asyncio_proxy/manager.py instead of in the module where the proxy is defined (as is done for the other asyncio proxies).
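For readers unfamiliar with the workaround: a top-level import cycle fails because one module is only partially initialized when the other imports names from it. Moving the factory to a module that is never imported back, or deferring the import into the factory's body, breaks the cycle. The sketch below is illustrative only; the package, module, and class names are made up, not the client's real layout:

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path

# Build a tiny throwaway package with two modules. `topic` imports
# `manager` at top level; `manager` imports `topic` only inside the
# factory body, so the import runs at call time, after both modules
# have finished initializing, and no cycle is hit at import time.
pkg = Path(tempfile.mkdtemp()) / "cyclepkg"
pkg.mkdir()
(pkg / "__init__.py").write_text("")
(pkg / "manager.py").write_text(textwrap.dedent("""
    def create_proxy(name):
        # Deferred import: breaks the import cycle.
        from cyclepkg.topic import Topic
        return Topic(name)
"""))
(pkg / "topic.py").write_text(textwrap.dedent("""
    from cyclepkg.manager import create_proxy  # top-level import is safe

    class Topic:
        def __init__(self, name):
            self.name = name
"""))
sys.path.insert(0, str(pkg.parent))

topic_mod = importlib.import_module("cyclepkg.topic")
proxy = topic_mod.create_proxy("events")
print(proxy.name)  # -> events
```

The PR takes the other route of the same idea: the factory lives in manager.py, which the proxy module never needs to import back at module load time.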

Asyncore proxy: https://github.com/yuce/hazelcast-python-client/blob/asyncio-reliabletopic/hazelcast/proxy/reliable_topic.py
Asyncio proxy: https://github.com/yuce/hazelcast-python-client/blob/asyncio-reliabletopic/hazelcast/internal/asyncio_proxy/reliable_topic.py

Asyncore test: https://github.com/yuce/hazelcast-python-client/blob/asyncio-reliabletopic/tests/integration/backward_compatible/proxy/reliable_topic_test.py
Asyncio test: https://github.com/yuce/hazelcast-python-client/blob/asyncio-reliabletopic/tests/integration/asyncio/proxy/reliable_topic_test.py
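Most of the diff that follows is a mechanical conversion from the asyncore callback style (Future.continue_with / add_done_callback) to coroutines. A minimal, self-contained sketch of that conversion pattern; FakeRingbuffer and the numbers are made up for illustration and are not part of the client:

```python
import asyncio

class FakeRingbuffer:
    """Stand-in for the client's ringbuffer proxy (illustrative only)."""

    async def tail_sequence(self):
        await asyncio.sleep(0)  # simulate an in-flight server call
        return 41

async def start(rb):
    # Asyncore style, roughly:
    #     def continuation(future):
    #         self._sequence = future.result() + 1
    #     return rb.tail_sequence().continue_with(continuation)
    # Asyncio style: the continuation becomes straight-line code.
    sequence = await rb.tail_sequence()
    return sequence + 1

print(asyncio.run(start(FakeRingbuffer())))  # -> 42
```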

0a1
> import asyncio
17,18c18,20
< from hazelcast.future import ImmediateFuture, Future
< from hazelcast.proxy.base import Proxy, TopicMessage
---
> from hazelcast.internal.asyncio_proxy.base import Proxy
> from hazelcast.proxy.base import TopicMessage
> from hazelcast.proxy.reliable_topic import ReliableMessageListener, _ReliableMessageListenerAdapter
28,29d29
< _RINGBUFFER_PREFIX = "_hz_rb_"
< 
36,160d35
< class ReliableMessageListener(typing.Generic[MessageType]):
<     """A message listener for :class:`ReliableTopic`.
< 
<     A message listener will not be called concurrently (provided that it's
<     not registered twice). So there is no need to synchronize access to
<     the state it reads or writes.
< 
<     If a regular function is registered on a reliable topic, the message
<     listener works fine, but it can't do much more than listen to messages.
< 
<     This is an enhanced version of that to better integrate with the reliable
<     topic.
< 
<     **Durable Subscription**
< 
<     The ReliableMessageListener allows you to control where you want to start
<     processing a message when the listener is registered. This makes it
<     possible to create a durable subscription by storing the sequence of the
<     last message and using this as the sequence id to start from.
< 
<     **Error handling**
< 
<     The ReliableMessageListener also gives the ability to deal with errors
<     using the :func:`is_terminal` method. If a plain function is used, then it
<     won't terminate on errors and it will keep on running. But in some
<     cases it is better to stop running.
< 
<     **Global order**
< 
<     The ReliableMessageListener will always get all events in order (global
<     order). It will not get duplicates and there will only be gaps if it is
<     too slow. For more information see :func:`is_loss_tolerant`.
< 
<     **Delivery guarantees**
< 
<     Because the ReliableMessageListener controls which item it wants to
<     continue from upon restart, it is very easy to provide an at-least-once
<     or at-most-once delivery guarantee. The :func:`store_sequence` is always
<     called before a message is processed; so it can be persisted on some
<     non-volatile storage. When the :func:`retrieve_initial_sequence` returns
<     the stored sequence, then an at-least-once delivery is implemented since
<     the same item is now being processed twice. To implement an at-most-once
<     delivery guarantee, add 1 to the stored sequence when the
<     :func:`retrieve_initial_sequence` is called.
<     """
< 
<     def on_message(self, message: TopicMessage[MessageType]) -> None:
<         """
<         Invoked when a message is received for the added reliable topic.
< 
<         One should not block in this callback. If blocking is necessary,
<         consider delegating that task to an executor or a thread pool.
< 
<         Args:
<             message: The message that is received for the topic
<         """
<         raise NotImplementedError("on_message")
< 
<     def retrieve_initial_sequence(self) -> int:
<         """
<         Retrieves the initial sequence from which this ReliableMessageListener
<         should start.
< 
<         Return ``-1`` if there is no initial sequence and you want to start
<         from the next published message.
< 
<         If you intend to create a durable subscriber so you continue from where
<         you stopped the previous time, load the previous sequence and add ``1``.
<         If you don't add one, then you will be receiving the same message twice.
< 
<         Returns:
<             The initial sequence.
<         """
<         raise NotImplementedError("retrieve_initial_sequence")
< 
<     def store_sequence(self, sequence: int) -> None:
<         """
<         Informs the ReliableMessageListener that it should store the sequence.
<         This method is called before the message is processed. Can be used to
<         make a durable subscription.
< 
<         Args:
<             sequence: The sequence.
<         """
<         raise NotImplementedError("store_sequence")
< 
<     def is_loss_tolerant(self) -> bool:
<         """
<         Checks if this ReliableMessageListener is able to deal with message
<         loss. Even though the reliable topic promises to be reliable, it can
<         be that a ReliableMessageListener is too slow. Eventually the message
<         won't be available anymore.
< 
<         If the ReliableMessageListener is not loss tolerant and the topic
<         detects that there are missing messages, it will terminate the
<         ReliableMessageListener.
< 
<         Returns:
<             ``True`` if the ReliableMessageListener is tolerant towards losing
<             messages.
<         """
<         raise NotImplementedError("is_loss_tolerant")
< 
<     def is_terminal(self, error: Exception) -> bool:
<         """
<         Checks if the ReliableMessageListener should be terminated based on an
<         error raised while calling :func:`on_message`.
< 
<         Args:
<             error: The error raised while calling :func:`on_message`
< 
<         Returns:
<             ``True`` if the ReliableMessageListener should terminate itself,
<             ``False`` if it should keep on running.
<         """
<         raise NotImplementedError("is_terminal")
< 
<     def on_cancel(self) -> None:
<         """
<         Called when the ReliableMessageListener is cancelled. This can happen
<         when the listener is unregistered or cancelled due to an exception or during shutdown.
<         """
<         pass
< 
< 
180a56
>         self._task: asyncio.Task | None = None
182c58
<     def start(self):
---
>     async def start(self):
185c61
<         If the user provided a initial sequence via listener, we will
---
>         If the user provided an initial sequence via listener, we will
188,190d63
< 
<         Returns:
<             hazelcast.future.Future[None]:
194,198c67
<             return ImmediateFuture(None)
< 
<         def continuation(future):
<             sequence = future.result()
<             self._sequence = sequence + 1
---
>             return
202c71,72
<         return self._ringbuffer.tail_sequence().continue_with(continuation)
---
>         sequence = await self._ringbuffer.tail_sequence()
>         self._sequence = sequence + 1
205,206c75,76
<         """Tries to read the next batch from the ringbuffer
<         and call the listener on items when it is done.
---
>         """Schedules an asyncio task to read the next batch from the
>         ringbuffer and call the listener on items when it is done.
210,213c80,83
< 
<         self._ringbuffer.read_many(self._sequence, 1, self._read_batch_size).add_done_callback(
<             self._handle_next_batch
<         )
---
>         # The task is assigned to an instance variable to keep a reference.
>         # That ensures it is not garbage collected before done.
>         # See: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
>         self._task = asyncio.create_task(self._handle_next_batch())
216c86
<         """Sets the cancelled flag and removes
---
>         """Sets the cancelled flag, cancels the running task, and removes
223,229c93,94
<     def _handle_next_batch(self, future):
<         """Handles the result of the read_many request from
<         the ringbuffer.
< 
<         Args:
<             future (hazelcast.future.Future):
<         """
---
>     async def _handle_next_batch(self):
>         """Reads the next batch from the ringbuffer and processes the items."""
234c99
<             result = future.result()
---
>             result = await self._ringbuffer.read_many(self._sequence, 1, self._read_batch_size)
248d112
< 
276a141,142
>         except asyncio.CancelledError:
>             pass
279c145
<             if not self._handle_internal_error(e):
---
>             if not await self._handle_internal_error(e):
313,321c179,180
<         """Checks if we should terminate the listener
<         based on the error we received while calling the
<         on_message for this message.
< 
<         If the listener says that it should be terminated
<         for this error or it raises some error while
<         we were trying to call is_terminal, the listener
<         will be terminated. Otherwise, a log will be
<         printed and listener will continue.
---
>         """Checks if we should terminate the listener based on the error
>         received while calling on_message.
324c183
<             error: Error we received while calling the listener.
---
>             error: Error received while calling the listener.
359,360c218,219
<     def _handle_internal_error(self, error: Exception) -> bool:
<         """Called when the read_many request is failed.
---
>     async def _handle_internal_error(self, error: Exception) -> bool:
>         """Called when the read_many request fails.
364,367c223,226
<         If we can tolerate the error, we will call next_batch
<         here. The reasoning behind is that, on some cases, we
<         do not immediately call next_batch, but make a request
<         to the server, and based on that, call next_batch.
---
> If we can tolerate the error, we will call next_batch here.
> The reasoning behind this is that, in some cases, we do not
> immediately call next_batch, but make a request to the server
> and, based on the response, call next_batch.
374c233
<             When, ``False`` is returned, listener should be cancelled.
---
>             When ``False`` is returned, listener should be cancelled.
383c242
<             return self._handle_illegal_argument_error(error)
---
>             return await self._handle_illegal_argument_error(error)
455c314
<     def _handle_illegal_argument_error(self, error):
---
>     async def _handle_illegal_argument_error(self, error):
459,495c318,350
<             # Listener can tolerate message loss. Try
<             # to continue reading after getting head
<             # sequence, and try to read from there.
<             def on_response(future):
<                 try:
<                     head_sequence = future.result()
<                     _logger.debug(
<                         "MessageListener %s on topic %s requested a too large sequence. "
<                         "Jumping from old sequence %s to sequence %s.",
<                         self._listener,
<                         self._topic_name,
<                         self._sequence,
<                         head_sequence,
<                         exc_info=error,
<                     )
<                     self._sequence = head_sequence
<                     # We call next_batch only after getting the new head
<                     # sequence and updating the our state with it.
<                     self.next_batch()
<                 except Exception as e:
<                     _logger.warning(
<                         "Terminating MessageListener %s on topic %s. "
<                         "Reason: After the ring buffer data related "
<                         "to reliable topic is lost, client tried to get the "
<                         "current head sequence to continue since the listener"
<                         "is loss tolerant, but that request failed.",
<                         self._listener,
<                         self._topic_name,
<                         exc_info=e,
<                     )
<                     # We said that we can handle that error so the listener
<                     # is not cancelled. But, we could not continue since
<                     # our request to the server is failed. We should cancel
<                     # the listener.
<                     self.cancel()
< 
<             self._ringbuffer.head_sequence().add_done_callback(on_response)
---
>             # Listener can tolerate message loss. Try to continue reading
>             # after getting head sequence, and try to read from there.
>             try:
>                 head_sequence = await self._ringbuffer.head_sequence()
>                 _logger.debug(
>                     "MessageListener %s on topic %s requested a too large sequence. "
>                     "Jumping from old sequence %s to sequence %s.",
>                     self._listener,
>                     self._topic_name,
>                     self._sequence,
>                     head_sequence,
>                     exc_info=error,
>                 )
>                 self._sequence = head_sequence
>                 # We call next_batch only after getting the new head
>                 # sequence and updating our state with it.
>                 self.next_batch()
>             except Exception as e:
>                 _logger.warning(
>                     "Terminating MessageListener %s on topic %s. "
>                     "Reason: After the ring buffer data related "
>                     "to reliable topic is lost, client tried to get the "
>                     "current head sequence to continue since the listener "
>                     "is loss tolerant, but that request failed.",
>                     self._listener,
>                     self._topic_name,
>                     exc_info=e,
>                 )
>                 # We said that we can handle that error so the listener
>                 # is not cancelled. But, we could not continue since
>                 # our request to the server failed. We should cancel
>                 # the listener.
>                 self.cancel()
504d358
< 
508,546c362
< class _ReliableMessageListenerAdapter(ReliableMessageListener):
<     """Used when the user provided a function as the listener.
< 
<     That means user does not care about the other properties
<     of the listener. They just want to listen messages. Fill
<     the methods with expected defaults.
<     """
< 
<     def __init__(self, on_message_fn):
<         self._on_message_fn = on_message_fn
< 
<     def on_message(self, message):
<         self._on_message_fn(message)
< 
<     def retrieve_initial_sequence(self):
<         # -1 indicates start from next message.
<         return -1
< 
<     def store_sequence(self, sequence):
<         # no-op
<         pass
< 
<     def is_loss_tolerant(self):
<         # terminate the listener on message loss
<         return False
< 
<     def is_terminal(self, error):
<         # do not terminate the listener on errors
<         return False
< 
< 
< def _no_op_continuation(future):
<     # Used when we just care about whether
<     # the ringbuffer request is failed. We just
<     # check the result and return nothing.
<     future.result()
< 
< 
< class ReliableTopic(Proxy["BlockingReliableTopic"], typing.Generic[MessageType]):
---
> class ReliableTopic(Proxy, typing.Generic[MessageType]):
563c379
<     def __init__(self, service_name, name, context):
---
>     def __init__(self, service_name, name, context, ringbuffer):
571,573c387,388
<         self._context = context
<         self._ringbuffer = context.client.get_ringbuffer(_RINGBUFFER_PREFIX + name)
<         self._runners = {}
---
>         self._ringbuffer = ringbuffer
>         self._runners: typing.Dict[str, _MessageRunner] = {}
575c390
<     def publish(self, message: MessageType) -> Future[None]:
---
>     async def publish(self, message: MessageType) -> None:
585c400
<             return self._send_schema_and_retry(e, self.publish, message)
---
>             return await self._send_schema_and_retry(e, self.publish, message)
591c406
<             return self._add_with_backoff(topic_message)
---
>             return await self._add_with_backoff(topic_message)
593c408
<             return self._add_or_fail(topic_message)
---
>             return await self._add_or_fail(topic_message)
595c410
<             return self._add_or_overwrite(topic_message)
---
>             return await self._add_or_overwrite(topic_message)
597c412
<             return self._add_or_discard(topic_message)
---
>             return await self._add_or_discard(topic_message)
601c416
<     def publish_all(self, messages: typing.Sequence[MessageType]) -> Future[None]:
---
>     async def publish_all(self, messages: typing.Sequence[MessageType]) -> None:
615c430
<             return self._send_schema_and_retry(e, self.publish_all, messages)
---
>             return await self._send_schema_and_retry(e, self.publish_all, messages)
619c434
<             return self._add_messages_with_backoff(topic_messages)
---
>             return await self._add_messages_with_backoff(topic_messages)
621c436
<             return self._add_messages_or_fail(topic_messages)
---
>             return await self._add_messages_or_fail(topic_messages)
623c438
<             return self._add_messages_or_overwrite(topic_messages)
---
>             return await self._add_messages_or_overwrite(topic_messages)
625c440
<             return self._add_messages_or_discard(topic_messages)
---
>             return await self._add_messages_or_discard(topic_messages)
629c444
<     def add_listener(
---
>     async def add_listener(
634c449
<     ) -> Future[str]:
---
>     ) -> str:
642c457
<         When a message is published, the,
---
>         When a message is published, the
655d469
< 
658d471
< 
667a481,487
>         await runner.start()
>         # If the runner started successfully, register it.
>         self._runners[registration_id] = runner
>         runner.next_batch()
>         # ensure the runner is scheduled
>         await asyncio.sleep(0)
>         return registration_id
669,678c489
<         def continuation(future):
<             future.result()
<             # If the runner started successfully, register it.
<             self._runners[registration_id] = runner
<             runner.next_batch()
<             return registration_id
< 
<         return runner.start().continue_with(continuation)
< 
<     def remove_listener(self, registration_id: str) -> Future[bool]:
---
>     async def remove_listener(self, registration_id: str) -> bool:
692c503
<             return ImmediateFuture(False)
---
>             return False
695c506
<         return ImmediateFuture(True)
---
>         return True
697c508
<     def destroy(self) -> bool:
---
>     async def destroy(self) -> bool:
699d509
< 
703a514,515
>         await super(ReliableTopic, self).destroy()
>         return await self._ringbuffer.destroy()
705,776c517,522
<         super(ReliableTopic, self).destroy()
<         return self._ringbuffer.destroy()
< 
<     def blocking(self) -> "BlockingReliableTopic[MessageType]":
<         return BlockingReliableTopic(self)
< 
<     def _add_or_fail(self, message):
<         def continuation(future):
<             sequence_id = future.result()
<             if sequence_id == -1:
<                 raise TopicOverloadError(
<                     "Failed to publish message %s on topic %s." % (message, self.name)
<                 )
< 
<         return self._ringbuffer.add(message, OVERFLOW_POLICY_FAIL).continue_with(continuation)
< 
<     def _add_messages_or_fail(self, messages):
<         def continuation(future):
<             sequence_id = future.result()
<             if sequence_id == -1:
<                 raise TopicOverloadError("Failed to publish messages on topic %s." % self.name)
< 
<         return self._ringbuffer.add_all(messages, OVERFLOW_POLICY_FAIL).continue_with(continuation)
< 
<     def _add_or_overwrite(self, message):
<         return self._ringbuffer.add(message, OVERFLOW_POLICY_OVERWRITE).continue_with(
<             _no_op_continuation
<         )
< 
<     def _add_messages_or_overwrite(self, messages):
<         return self._ringbuffer.add_all(messages, OVERFLOW_POLICY_OVERWRITE).continue_with(
<             _no_op_continuation
<         )
< 
<     def _add_or_discard(self, message):
<         return self._ringbuffer.add(message, OVERFLOW_POLICY_FAIL).continue_with(
<             _no_op_continuation
<         )
< 
<     def _add_messages_or_discard(self, messages):
<         return self._ringbuffer.add_all(messages, OVERFLOW_POLICY_FAIL).continue_with(
<             _no_op_continuation
<         )
< 
<     def _add_with_backoff(self, message):
<         future = Future()
<         self._try_adding_with_backoff(message, _INITIAL_BACKOFF, future)
<         return future
< 
<     def _add_messages_with_backoff(self, messages):
<         future = Future()
<         self._try_adding_messages_with_backoff(messages, _INITIAL_BACKOFF, future)
<         return future
< 
<     def _try_adding_with_backoff(self, message, backoff, future):
<         def callback(add_future):
<             try:
<                 sequence_id = add_future.result()
<                 if sequence_id != -1:
<                     future.set_result(None)
<                     return
< 
<                 self._context.reactor.add_timer(
<                     backoff,
<                     lambda: self._try_adding_with_backoff(
<                         message, min(_MAX_BACKOFF, 2 * backoff), future
<                     ),
<                 )
<             except Exception as e:
<                 future.set_result(e)
< 
<         self._ringbuffer.add(message, OVERFLOW_POLICY_FAIL).add_done_callback(callback)
---
>     async def _add_or_fail(self, message):
>         sequence_id = await self._ringbuffer.add(message, OVERFLOW_POLICY_FAIL)
>         if sequence_id == -1:
>             raise TopicOverloadError(
>                 "Failed to publish message %s on topic %s." % (message, self.name)
>             )
778,793c524,548
<     def _try_adding_messages_with_backoff(self, messages, backoff, future):
<         def callback(add_future):
<             try:
<                 sequence_id = add_future.result()
<                 if sequence_id != -1:
<                     future.set_result(None)
<                     return
< 
<                 self._context.reactor.add_timer(
<                     backoff,
<                     lambda: self._try_adding_messages_with_backoff(
<                         messages, min(_MAX_BACKOFF, 2 * backoff), future
<                     ),
<                 )
<             except Exception as e:
<                 future.set_result(e)
---
>     async def _add_messages_or_fail(self, messages):
>         sequence_id = await self._ringbuffer.add_all(messages, OVERFLOW_POLICY_FAIL)
>         if sequence_id == -1:
>             raise TopicOverloadError("Failed to publish messages on topic %s." % self.name)
> 
>     async def _add_or_overwrite(self, message):
>         await self._ringbuffer.add(message, OVERFLOW_POLICY_OVERWRITE)
> 
>     async def _add_messages_or_overwrite(self, messages):
>         await self._ringbuffer.add_all(messages, OVERFLOW_POLICY_OVERWRITE)
> 
>     async def _add_or_discard(self, message):
>         await self._ringbuffer.add(message, OVERFLOW_POLICY_FAIL)
> 
>     async def _add_messages_or_discard(self, messages):
>         await self._ringbuffer.add_all(messages, OVERFLOW_POLICY_FAIL)
> 
>     async def _add_with_backoff(self, message):
>         backoff = _INITIAL_BACKOFF
>         while True:
>             sequence_id = await self._ringbuffer.add(message, OVERFLOW_POLICY_FAIL)
>             if sequence_id != -1:
>                 return
>             await asyncio.sleep(backoff)
>             backoff = min(_MAX_BACKOFF, 2 * backoff)
795c550,557
<         self._ringbuffer.add_all(messages, OVERFLOW_POLICY_FAIL).add_done_callback(callback)
---
>     async def _add_messages_with_backoff(self, messages):
>         backoff = _INITIAL_BACKOFF
>         while True:
>             sequence_id = await self._ringbuffer.add_all(messages, OVERFLOW_POLICY_FAIL)
>             if sequence_id != -1:
>                 return
>             await asyncio.sleep(backoff)
>             backoff = min(_MAX_BACKOFF, 2 * backoff)
806,850d567
< 
< 
< class BlockingReliableTopic(ReliableTopic[MessageType]):
<     __slots__ = ("_wrapped", "name", "service_name")
< 
<     def __init__(self, wrapped: ReliableTopic[MessageType]):
<         self.name = wrapped.name
<         self.service_name = wrapped.service_name
<         self._wrapped = wrapped
< 
<     def publish(  # type: ignore[override]
<         self,
<         message: MessageType,
<     ) -> None:
<         return self._wrapped.publish(message).result()
< 
<     def publish_all(  # type: ignore[override]
<         self,
<         messages: typing.Sequence[MessageType],
<     ) -> None:
<         return self._wrapped.publish_all(messages).result()
< 
<     def add_listener(  # type: ignore[override]
<         self,
<         listener: typing.Union[
<             ReliableMessageListener,
<             typing.Callable[[TopicMessage[MessageType]], None],
<         ],
<     ) -> str:
<         return self._wrapped.add_listener(listener).result()
< 
<     def remove_listener(  # type: ignore[override]
<         self,
<         registration_id: str,
<     ) -> bool:
<         return self._wrapped.remove_listener(registration_id).result()
< 
<     def destroy(self) -> bool:
<         return self._wrapped.destroy()
< 
<     def blocking(self) -> "BlockingReliableTopic[MessageType]":
<         return self
< 
<     def __repr__(self) -> str:
<         return self._wrapped.__repr__()
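The _add_with_backoff rewrite above turns the timer-based retry (reactor.add_timer scheduling a continuation) into a plain loop around await asyncio.sleep. A self-contained sketch of that retry shape, with made-up constants and a fake add() that reports the ringbuffer as full (-1) twice before succeeding:

```python
import asyncio

_INITIAL_BACKOFF = 0.01  # illustrative values, not the client's constants
_MAX_BACKOFF = 0.08

async def add_with_backoff(ringbuffer_add):
    """Retry an overloaded publish with capped exponential backoff
    (illustrative sketch of the loop in the asyncio proxy)."""
    backoff = _INITIAL_BACKOFF
    attempts = 0
    while True:
        attempts += 1
        sequence_id = await ringbuffer_add()
        if sequence_id != -1:
            return attempts  # published; report how many tries it took
        await asyncio.sleep(backoff)
        backoff = min(_MAX_BACKOFF, 2 * backoff)

# Fails twice (-1 means the ringbuffer rejected the item), then succeeds.
results = iter([-1, -1, 7])

async def fake_add():
    return next(results)

attempts = asyncio.run(add_with_backoff(fake_add))
print(attempts)  # -> 3
```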

codecov-commenter commented Apr 8, 2026

Codecov Report

❌ Patch coverage is 82.75862% with 45 lines in your changes missing coverage. Please review.
✅ Project coverage is 94.33%. Comparing base (8d19a35) to head (5ef864a).

Files with missing lines | Patch % | Lines
hazelcast/internal/asyncio_proxy/reliable_topic.py | 81.85% | 45 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #802      +/-   ##
==========================================
- Coverage   94.41%   94.33%   -0.09%     
==========================================
  Files         399      400       +1     
  Lines       26016    26274     +258     
==========================================
+ Hits        24563    24785     +222     
- Misses       1453     1489      +36     

☔ View full report in Codecov by Sentry.

@yuce yuce changed the title ReliableTopic for Asyncio [HZ-5408] ReliableTopic for Asyncio Apr 8, 2026
yuce added 4 commits April 20, 2026 06:54
# Conflicts:
#	hazelcast/internal/asyncio_client.py
#	hazelcast/internal/asyncio_proxy/manager.py
@yuce yuce requested a review from emreyigit April 20, 2026 07:39
