[Messenger] Kafka Transport Bridge #51070

Open
wants to merge 21 commits into
base: 7.4

andythorne

Q              A
Branch?        6.4
Bug fix?       no
New feature?   yes
Deprecations?  no
Tickets        ~
License        MIT
Doc PR         symfony/symfony-docs (working on atm)

Adds support for Kafka as a messenger transport.

I know there's another PR open for a Kafka Messenger implementation, but it appears to be abandoned. This PR is based on https://github.com/symfony-examples/messenger-kafka, which I believe to be a better implementation.

@carsonbot

Hey!

I see that this is your first PR. That is great! Welcome!

Symfony has a contribution guide which I suggest you read.

In short:

  • Always add tests
  • Keep backward compatibility (see https://symfony.com/bc).
  • Bug fixes must be submitted against the lowest maintained branch where they apply (see https://symfony.com/releases)
  • Features and deprecations must be submitted against the 6.4 branch.

Review the GitHub status checks of your pull request and try to solve the reported issues. If some tests are failing, try to see if they are failing because of this change.

When two Symfony core team members approve this change, it will be merged and you will become an official Symfony contributor!
If this PR is merged in a lower version branch, it will be merged up to all maintained branches within a few days.

I am going to sit back now and wait for the reviews.

Cheers!

Carsonbot

/** @SuppressWarnings(PHPMD.UnusedFormalParameter) */
public function reject(Envelope $envelope): void
{
    // no reject method for kafka transport

Contributor

From my experience, when the consumer is configured with

enable.auto.offset.store: false
enable.auto.commit: false

the offset is only committed when ack is called explicitly. This means that in case of an error the message is never acknowledged and the offset is never committed, so the consumer starts lagging and cannot consume new messages. In many cases this may be intended, especially when the ordering of messages matters, but it is also possible that one does not want a single failing message to block the consumer. If retry is then configured on the same transport, you start producing retry messages that never get consumed, because the consumer is stuck on the initial message (given you only have 1 consumer or 1 consumer group).

I'd like to see a configuration option that makes reject call ack, similar to the Redis transport's delete_after_reject; say commit_offset_after_reject or ack_after_reject.
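
As a rough illustration of the proposal, here is a minimal sketch of an opt-in flag that commits the offset on reject. It assumes PHP 8.0+ and runs without a broker. Everything in it is hypothetical: the `OffsetCommitter` interface, the `InMemoryCommitter` and `RejectSketch` classes, and the `commitOffsetAfterReject` flag (named after the suggestion above) are not part of this PR or of php-rdkafka.

```php
<?php

declare(strict_types=1);

/** Hypothetical stand-in for whatever commits an offset to the broker. */
interface OffsetCommitter
{
    public function commit(int $offset): void;
}

/** Records commits in memory so the sketch can run without a broker. */
final class InMemoryCommitter implements OffsetCommitter
{
    /** @var list<int> */
    public array $committed = [];

    public function commit(int $offset): void
    {
        $this->committed[] = $offset;
    }
}

/** Hypothetical receiver-like class; offsets stand in for full messages. */
final class RejectSketch
{
    public function __construct(
        private OffsetCommitter $committer,
        // Hypothetical option, named after the suggestion above.
        private bool $commitOffsetAfterReject = false,
    ) {
    }

    public function ack(int $offset): void
    {
        $this->committer->commit($offset);
    }

    public function reject(int $offset): void
    {
        // Default: leave the offset uncommitted, so the consumer does not advance.
        if ($this->commitOffsetAfterReject) {
            // Opt-in: move past the failed message so it cannot block the consumer.
            $this->committer->commit($offset);
        }
    }
}
```

With the flag left at its default, reject commits nothing, which preserves strict ordering; with the flag enabled, a failing message no longer blocks the messages behind it.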


Anyway, would be nice to see this Kafka transport in the core 👍

Member

@stof left a comment

I haven't reviewed the Kafka-specific code, given that I have no knowledge of Kafka, so my review does not guarantee that this is working code.


/**
 * @psalm-param array{topics: list<string>, consume_timeout_ms: int, commit_async: bool, conf_options: array<string, string>} $consumerConfig
 * @psalm-param array{topic: string, poll_timeout_ms: int, flush_timeout_ms: int, conf_options: array<string, string>} $producerConfig

Member

@stof, Aug 9, 2023

should be @param. The array shape config is interoperable between tools (at least as long as none of the keys are a non-lowercase variant of a phpdoc keyword as defined by the psalm parser because of vimeo/psalm#10008)

);
}

/** @psalm-param array<string, bool|float|int|string|array<string>> $configOptions */

Member

should be @param


$producer = $this->getProducer();

/** @psalm-var \RdKafka\ProducerTopic $topic */

Member

should be @var (but is this actually needed ? Can't psalm infer it thanks to the return type of newTopic ?)

    private LoggerInterface $logger = new NullLogger(),
    KafkaFactory $kafkaFactory = null,
) {
    if (!$kafkaFactory instanceof KafkaFactory) {

Member

why are you ignoring it when passing a KafkaFactory instead of assigning it ?

To me, the code should be like this:

    public function __construct(
        private LoggerInterface $logger = new NullLogger(),
        KafkaFactory $kafkaFactory = null,
    ) {
        $this->kafkaFactory = $kafkaFactory ?? new KafkaFactory(
            new LoggingLogCallback($logger),
            new LoggingErrorCallback($logger),
            new LoggingRebalanceCallback($logger),
        );
    }


private function getReceiver(): KafkaReceiver
{
    return $this->receiver ??= new KafkaReceiver($this->connection, $this->serializer);

Member

is there an actual benefit to lazy-loading those 2 objects? Their constructors only assign the properties

Member

And what is the benefit of splitting the logic into those separate objects if they are not available as extension points by allowing to inject them in the constructor, as done in some other bridges ?

Author

This seems to be in line with the other messenger bridge transports. I can update it to be injected if you prefer?

Member

Indeed, only the AmazonSQSTransport properly exposes these dependencies through DI, which makes sense as it's the most recent one. I agree we should do the same here


use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;

final class KafkaMessageStamp implements NonSendableStampInterface

Member

should those stamps be @internal or not?

use RdKafka\KafkaConsumer;
use RdKafka\TopicPartition;

final class LoggingRebalanceCallback

Member

should those be @internal or not?

/**
 * @param list<TopicPartition>|null $topicPartitions
 */
public function __invoke(KafkaConsumer $kafka, ?int $err, array $topicPartitions = null): void

Member

why make the second argument nullable, and the third argument both nullable and optional? This does not match the documented signature of the rebalance callback, where those are always passed.

Making them nullable means that the code inside has to deal with null values, which may not be necessary.

public function __invoke(object $kafka, int $level, string $facility, string $message): void
{
    $this->logger->log(
        $level,

Member

this is broken. you need to convert the kafka level into a \Psr\Log\LogLevel value. Otherwise, you have no guarantee that things will work fine as the only mandatory supported values for the log method in PSR-3 are those string constants. Any other supported value for the level is implementation-defined and so you cannot rely on it when targeting only the PSR-3 interface without knowing the implementation.
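
A minimal sketch of such a conversion, assuming the level handed to the log callback is a syslog severity (0 to 7), which is how librdkafka describes its log levels. The function name `kafkaLevelToPsr3` is hypothetical and not part of this PR. The returned strings are the values of the `\Psr\Log\LogLevel` constants, written out literally so the sketch runs without psr/log installed; real code would reference the constants. Requires PHP 8.0+ for `match`.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: maps a syslog severity (0-7) to a PSR-3 level string.
 *
 * Numeric literals are used instead of PHP's LOG_* constants, because those
 * constants do not have the standard syslog values on every platform.
 */
function kafkaLevelToPsr3(int $kafkaLevel): string
{
    return match ($kafkaLevel) {
        0 => 'emergency',   // syslog EMERG,   LogLevel::EMERGENCY
        1 => 'alert',       // syslog ALERT,   LogLevel::ALERT
        2 => 'critical',    // syslog CRIT,    LogLevel::CRITICAL
        3 => 'error',       // syslog ERR,     LogLevel::ERROR
        4 => 'warning',     // syslog WARNING, LogLevel::WARNING
        5 => 'notice',      // syslog NOTICE,  LogLevel::NOTICE
        6 => 'info',        // syslog INFO,    LogLevel::INFO
        default => 'debug', // syslog DEBUG (7) and any unexpected value
    };
}
```

The callback would then call `$this->logger->log(kafkaLevelToPsr3($level), $message)` rather than passing the raw integer. Falling back to `debug` for unknown values is a design choice of this sketch, not something the review asks for.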

6.4
---

* Introduced the Kafka bridge.

Member

Please follow the contributing rules for changelog entries: https://symfony.com/doc/current/contributing/code/conventions.html#writing-a-changelog-entry

Suggested change
- * Introduced the Kafka bridge.
+ * Introduce the Kafka bridge.

@nicolas-grekas modified the milestones: 6.4, 7.1 Nov 15, 2023

class KafkaTransport implements TransportInterface
{
    private KafkaReceiver $receiver;
    private KafkaSender $sender;

Member

these should also use the proper interfaces respectively instead of the concrete implementations, to allow for decoration.
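
To show why typing against interfaces matters for decoration, here is a minimal sketch that assumes PHP 8.1+. `SenderLike` is a simplified, hypothetical stand-in for Symfony's `SenderInterface` (messages are plain strings here instead of envelopes), and `PlainSender`, `CountingSender` and `TransportSketch` are illustrative names only, not classes from this PR.

```php
<?php

declare(strict_types=1);

/** Simplified, hypothetical stand-in for Symfony's SenderInterface. */
interface SenderLike
{
    public function send(string $message): string;
}

/** Stands in for the concrete Kafka sender. */
final class PlainSender implements SenderLike
{
    public function send(string $message): string
    {
        return $message;
    }
}

/** A decorator: wraps any sender and counts the messages passing through. */
final class CountingSender implements SenderLike
{
    public int $sent = 0;

    public function __construct(private SenderLike $inner)
    {
    }

    public function send(string $message): string
    {
        ++$this->sent;

        return $this->inner->send($message);
    }
}

/** The transport depends on the interface, so a decorator can be injected. */
final class TransportSketch
{
    public function __construct(private SenderLike $sender = new PlainSender())
    {
    }

    public function send(string $message): string
    {
        return $this->sender->send($message);
    }
}
```

If the property were typed as the concrete `PlainSender`, passing a `CountingSender` would be a type error; typing it as the interface is what makes the wrapper injectable.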

@AlexOstrovsky

Is there something I can help with to move this forward? We are currently using this package: https://packagist.org/packages/koco/messenger-kafka, but it would be nice to have native Kafka support in Symfony.

Thank you

@Dodenis

Dodenis commented Feb 28, 2025

I agree with AlexOstrovsky. Is there anything we can do to help this PR along? It would be really great to have native Kafka support in Messenger.

Thanks!

@AlexOstrovsky

@chalasr @andythorne ping: is there anything we can do to help move this further? 👏

Thanks!

- name: Setup PHP
  uses: shivammathur/setup-php@v2
  with:
    php-version: ${{ env.php-version }}
    extensions: "rdkafka"

Member

psalm has stubs for this extension, it should not be required to install it: https://github.com/vimeo/psalm/blob/6.x/stubs/extensions/rdkafka.phpstub

use RdKafka\TopicPartition;

/**
 * @see https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/class.rdkafka-conf.html for more information on callback parameters.

Member

This link is not working.

It would be wise to document the methods rather than linking to a page that may change over time.

CHANGELOG
=========

6.4

Member

This needs to be updated

Suggested change
- 6.4
+ 7.4

@fabpot modified the milestones: 7.3, 7.4 May 26, 2025