-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Kafka Transport Bridge #51070
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 7.4
Are you sure you want to change the base?
Conversation
Hey! I see that this is your first PR. That is great! Welcome! Symfony has a contribution guide which I suggest you to read. In short:
Review the GitHub status checks of your pull request and try to solve the reported issues. If some tests are failing, try to see if they are failing because of this change. When two Symfony core team members approve this change, it will be merged and you will become an official Symfony contributor! I am going to sit back now and wait for the reviews. Cheers! Carsonbot |
5527a47
to
ce2f321
Compare
/** @SuppressWarnings(PHPMD.UnusedFormalParameter) */ | ||
public function reject(Envelope $envelope): void | ||
{ | ||
// no reject method for kafka transport |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From my experience, when the consumer is configured with
enable.auto.offset.store: false
enable.auto.commit: false
the offset is only committed when explicitly doing ack
call. This means in case of an error, the message is never acknowledged / the offset is never committed, hence the consumer will start lagging and won't be able to consume new messages. In some/many cases this may be intended, especially when the ordering of messages matter – but it's also possible that one does not want to block the consumer when there's a single failing message. Then if you have retry configured on the same transport, you start producing retry messages which never get consumed because the consumer is stuck on the initial message (given you only have 1 consumer or 1 consumer group).
I'd like to see a configuration variable to make ack call on reject, a bit similar to Redis transports' delete_after_reject
, say commit_offset_after_reject
or ack_after_reject
.
Anyway, would be nice to see this Kafka transport in the core 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't reviewed the kafka-specific code to validate it given I have no knowledge of Kafka. So my review does not guarantee that this is working code.
|
||
/** | ||
* @psalm-param array{topics: list<string>, consume_timeout_ms: int, commit_async: bool, conf_options: array<string, string>} $consumerConfig | ||
* @psalm-param array{topic: string, poll_timeout_ms: int, flush_timeout_ms: int, conf_options: array<string, string>} $producerConfig |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be @param
. The array shape config is interoperable between tools (at least as long as none of the keys are a non-lowercase variant of a phpdoc keyword as defined by the psalm parser because of vimeo/psalm#10008)
); | ||
} | ||
|
||
/** @psalm-param array<string, bool|float|int|string|array<string>> $configOptions */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be @param
|
||
$producer = $this->getProducer(); | ||
|
||
/** @psalm-var \RdKafka\ProducerTopic $topic */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be @var
(but is this actually needed ? Can't psalm infer it thanks to the return type of newTopic
?)
private LoggerInterface $logger = new NullLogger(), | ||
KafkaFactory $kafkaFactory = null, | ||
) { | ||
if (!$kafkaFactory instanceof KafkaFactory) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are you ignoring it when passing a KafkaFactory instead of assigning it ?
To me, the code should be like this:
public function __construct(
private LoggerInterface $logger = new NullLogger(),
KafkaFactory $kafkaFactory = null,
) {
$this->kafkaFactory = $kafkaFactory ?? new KafkaFactory(
new LoggingLogCallback($logger),
new LoggingErrorCallback($logger),
new LoggingRebalanceCallback($logger),
);
}
|
||
private function getReceiver(): KafkaReceiver | ||
{ | ||
return $this->receiver ??= new KafkaReceiver($this->connection, $this->serializer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there an actual benefit about lazy-loading those 2 objects ? Their constructor only assigns the properties
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And what is the benefit of splitting the logic into those separate objects if they are not available as extension points by allowing to inject them in the constructor, as done in some other bridges ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be inline with the other messenger bridge transports. I can update it to be injected if you prefer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, only the AmazonSQSTransport properly exposes these dependencies through DI, which makes sense as it's the most recent one. I agree we should do the same here
|
||
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface; | ||
|
||
final class KafkaMessageStamp implements NonSendableStampInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should those stamps be @internal
or no ?
use RdKafka\KafkaConsumer; | ||
use RdKafka\TopicPartition; | ||
|
||
final class LoggingRebalanceCallback |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should those be @internal
or no ?
/** | ||
* @param list<TopicPartition>|null $topicPartitions | ||
*/ | ||
public function __invoke(KafkaConsumer $kafka, ?int $err, array $topicPartitions = null): void |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why making the second argument nullable and the third argument nullable and optional ? this does not match the documented signature of the rebalance callback where those are always passed.
Making them nullable means that the code inside them has to deal with null
values which this may not be necessary.
public function __invoke(object $kafka, int $level, string $facility, string $message): void | ||
{ | ||
$this->logger->log( | ||
$level, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is broken. you need to convert the kafka level into a \Psr\Log\LogLevel
value. Otherwise, you have no guarantee that things will work fine as the only mandatory supported values for the log
method in PSR-3 are those string constants. Any other supported value for the level is implementation-defined and so you cannot rely on it when targeting only the PSR-3 interface without knowing the implementation.
6.4 | ||
--- | ||
|
||
* Introduced the Kafka bridge. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please follow the contributing rules for changelog entries: https://symfony.com/doc/current/contributing/code/conventions.html#writing-a-changelog-entry
* Introduced the Kafka bridge. | |
* Introduce the Kafka bridge. |
91f586a
to
ca7e261
Compare
class KafkaTransport implements TransportInterface | ||
{ | ||
private KafkaReceiver $receiver; | ||
private KafkaSender $sender; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these should also use the proper interfaces respectively instead of the concrete implementations, to allow for decoration.
Is there something I can help with to move this forward? we are currently using this package: https://packagist.org/packages/koco/messenger-kafka but it would be nice to have kafka support native in symfony. Thank you |
I agree with AlexOstrovsky. Is there anything we can do to help this PR? It would be really great to have native KAFKA support in Messenger. Thanks! |
@chalasr @andythorne ping to know what we can do to help to move this futher? 👏 Thanks! |
- name: Setup PHP | ||
uses: shivammathur/setup-php@v2 | ||
with: | ||
php-version: ${{ env.php-version }} | ||
extensions: "rdkafka" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
psalm has stubs for this extension, it should not be required to install it: https://github.com/vimeo/psalm/blob/6.x/stubs/extensions/rdkafka.phpstub
use RdKafka\TopicPartition; | ||
|
||
/** | ||
* @see https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/class.rdkafka-conf.html for more information on callback parameters. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This link is not working.
It would be wise to document the methods rather than linking to a page that may change over time.
CHANGELOG | ||
========= | ||
|
||
6.4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be updated
6.4 | |
7.4 |
Adds support for Kafka as a messenger transport.