Getting "consumerRecord.key() must not be null" error in Neo4j Streams Kafka plugin

Hi there, I'm running a Docker container with Neo4j 3.4.14 and the Neo4j Streams plugin 3.4.4.

I'm currently able to produce messages (using CALL streams.publish()), but I don't seem to be able to consume any messages.
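
For reference, I'm publishing with something like this (the topic name and payload here are just placeholders):

    // publish a test message to Kafka from Neo4j; 'my-topic' is a placeholder
    CALL streams.publish('my-topic', 'Hello from Neo4j')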

I'm checking the logs, and right after it says "Remote interface available at ...", it displays an error saying consumerRecord.key() must not be null:

2019-08-12 16:26:29.265+0000 INFO  Remote interface available at http://localhost:9474/
2019-08-12 16:26:33.640+0000 ERROR consumerRecord.key() must not be null consumerRecord.key() must not be null
java.lang.IllegalStateException: consumerRecord.key() must not be null
        at streams.service.dlq.DLQData$Companion.from(DeadLetterQueueService.kt:23)
        at streams.kafka.KafkaAutoCommitEventConsumer.executeAction(KafkaEventSink.kt:211)
        at streams.kafka.KafkaAutoCommitEventConsumer.readSimple(KafkaEventSink.kt:202)
        at streams.kafka.KafkaAutoCommitEventConsumer.read(KafkaEventSink.kt:246)
        at streams.kafka.KafkaEventSink$createJob$1.invokeSuspend(KafkaEventSink.kt:120)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)
        at kotlinx.coroutines.DispatchedTask$DefaultImpls.run(Dispatched.kt:235)
        at kotlinx.coroutines.AbstractContinuation.run(AbstractContinuation.kt:19)
        at kotlinx.coroutines.scheduling.Task.run(Tasks.kt:94)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:732)

It looks like there is an issue with the dead letter queue, but I added the DLQ topic to the config below and created it in Kafka, and it still shows the same error (see my note after the config):

      NEO4J_kafka_group_id: 'neo4j-cs-consumer'
      NEO4J_kafka_group_instance_id: 'neo4j-cs-consumer'
      NEO4J_streams_sink_enabled: 'true'
      NEO4J_streams_source_enabled: "true"
      NEO4J_streams_sink_dlq: 'interkafka4j-dlq-topic'
      NEO4J_dbms_memory_pagecache_size: 2G
      NEO4J_dbms_memory_heap_max__size: 4G
      NEO4J_dbms_connector_bolt_listen__address: '0.0.0.0:9687'
      NEO4J_dbms_connector_http_listen__address: '0.0.0.0:9474'
      NEO4J_dbms_connector_https_listen__address: '0.0.0.0:9473'
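
Reading the stack trace, the failure is in DLQData.from, which requires consumerRecord.key() to be non-null, so I suspect records published without a key trip it. As a quick sanity test, one can produce a keyed message from the Kafka console (the broker address and topic below are placeholders):

    # produce a message WITH a key; key and value are separated by ':'
    kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic \
      --property "parse.key=true" --property "key.separator=:"
    # then type, e.g.:  mykey:{"name":"test"}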

Has anyone run into this problem? Any ideas would be helpful!
Thanks in advance! :smile:

This bug has been reported, and I believe a fix is in the works and at the PR stage in the GitHub repo. Unfortunately, I don't know enough about this one to offer any workaround in the meantime. I'll ask around internally and see if I can get additional follow-up.

Hi David!

Thanks for the prompt reply. My workaround was downgrading the plugin to 3.4.3 in the meantime :smiley:
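
In case it helps anyone else, this is roughly how I pinned it (the release asset URL is from memory, so double-check the neo4j-streams GitHub releases page):

    # download the 3.4.3 plugin jar (exact asset name may differ; check the
    # neo4j-streams releases page) and mount it into the container's /plugins dir
    wget https://github.com/neo4j-contrib/neo4j-streams/releases/download/3.4.3/neo4j-streams-3.4.3.jar
    docker run -d \
      -v $PWD/neo4j-streams-3.4.3.jar:/plugins/neo4j-streams-3.4.3.jar \
      neo4j:3.4.14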

Nonetheless, please keep me posted if you get any feedback on that.

Thanks!