
bug: Kafka message key missing in lambda event when using self-managed kafka as lambda event source #7193

Closed
@isaac-j-miller

Description

Is there an existing issue for this?

  • I have searched the existing issues

Current Behavior

I am producing messages with keys, headers, and payloads to a Kafka topic, which is set up as an event source for a Lambda function. When my Lambda receives the event, it logs the message received. The message value is present, but the key and headers fields are not, even though the MSKRecord type defined in the AWS Node.js SDK marks those fields as required.

Expected Behavior

The Kafka message's key and headers are important, so they should be included along with the message's value in the event records used to invoke the Lambda.
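For reference, the record shape AWS documents for self-managed Kafka event sources carries a base64-encoded key and value plus a headers array of byte arrays. A minimal sketch of decoding such a record (the key, payload, and header name below are made up for illustration):

```javascript
// Illustrative record in the shape AWS documents for self-managed
// Kafka event sources: key and value are base64 strings, headers is
// an array of { headerName: [byte, ...] } objects.
const record = {
  topic: "my-topic-name",
  partition: 0,
  offset: 83,
  timestamp: 1668635066863,
  timestampType: "CREATE_TIME",
  key: Buffer.from("my-key").toString("base64"),
  value: Buffer.from("foo").toString("base64"),
  headers: [{ traceId: Array.from(Buffer.from("abc123")) }],
};

// Decode the fields back to strings, the way a consumer would for logging.
const key = Buffer.from(record.key, "base64").toString();
const value = Buffer.from(record.value, "base64").toString();
const headers = record.headers.map((h) => {
  const [name, bytes] = Object.entries(h)[0];
  return [name, Buffer.from(bytes).toString()];
});

console.log(key, value, headers);
```

With LocalStack, only `value` survives into the record; `key` and `headers` are absent entirely.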

How are you starting LocalStack?

With a docker-compose file

Steps To Reproduce

Run LocalStack and Kafka with a docker-compose file like:

version: "3"

services:
  localstack-lookupdomain:
    image: localstack/localstack:1.2.0
    container_name: localstack
    networks:
      - localstack-ldn
    hostname: localstack
    privileged: true
    ports:
      - "4566:4566"
      - "4571:4571"
    environment:
      - EDGE_PORT=4566
      - TZ="America/New_York"
      - DEFAULT_REGION=us-east-1
      - TEST_AWS_ACCOUNT_ID=000000000000
      - DEBUG=${LOCALSTACK_DEBUG:-1}
      - DATA_DIR=/tmp/localstack-ldn-2/data
      - HOST_TMP_FOLDER=/tmp/localstack-ldn
      - HOSTNAME_EXTERNAL=localstack-ldn
      - SERVICES=s3, dynamodb, sqs, sts, iam, ssm, sns, events, vpc, lambda, kms, cwl, secretsmanager, ec2
      - LAMBDA_REMOTE_DOCKER=false
      - LAMBDA_DOCKER_NETWORK=localstack-ldn
      - LAMBDA_EXECUTOR=docker-reuse
      - LAMBDA_STAY_OPEN_MODE=0
      - LOCALSTACK_API_KEY=*****
    volumes:
      - localstack-ldn-vol-2:/tmp/localstack-ldn-2
      # this works even on windows to expose docker daemon to container
      - /var/run/docker.sock:/var/run/docker.sock
      - type: bind
        source: ../.tmp/lib/localstack
        target: /var/lib/localstack

  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    networks:
      - localstack-ldn
    environment:
      JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf  -Dzookeeper.kerberos.removeHostFromPrincipal=true -Dzookeeper.kerberos.removeRealmFromPrincipal=true -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Dzookeeper.requireClientAuthScheme=sasl"
    volumes:
      - ./zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    hostname: "kafka"
    ports:
      - "9092:9092"
      - "9094:9094"
    depends_on:
      - zookeeper
    networks:
      - localstack-ldn
    environment:
      KAFKA_LISTENERS: SASL_PLAINTEXT://:9092,OUTSIDE://:9094
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "SASL_PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer"
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
      KAFKA_SUPER_USERS: "User:admin"
      KAFKA_CREATE_TOPICS: "my-topic-name:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
    links:
      - zookeeper
  kowl:
    image: quay.io/cloudhut/kowl:v1.3.1
    container_name: kowl
    restart: always
    ports:
      - "8081:8080"
    depends_on:
      - kafka
    environment:
      - KAFKA_BROKERS=kafka:9092
volumes:
  localstack-ldn-vol-2:
    external: false


networks:
  localstack-ldn:
    external: true
    name: localstack-ldn

Then create an event source mapping from one of the Kafka topics to a Lambda using SASL_SCRAM_512_AUTH (I am using Terraform for this). In the Lambda, log the records it receives to the console.
Then produce messages with keys to the Kafka topic that triggers the Lambda. I verified that my keys actually exist by consuming the topic with kcat. An example record logged from my Lambda is

{
  "topic": "my-topic-name",
  "partition": 0,
  "offset": 83,
  "timestamp": 1668635066863,
  "timestampType": "CREATE_TIME",
  "value": "foo"
}

Environment

- OS: macOS Monterey 12.6
- LocalStack: 1.2.0

Anything else?

No response

Metadata

Labels

- aws:kafka (Amazon Managed Streaming for Apache Kafka)
- aws:lambda (AWS Lambda)
- status: backlog (Triaged but not yet being worked on)
- type: bug (Bug report)
