New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Messenger] Kafka Transport Bridge #39712
base: 6.2
Are you sure you want to change the base?
[Messenger] Kafka Transport Bridge #39712
Conversation
c5d506c
to
38ae438
Compare
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportFactoryTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportFactoryTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportFactoryTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiver.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiver.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiver.php
Outdated
Show resolved
Hide resolved
@derrabus thank you for your review! I'll get started with the fixes tomorrow. Actual integration tests are possible. I have a test that writes and reads. I see there is a kafka instance running in travis, so that's cool! Do you think we could install the current rdkafka on travis? |
Our integration tests run on GitHub Actions. That extension should already be enabled there. |
Oh nice. What about travis though? This test run (Unit Tests) fails because of the missing rdkafka. |
There's a block |
846e72b
to
97ce904
Compare
@@ -0,0 +1,10 @@ | |||
<?php | |||
|
|||
declare(strict_types=1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't use strict type in symfony
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I confirm, this should be removed
public function testSupports() | ||
{ | ||
static::assertTrue($this->factory->supports('kafka://my-local-kafka:9092', [])); | ||
static::assertTrue($this->factory->supports('kafka+ssl://my-staging-kafka:9093', [])); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use querystring parameter sslmode=disable
for consistency with sqs adapter and pgsql
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The TLS config is done through settings passed in the conf
array, and conforms to the underlying librdkafka format. The protocol in the DSN is only used to recognize this DSN as a Kafka-DSN.
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaMessageStamp.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaTransport.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaTransportFactory.php
Outdated
Show resolved
Hide resolved
$conf = new KafkaConf(); | ||
|
||
// Set a rebalance callback to log partition assignments (optional) | ||
$conf->setRebalanceCb($this->createRebalanceCb($this->logger)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it have to be a \Closure
couldn't it be a callable? $conf->setRebalanceCb([$this, 'rebalanceCb']);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Afaik it can be a callable
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it :)
// Set a rebalance callback to log partition assignments (optional) | ||
$conf->setRebalanceCb($this->createRebalanceCb($this->logger)); | ||
|
||
$brokers = $this->stripProtocol($dsn); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dns is not only about hosts, other bdriges uses queryString to provide parameters.
ie: kafka://host:port?flushTimeout=5000
private function stripProtocol(string $dsn): array | ||
{ | ||
$brokers = []; | ||
foreach (explode(',', $dsn) as $currentBroker) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds fragile to me. What is somebody send kafka://host?param=option1,option2
To provide multiple hosts I would be consistent with redis DSN in cache component
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaTransportFactory.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportIntegrationTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportIntegrationTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiver.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiver.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiverProperties.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiverProperties.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaSender.php
Outdated
Show resolved
Hide resolved
9aea4b6
to
f00af8a
Compare
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportIntegrationTest.php
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportIntegrationTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportIntegrationTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportTest.php
Outdated
Show resolved
Hide resolved
'key' => $message->key, | ||
'partition' => $message->partition, | ||
'offset' => $message->offset, | ||
'timestamp' => $message->timestamp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
$this->logger->info('Kafka: Partition EOF reached. Waiting for next message ...'); | ||
break; | ||
case \RD_KAFKA_RESP_ERR__TIMED_OUT: | ||
$this->logger->debug('Kafka: Consumer timeout.'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We had a lot of discussion in phprdkafka about this. It's an internal message coming from librdkafka which has a lot of different possible meanings - some are "normal", like described above (especially if partition EOF reporting is disabled via config, which it ideally shouldn't be - so it generates EOF, not this). Others might indicate a problem.
librdkafka.h described this error as:
RD_KAFKA_RESP_ERR__TIMED_OUT | Operation timed out |
---|
$this->logger->debug('Kafka: Consumer timeout.'); | ||
break; | ||
case \RD_KAFKA_RESP_ERR__TRANSPORT: | ||
$this->logger->debug('Kafka: Broker transport failure.'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above unfortunately - usually consumer can recover from this if connection can be reestablished. In this case I'd move this up to an error
level, because if this happens then something wrong is happening with the broker / communication, but as said - consumer can recover from this.
$message = $transportStamp->getMessage(); | ||
|
||
if ($this->properties->isCommitAsync()) { | ||
$consumer->commitAsync($message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using commit
(synchronous) usually drops the throughput of the implementation (afaik). From my experience people that use phprdkafka usually switch to async commits strictly because the difference in speed it has.
@nick-zh can you support / deny me on this?
$payload['body'], | ||
$payload['key'] ?? null, | ||
$payload['headers'] ?? null, | ||
$payload['timestamp_ms'] ?? null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's allowed by phprdkafka:
timestamp_ms
Timestamp that should be set for the message, if not set, the broker will set it.
In this case, it should be omitted or read from a stamp.
$conf = new KafkaConf(); | ||
|
||
// Set a rebalance callback to log partition assignments (optional) | ||
$conf->setRebalanceCb($this->createRebalanceCb($this->logger)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Afaik it can be a callable
.
$payload['body'], | ||
$payload['key'] ?? null, | ||
$payload['headers'] ?? null, | ||
$payload['timestamp_ms'] ?? null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, $payload is the result of $this->serializer->encode($envelope)
.
The serializers provided by symfony only returns body
and headers
.
The fields key
and timestamp_ms
used here does not exists.
If you want extra data, you should use a custom Stamp
$this->conf->setRebalanceCb($this->createRebalanceCb($this->logger)); | ||
} | ||
|
||
public function get(): iterable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The contract of this method is defined by the Interface
'body' => $message->payload, | ||
'headers' => $message->headers, | ||
'key' => $message->key, | ||
'topic_name' => $message->topic_name, | ||
'partition' => $message->partition, | ||
'offset' => $message->offset, | ||
'timestamp' => $message->timestamp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
most of serializers will only take care of headers
and body
3f019ca
to
d04c435
Compare
Is it something we want to include for 6.1? |
@fabpot I'll rebase the branch and fix the tests :) |
@KonstantinCodes What's the problem that you don't continue here? Do you need help or something? I would be very happy if continued here <3 |
@torbenbr hey, thank you for asking! Actually, the build system was puzzling me. Maybe we can take a look at that together? |
@KonstantinCodes I'll look at it tonight (Timezone UTC+2) |
This PR integrates a simple Kafka Transport for Messenger.