[Messenger] Kafka Transport Bridge #39712

Open
wants to merge 16 commits into
base: 6.2

KonstantinCodes
Contributor

@KonstantinCodes KonstantinCodes commented Jan 4, 2021

| Q | A |
| --- | --- |
| Branch? | 5.4 |
| Bug fix? | no |
| New feature? | yes |
| Deprecations? | no |
| Tickets | Fix #35549 |
| License | MIT |
| Doc PR | symfony/symfony-docs#15884 |

This PR integrates a simple Kafka Transport for Messenger.

@KonstantinCodes KonstantinCodes requested a review from sroze as a code owner Jan 4, 2021
@KonstantinCodes KonstantinCodes changed the title Kafka Transport Bridge [Messenger] Kafka Transport Bridge Jan 4, 2021
@nicolas-grekas nicolas-grekas added this to the 5.x milestone Jan 4, 2021
Member

@derrabus derrabus left a comment

Thank you for your contribution.

I wonder how difficult it might be to test against an actual Kafka instance in our integration test suite. What do you think?

@KonstantinCodes
Contributor Author

KonstantinCodes commented Jan 4, 2021

@derrabus thank you for your review! I'll get started with the fixes tomorrow.

Actual integration tests are possible. I have a test that writes and reads. I see there is a kafka instance running in travis, so that's cool!

Do you think we could install the current rdkafka on travis?

@derrabus
Member

derrabus commented Jan 4, 2021

> Do you think we could install the current rdkafka on travis?

Our integration tests run on GitHub Actions. That extension should already be enabled there.

@KonstantinCodes
Contributor Author

KonstantinCodes commented Jan 4, 2021

Oh nice. What about travis though? This test run (Unit Tests) fails because of the missing rdkafka.

@derrabus
Member

derrabus commented Jan 4, 2021

There's a block "Install extra PHP extensions" in .travis.yml. You can add the extension there.

@KonstantinCodes KonstantinCodes force-pushed the feature/kafka-messenger branch 3 times, most recently from 846e72b to 97ce904 Compare Jan 5, 2021
Member

@jderusse jderusse left a comment

Thanks for working on this.

A few comments from my side. I think you should remove some logs and reduce the verbosity.

@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);
Member

@jderusse jderusse Jan 5, 2021

We don't use strict types in Symfony.

Member

@nicolas-grekas nicolas-grekas Oct 28, 2021

I confirm, this should be removed

public function testSupports()
{
static::assertTrue($this->factory->supports('kafka://my-local-kafka:9092', []));
static::assertTrue($this->factory->supports('kafka+ssl://my-staging-kafka:9093', []));
Member

@jderusse jderusse Jan 5, 2021

Use the query string parameter sslmode=disable for consistency with the SQS adapter and pgsql.

Contributor Author

@KonstantinCodes KonstantinCodes May 3, 2021

The TLS config is done through settings passed in the conf array, and conforms to the underlying librdkafka format. The protocol in the DSN is only used to recognize this DSN as a Kafka-DSN.

$conf = new KafkaConf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb($this->createRebalanceCb($this->logger));
Member

@jderusse jderusse Jan 5, 2021

Does it have to be a \Closure? Couldn't it be a callable, e.g. $conf->setRebalanceCb([$this, 'rebalanceCb']);

Contributor

@Steveb-p Steveb-p Jan 17, 2021

Afaik it can be a callable.

Contributor Author

@KonstantinCodes KonstantinCodes Jan 18, 2021

I changed it :)
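
For readers following this thread, here is a minimal sketch of the two forms being compared. It assumes only what the comments above state, namely that the setter accepts any PHP callable. `RebalanceLogger`, `invokeCallback`, and the placeholder error codes are hypothetical stand-ins so the snippet runs without ext-rdkafka; they are not the PR's code.

```php
<?php

final class RebalanceLogger
{
    /** @var int[] Error codes received so far. */
    public array $events = [];

    // Hypothetical callback taking (consumer, error code, partitions).
    public function rebalanceCb($consumer, int $err, ?array $partitions = null): void
    {
        $this->events[] = $err;
    }
}

// Stand-in for a setter typed against `callable` (assumption for illustration).
function invokeCallback(callable $cb, int $placeholderCode): void
{
    $cb(null, $placeholderCode, []);
}

$logger = new RebalanceLogger();

// Array callable: no closure factory method is needed.
invokeCallback([$logger, 'rebalanceCb'], 1);

// A \Closure is also a callable, so the original form keeps working.
invokeCallback(\Closure::fromCallable([$logger, 'rebalanceCb']), 2);
```

Both calls reach the same method, which is why the array form can replace a dedicated closure factory.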

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb($this->createRebalanceCb($this->logger));

$brokers = $this->stripProtocol($dsn);
Member

@jderusse jderusse Jan 5, 2021

A DSN is not only about hosts; other bridges use the query string to provide parameters,
e.g. kafka://host:port?flushTimeout=5000

private function stripProtocol(string $dsn): array
{
$brokers = [];
foreach (explode(',', $dsn) as $currentBroker) {
Member

@jderusse jderusse Jan 5, 2021

Sounds fragile to me. What if somebody sends kafka://host?param=option1,option2?

To provide multiple hosts, I would be consistent with the Redis DSN in the Cache component.
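
The concern above can be illustrated with a minimal sketch that splits off the query string before touching the host list, so commas inside option values are never treated as host separators. This sketch keeps a comma-separated host list and is not the Redis-style DSN syntax suggested in the comment. `parseKafkaDsn` and its return shape are hypothetical; only the `kafka://` and `kafka+ssl://` schemes and the `flushTimeout` example come from this thread.

```php
<?php

/**
 * Hypothetical helper: splits a Kafka DSN into brokers and options.
 *
 * @return array{brokers: string[], options: array}
 */
function parseKafkaDsn(string $dsn): array
{
    // Split off the query string first; everything after "?" is options.
    [$authority, $query] = array_pad(explode('?', $dsn, 2), 2, '');

    // Strip the scheme and reject anything that is not a Kafka DSN.
    $hostList = preg_replace('#^kafka(\+ssl)?://#', '', $authority, 1, $count);
    if (1 !== $count) {
        throw new \InvalidArgumentException('Not a Kafka DSN: '.$dsn);
    }

    // Only the authority part is split on commas.
    $brokers = array_values(array_filter(
        explode(',', $hostList),
        static fn (string $host): bool => '' !== $host
    ));

    // Query parameters are parsed separately; values stay strings.
    parse_str($query, $options);

    return ['brokers' => $brokers, 'options' => $options];
}

$parsed = parseKafkaDsn('kafka://host1:9092,host2:9092?flushTimeout=5000&param=option1,option2');
```

With this ordering, `param=option1,option2` stays a single option value instead of producing a bogus second broker.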

src/Symfony/Component/Messenger/Bridge/Kafka/composer.json (outdated, resolved)
@KonstantinCodes KonstantinCodes force-pushed the feature/kafka-messenger branch 4 times, most recently from 9aea4b6 to f00af8a Compare Jan 6, 2021
'key' => $message->key,
'partition' => $message->partition,
'offset' => $message->offset,
'timestamp' => $message->timestamp,
Contributor

@Steveb-p Steveb-p Jan 17, 2021

@jderusse I second the opinion that those keys are valuable and often used, especially when cross-language communication between applications is needed. As you can see, the values passed to the serializer come from the Kafka Message object, which would otherwise be unavailable to the end user.

Ping @nick-zh.

$this->logger->info('Kafka: Partition EOF reached. Waiting for next message ...');
break;
case \RD_KAFKA_RESP_ERR__TIMED_OUT:
$this->logger->debug('Kafka: Consumer timeout.');
Contributor

@Steveb-p Steveb-p Jan 17, 2021

We had a lot of discussion in phprdkafka about this. It's an internal message coming from librdkafka which has a lot of different possible meanings - some are "normal", like described above (especially if partition EOF reporting is disabled via config, which it ideally shouldn't be - so it generates EOF, not this). Others might indicate a problem.

librdkafka's rdkafka.h describes this error as:

https://docs.confluent.io/2.0.0/clients/librdkafka/rdkafka_8h.html#a03509bab51072c72a8dcf52337e6d5cba191a3d68aab046a25af5e861a5ce394e

RD_KAFKA_RESP_ERR__TIMED_OUT Operation timed out

$this->logger->debug('Kafka: Consumer timeout.');
break;
case \RD_KAFKA_RESP_ERR__TRANSPORT:
$this->logger->debug('Kafka: Broker transport failure.');
Contributor

@Steveb-p Steveb-p Jan 17, 2021

Same as above, unfortunately: the consumer can usually recover from this if the connection can be reestablished. In this case I'd move this up to the error level, because if it happens then something is wrong with the broker or the communication. But as said, the consumer can recover from it.

$message = $transportStamp->getMessage();

if ($this->properties->isCommitAsync()) {
$consumer->commitAsync($message);
Contributor

@Steveb-p Steveb-p Jan 17, 2021

Using commit (synchronous) usually drops the throughput of the implementation (afaik). From my experience, people who use phprdkafka usually switch to async commits strictly because of the difference in speed.

@nick-zh can you support / deny me on this?

$payload['body'],
$payload['key'] ?? null,
$payload['headers'] ?? null,
$payload['timestamp_ms'] ?? null
Contributor

@Steveb-p Steveb-p Jan 17, 2021

It's allowed by phprdkafka:

timestamp_ms
Timestamp that should be set for the message, if not set, the broker will set it.

In this case, it should be omitted or read from a stamp.

Member

@jderusse jderusse left a comment

I do think the stamp is needed to convey extra information like key, partition, timestamp_ms, ...

$payload['body'],
$payload['key'] ?? null,
$payload['headers'] ?? null,
$payload['timestamp_ms'] ?? null
Member

@jderusse jderusse Oct 29, 2021

Well, $payload is the result of $this->serializer->encode($envelope).
The serializers provided by Symfony only return body and headers.

The fields key and timestamp_ms used here do not exist.
If you want extra data, you should use a custom Stamp.
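
A minimal sketch of what such a stamp could look like, assuming the encoded payload only carries `body` and `headers` as described above. `KafkaMessageStamp` and `buildProduceArguments` are hypothetical names, not the PR's code, and the local `StampInterface` is a stand-in for `Symfony\Component\Messenger\Stamp\StampInterface` so the snippet runs without the Messenger component. The argument order mirrors the snippet under review (body, key, headers, timestamp_ms).

```php
<?php

// Stand-in for Symfony's marker interface (assumption: used only to keep this runnable).
interface StampInterface
{
}

// Hypothetical stamp carrying Kafka-specific metadata alongside the envelope.
final class KafkaMessageStamp implements StampInterface
{
    public function __construct(
        private ?string $key = null,
        private ?int $timestampMs = null
    ) {
    }

    public function getKey(): ?string
    {
        return $this->key;
    }

    public function getTimestampMs(): ?int
    {
        return $this->timestampMs;
    }
}

// Builds the producer arguments from the encoded envelope plus an optional stamp,
// instead of reading 'key' and 'timestamp_ms' from the serializer output.
function buildProduceArguments(array $encoded, ?KafkaMessageStamp $stamp): array
{
    return [
        $encoded['body'],
        $stamp?->getKey(),
        $encoded['headers'] ?? null,
        $stamp?->getTimestampMs(),
    ];
}

$encoded = ['body' => '{"id":1}', 'headers' => ['type' => 'App\\Message\\OrderCreated']];
$withStamp = buildProduceArguments($encoded, new KafkaMessageStamp('order-42', 1609718400000));
$withoutStamp = buildProduceArguments($encoded, null);
```

In a real sender the stamp would presumably be read from the envelope, for example with `$envelope->last(KafkaMessageStamp::class)`; without a stamp, key and timestamp stay null and the broker sets the timestamp.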

$this->conf->setRebalanceCb($this->createRebalanceCb($this->logger));
}

public function get(): iterable
Member

@jderusse jderusse Oct 29, 2021

The contract of this method is defined by the Interface

'body' => $message->payload,
'headers' => $message->headers,
'key' => $message->key,
'topic_name' => $message->topic_name,
'partition' => $message->partition,
'offset' => $message->offset,
'timestamp' => $message->timestamp,
Member

@jderusse jderusse Oct 29, 2021

Most serializers will only take care of headers and body.

@fabpot
Member

fabpot commented Mar 26, 2022

Is it something we want to include for 6.1?

@KonstantinCodes
Contributor Author

KonstantinCodes commented Mar 29, 2022

@fabpot I'll rebase the branch and fix the tests :)

@fabpot fabpot modified the milestones: 6.1, 6.2 May 20, 2022
@torbenbr

torbenbr commented Aug 17, 2022

@KonstantinCodes What's keeping you from continuing here? Do you need help or something? I would be very happy if this continued <3

@KonstantinCodes
Contributor Author

KonstantinCodes commented Aug 17, 2022

@torbenbr hey, thank you for asking! Actually, the build system was puzzling me. Maybe we can take a look at that together?

@torbenbr

torbenbr commented Aug 17, 2022

@KonstantinCodes I'll look at it tonight (Timezone UTC+2)
