Description
Is there an existing issue for this?
- I have searched the existing issues
Current Behavior
I am producing messages with keys, headers, and payloads to a Kafka topic, which is set up as an event source for a lambda function. When my lambda receives the event, it logs the message received. The message value is present, but the key and headers fields are not, even though the MSKRecord type defined in the AWS Node.js SDK shows those fields as required.
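To make the gap concrete, here is a minimal TypeScript sketch that checks a received record against the fields described above. The `KafkaRecord` interface is a local approximation written for this illustration (in a real project the type would be imported rather than redeclared, and the header value type `number[]` is an assumption), and `missingFields` is a hypothetical helper, not part of any library.

```typescript
// Local approximation of the record shape, declared here so the sketch is
// self-contained. Assumption: field names mirror the MSKRecord type.
interface KafkaRecordHeader {
  [headerKey: string]: number[];
}

interface KafkaRecord {
  topic: string;
  partition: number;
  offset: number;
  timestamp: number;
  timestampType: "CREATE_TIME" | "LOG_APPEND_TIME";
  key: string;
  value: string;
  headers: KafkaRecordHeader[];
}

const REQUIRED_FIELDS: (keyof KafkaRecord)[] = [
  "topic",
  "partition",
  "offset",
  "timestamp",
  "timestampType",
  "key",
  "value",
  "headers",
];

// Hypothetical helper: lists the required fields that are absent from a record.
function missingFields(record: Partial<KafkaRecord>): string[] {
  return REQUIRED_FIELDS.filter((field) => record[field] === undefined);
}

// The record logged by the lambda, as reported in this issue.
const received: Partial<KafkaRecord> = {
  topic: "my-topic-name",
  partition: 0,
  offset: 83,
  timestamp: 1668635066863,
  timestampType: "CREATE_TIME",
  value: "foo",
};

console.log(missingFields(received)); // prints [ 'key', 'headers' ]
```

Using `Partial<KafkaRecord>` for the input lets the check accept incomplete records like the one LocalStack currently delivers.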
Expected Behavior
The Kafka message's key and headers are important, so they should be included along with the message's value in the event records used to invoke the lambda.
How are you starting LocalStack?
With a docker-compose file
Steps To Reproduce
Run LocalStack and Kafka with a docker-compose file like:
version: "3"
services:
  localstack-lookupdomain:
    image: localstack/localstack:1.2.0
    container_name: localstack
    networks:
      - localstack-ldn
    hostname: localstack
    privileged: true
    ports:
      - "4566:4566"
      - "4571:4571"
    environment:
      - EDGE_PORT=4566
      - TZ="America/New_York"
      - DEFAULT_REGION=us-east-1
      - TEST_AWS_ACCOUNT_ID=000000000000
      - DEBUG=${LOCALSTACK_DEBUG:-1}
      - DATA_DIR=/tmp/localstack-ldn-2/data
      - HOST_TMP_FOLDER=/tmp/localstack-ldn
      - HOSTNAME_EXTERNAL=localstack-ldn
      - SERVICES=s3, dynamodb, sqs, sts, iam, ssm, sns, events, vpc, lambda, kms, cwl, secretsmanager, ec2
      - LAMBDA_REMOTE_DOCKER=false
      - LAMBDA_DOCKER_NETWORK=localstack-ldn
      - LAMBDA_EXECUTOR=docker-reuse
      - LAMBDA_STAY_OPEN_MODE=0
      - LOCALSTACK_API_KEY=*****
    volumes:
      - localstack-ldn-vol-2:/tmp/localstack-ldn-2
      # this works even on windows to expose docker daemon to container
      - /var/run/docker.sock:/var/run/docker.sock
      - type: bind
        source: ../.tmp/lib/localstack
        target: /var/lib/localstack
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    networks:
      - localstack-ldn
    environment:
      JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf -Dzookeeper.kerberos.removeHostFromPrincipal=true -Dzookeeper.kerberos.removeRealmFromPrincipal=true -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Dzookeeper.requireClientAuthScheme=sasl"
    volumes:
      - ./zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    hostname: "kafka"
    ports:
      - "9092:9092"
      - "9094:9094"
    depends_on:
      - zookeeper
    networks:
      - localstack-ldn
    environment:
      KAFKA_LISTENERS: SASL_PLAINTEXT://:9092,OUTSIDE://:9094
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "SASL_PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer"
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
      KAFKA_SUPER_USERS: "User:admin"
      KAFKA_CREATE_TOPICS: "my-topic-name:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
    links:
      - zookeeper
  kowl:
    image: quay.io/cloudhut/kowl:v1.3.1
    container_name: kowl
    restart: always
    ports:
      - "8081:8080"
    depends_on:
      - kafka
    environment:
      - KAFKA_BROKERS=kafka:9092
volumes:
  localstack-ldn-vol-2:
    external: false
networks:
  localstack-ldn:
    external: true
    name: localstack-ldn
Then create an event source mapping from one of the Kafka topics to a lambda using SASL_SCRAM_512_AUTH. I am using Terraform for this. In the lambda, log the received records to the console.
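For reference, the logging step can be as small as the sketch below. It assumes the event groups records in a `records` map keyed by a topic and partition string; `KafkaEventLike` and `flattenRecords` are names made up for this illustration, and this is not the exact code of my lambda.

```typescript
// Hypothetical minimal event shape: records grouped under string keys.
type LooseRecord = Record<string, unknown>;

interface KafkaEventLike {
  records: { [topicPartition: string]: LooseRecord[] };
}

// Collect the grouped records into a single flat list.
function flattenRecords(event: KafkaEventLike): LooseRecord[] {
  const all: LooseRecord[] = [];
  for (const group of Object.keys(event.records)) {
    all.push(...(event.records[group] || []));
  }
  return all;
}

// Lambda entry point (it would be exported in a real deployment):
// logs every record as pretty-printed JSON and returns how many were seen.
async function handler(event: KafkaEventLike): Promise<number> {
  const records = flattenRecords(event);
  for (const record of records) {
    console.log(JSON.stringify(record, null, 2));
  }
  return records.length;
}
```

Logging the whole record object, rather than selected fields, is what makes a missing `key` or `headers` field visible in the output.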
Then produce messages with keys to the Kafka topic that triggers the lambda. I verified that my keys actually exist by consuming the topic using kcat. An example record logged from my lambda is:
{
  "topic": "my-topic-name",
  "partition": 0,
  "offset": 83,
  "timestamp": 1668635066863,
  "timestampType": "CREATE_TIME",
  "value": "foo"
}
Environment
- OS: macOS Monterey 12.6
- LocalStack: 1.2.0
Anything else?
No response