Neo4j sink instance not receiving events from Kafka (Kafka topic -> Neo4j)

Hello,
I have configured Neo4j and Kafka in a Kubernetes cluster (standalone). I'm using Neo4j v4.1.3, and I have configured it as a sink to consume JSON messages from a Kafka topic.
When I send a JSON message to the Kafka topic, I don't see any debug messages indicating that Neo4j is receiving it, even with NEO4J_dbms_logs_debug_level="DEBUG". The configuration is below. I can see the message with this command:
#./kafka-console-consumer.sh --bootstrap-server kafka-test:9092 --topic neo4j-topic --from-beginning

  NEO4J_zookeeper_connect: "zookeeper:2181"
  NEO4J_kafka_bootstrap_servers: "kafka-test:9092"
  NEO4J_kafka_auto_offset_reset: "earliest"
  NEO4J_kafka_group_id: "neo4j"
  NEO4J_kafka_enable_auto_commit: "true"
  NEO4J_kafka_key_deserializer: "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  NEO4J_kafka_value_deserializer: "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  NEO4J_streams_sink_enabled: "true"
  NEO4J_streams_source_enabled: "false"
  NEO4J_streams_sink_topic_cypher_neo4j-topic: "MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties"
  NEO4J_dbms_logs_debug_level: "DEBUG"
  NEO4J_kafka_request_timeout_ms: "20000"
  NEO4J_kafka_retry_backoff_ms: "500"  
  NEO4J_dbms_default__database: "neo4j"
  NEO4J_streams_sink_errors_log_enable: 'true'
  NEO4J_streams_sink_errors_log_include_messages: 'true'
  NEO4J_streams_sink_errors_tolerance: 'all'
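The Cypher template above reads event.id and event.properties, so each message on the topic is expected to be a JSON object with an id field and a properties map (Neo4j property values cannot themselves be maps). The standard-library sketch below builds and checks such a payload before it is produced; make_event and looks_like_event are hypothetical helpers, and how you actually send the bytes depends on your Kafka client.

```python
import json


def make_event(event_id, properties: dict) -> bytes:
    """Serialize one message for the template
    MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties
    """
    # Neo4j property values cannot be maps, so reject nested objects early.
    for key, value in properties.items():
        if isinstance(value, dict):
            raise ValueError(f"property {key!r} is a map; Neo4j cannot store it")
    return json.dumps({"id": event_id, "properties": properties}).encode("utf-8")


def looks_like_event(payload: bytes) -> bool:
    """Check that a raw message has the fields the template reads."""
    try:
        event = json.loads(payload)
    except ValueError:  # covers invalid JSON and invalid UTF-8
        return False
    return isinstance(event, dict) and "id" in event and isinstance(event.get("properties"), dict)


msg = make_event(1, {"name": "Alice", "age": 42})
print(msg.decode())                    # {"id": 1, "properties": {"name": "Alice", "age": 42}}
print(looks_like_event(msg))           # True
print(looks_like_event(b'{"id": 1}'))  # False: no properties map
```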

The Neo4j Streams documentation says that all the configuration should go into neo4j.conf. However, I came across a discussion in the Neo4j Community, "Neo4j kafka sink not initializing/working when trying to stream data", explaining the need to create a new streams.conf file.
I'm confused now. When Neo4j starts, there are no messages indicating that the Neo4j Streams plugin has been deployed, even though it is running in debug mode. Could you please help?
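For reference on how these variables relate to neo4j.conf: in the official Neo4j Docker image, the entrypoint script writes each NEO4J_* variable into neo4j.conf, stripping the prefix, turning each single underscore into a dot and each double underscore into a single underscore. The sketch below reimplements that naming rule so you can see which key each variable becomes. It assumes your image follows this convention (its docker-entrypoint.sh is the place to confirm), and env_to_setting is a hypothetical helper name.

```python
PREFIX = "NEO4J_"


def env_to_setting(env_name: str) -> str:
    """Map a NEO4J_* variable name to the neo4j.conf key it becomes.

    Assumed rule (official Docker image): drop the NEO4J_ prefix,
    replace '_' with '.', then turn the '..' produced by '__' back into '_'.
    """
    if not env_name.startswith(PREFIX):
        raise ValueError(f"not a Neo4j setting variable: {env_name}")
    return env_name[len(PREFIX):].replace("_", ".").replace("..", "_")


for name in (
    "NEO4J_kafka_bootstrap_servers",
    "NEO4J_streams_sink_enabled",
    "NEO4J_dbms_default__database",
):
    print(name, "->", env_to_setting(name))
```

The double underscore in NEO4J_dbms_default__database is what produces dbms.default_database; with a single underscore it would become dbms.default.database instead.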

Logs:

Configuration override prefix = neo4j_test_neo4j_core_0
Starting Neo4j CORE 0 on neo4j-test-neo4j-core-0.neo4j-test-neo4j.kafka-test.svc.cluster.local
Warning: Some files inside "/data" are not writable from inside container. Changing folder owner to neo4j.
Changed password for user 'neo4j'.
parse error: Invalid literal at line 1, column 15
Directories in use:
  home:         /var/lib/neo4j
  config:       /var/lib/neo4j/conf
  logs:         /data/logs
  plugins:      /plugins
  import:       /var/lib/neo4j/import
  data:         /var/lib/neo4j/data
  certificates: /var/lib/neo4j/certificates
  run:          /var/lib/neo4j/run
Starting Neo4j.
2021-07-14 11:30:51.379+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.log.include.messages
2021-07-14 11:30:51.381+0000 WARN  Unrecognized setting. No declared setting with name: kafka.auto.offset.reset
2021-07-14 11:30:51.381+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.enabled
2021-07-14 11:30:51.381+0000 WARN  Unrecognized setting. No declared setting with name: kafka.bootstrap.servers
2021-07-14 11:30:51.381+0000 WARN  Unrecognized setting. No declared setting with name: kafka.request.timeout.ms
2021-07-14 11:30:51.381+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.log.enable
2021-07-14 11:30:51.381+0000 WARN  Unrecognized setting. No declared setting with name: kafka.enable.auto.commit
2021-07-14 11:30:51.381+0000 WARN  Unrecognized setting. No declared setting with name: zookeeper.connect
2021-07-14 11:30:51.381+0000 WARN  Unrecognized setting. No declared setting with name: streams.source.enabled
2021-07-14 11:30:51.382+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.tolerance
2021-07-14 11:30:51.382+0000 WARN  Unrecognized setting. No declared setting with name: kafka.group.id
2021-07-14 11:30:51.382+0000 WARN  Unrecognized setting. No declared setting with name: kafka.key.deserializer
2021-07-14 11:30:51.382+0000 WARN  Unrecognized setting. No declared setting with name: apoc.import.file.use_neo4j_config
2021-07-14 11:30:51.382+0000 WARN  Unrecognized setting. No declared setting with name: kafka.retry.backoff.ms
2021-07-14 11:30:51.382+0000 WARN  Unrecognized setting. No declared setting with name: kafka.value.deserializer
2021-07-14 11:30:51.382+0000 INFO  Starting...
2021-07-14 11:30:53.723+0000 INFO  ======== Neo4j 4.1.10 ========
2021-07-14 11:30:55.874+0000 DEBUG Reading users from /var/lib/neo4j/data/dbms/auth.ini
2021-07-14 11:30:55.886+0000 INFO  Sending metrics to CSV file at /var/lib/neo4j/metrics
2021-07-14 11:30:55.910+0000 INFO  Bolt enabled on 0.0.0.0:7687.
2021-07-14 11:30:56.135+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.CausalClusteringService] at [/db]
2021-07-14 11:30:56.136+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.LegacyCausalClusteringRedirectService] at [/db/manage]
2021-07-14 11:30:56.140+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.rest.discovery.DiscoveryService] at [/]
2021-07-14 11:30:56.141+0000 DEBUG Adding JAXRS classes [class com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider] at [/]
2021-07-14 11:30:56.144+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.CypherResource, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter] at [/db]
2021-07-14 11:30:56.144+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.LegacyTransactionService, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter] at [/db/data]
2021-07-14 11:30:56.212+0000 DEBUG Mounting servlet at [/db/manage]
2021-07-14 11:30:56.272+0000 DEBUG Mounting servlet at [/db/data]
2021-07-14 11:30:56.273+0000 DEBUG Mounting servlet at [/db]
2021-07-14 11:30:56.289+0000 DEBUG Mounting servlet at [/]
2021-07-14 11:30:57.119+0000 INFO  Remote interface available at http://neo4j-test-neo4j-core-0.neo4j-test-neo4j.kafka-test.svc.cluster.local:7474/
2021-07-14 11:30:57.122+0000 INFO  Started.

Hi @bhuvana.rs, unfortunately I think I see the problem, but it's not an easy one to explain.

  NEO4J_streams_sink_topic_cypher_neo4j-topic: "MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties"

It's all about the dash in your topic name. Note that you are setting an environment variable when you write NEO4J_streams_sink_topic_cypher_neo4j-topic. Inside the Neo4j Docker container, environment variables are evaluated by the bash shell, and bash variable names may not include the - character. I would bet that if you investigate, you'll find that, because of this limitation, you have actually configured it to listen to a topic called neo4j rather than neo4j-topic.
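One way to investigate is to compare the keys you meant to set with the "Unrecognized setting" warnings in the startup log. In the log from the question, every streams.*, kafka.* and zookeeper.* key from the configuration shows up in such a warning except the streams.sink.topic.cypher.* key, which does not appear at all; that is consistent with the dashed variable not making it through as intended. The sketch below automates the comparison on a short excerpt of that log. The helper names are hypothetical, and the variable-to-key conversion assumes the Docker image convention (drop NEO4J_, _ becomes ., __ becomes _).

```python
import re

# Matches the setting name in Neo4j's "Unrecognized setting" startup warnings.
WARNING = re.compile(r"Unrecognized setting\. No declared setting with name: (\S+)")


def env_to_setting(env_name: str) -> str:
    # Assumed Docker image rule: drop NEO4J_, '_' -> '.', '__' -> '_'.
    return env_name[len("NEO4J_"):].replace("_", ".").replace("..", "_")


def missing_plugin_settings(env_names, log_text, prefixes=("streams.", "kafka.", "zookeeper.")):
    """Plugin keys we tried to set that never appear in the warnings."""
    warned = set(WARNING.findall(log_text))
    expected = {env_to_setting(name) for name in env_names}
    return sorted(k for k in expected if k.startswith(prefixes) and k not in warned)


log_excerpt = """\
2021-07-14 11:30:51.381+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.enabled
2021-07-14 11:30:51.381+0000 WARN  Unrecognized setting. No declared setting with name: kafka.bootstrap.servers
2021-07-14 11:30:51.382+0000 WARN  Unrecognized setting. No declared setting with name: kafka.group.id
"""

env_names = [
    "NEO4J_streams_sink_enabled",
    "NEO4J_kafka_bootstrap_servers",
    "NEO4J_kafka_group_id",
    "NEO4J_streams_sink_topic_cypher_neo4j-topic",
    "NEO4J_dbms_default__database",  # core setting: recognized, so filtered out
]

print(missing_plugin_settings(env_names, log_excerpt))
# ['streams.sink.topic.cypher.neo4j-topic']
```

In practice you would paste the full startup output (for example from kubectl logs for the pod) and list every NEO4J_* name from your manifest.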

Notice that even the syntax highlighting on this website shows the first part of the env var name in blue and the last part in white: it's a parse error on the name of the environment variable.
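To make the naming limit concrete: bash accepts only letters, digits and underscores in a variable name, and the name may not start with a digit. The small sketch below (hypothetical helper names) checks a name against that rule and shows where it stops matching. It only illustrates the naming rule; it does not reproduce what the container's entrypoint does with the variable.

```python
import re

# Bash variable names: letters, digits and underscores, not starting with a digit.
SHELL_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_shell_name(name: str) -> bool:
    """Return True if `name` is a legal bash variable name."""
    return SHELL_NAME.fullmatch(name) is not None


def valid_prefix(name: str) -> str:
    """Return the longest leading part of `name` that is a legal bash name."""
    match = SHELL_NAME.match(name)
    return match.group(0) if match else ""


env_name = "NEO4J_streams_sink_topic_cypher_neo4j-topic"
print(is_valid_shell_name(env_name))  # False: '-' is not allowed
print(valid_prefix(env_name))         # NEO4J_streams_sink_topic_cypher_neo4j
```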

You have a few options:

  1. Consume from a topic that doesn't include a dash in its name
  2. Use the dynamic configuration support to change the subscription, similar to (1)
  3. Run Neo4j outside a Docker container, so that you can use the - character directly in the conf file
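For option 1, keep in mind that the same naming rule also rewrites underscores: an underscore inside a topic name has to be doubled in the variable name, or it turns into a dot. The hypothetical helper below goes the other way, from a neo4j.conf key to the variable name, under the assumed Docker image convention (. written as _, _ written as __), and rejects keys containing a dash, which that convention as described in this thread has no way to write.

```python
def setting_to_env(key: str) -> str:
    """Build the NEO4J_* variable name for a neo4j.conf key.

    Assumes the Docker image rule: '_' is written as '__' and '.' as '_'.
    Keys containing '-' are rejected: bash variable names cannot contain it.
    """
    if "-" in key:
        raise ValueError(f"cannot express {key!r} as an environment variable name")
    # Double the existing underscores first, then turn dots into underscores.
    return "NEO4J_" + key.replace("_", "__").replace(".", "_")


print(setting_to_env("streams.sink.topic.cypher.neo4jtopic"))
# NEO4J_streams_sink_topic_cypher_neo4jtopic
print(setting_to_env("streams.sink.topic.cypher.neo4j_topic"))
# NEO4J_streams_sink_topic_cypher_neo4j__topic
```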

Thanks for your response, @david_allen. Along with the change you suggested, I also found that the plugins directory (holding the Neo4j Streams library) was pointed at the wrong location. These two fixes solved the issue.
Thank you.
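Since the other fix was the plugins directory, one quick sanity check is to confirm that the Streams jar is in the directory Neo4j actually reports using (the "plugins:" line in the startup log, /plugins in this case). A minimal sketch follows; find_plugin_jars is a hypothetical helper, and the neo4j-streams-*.jar pattern is an assumption about the jar's file name, so adjust it to the file you installed.

```python
from pathlib import Path


def find_plugin_jars(plugins_dir: str, pattern: str = "neo4j-streams-*.jar") -> list[str]:
    """Return the file names in `plugins_dir` that match `pattern`.

    The default pattern is an assumed file name for the Streams plugin jar.
    """
    directory = Path(plugins_dir)
    if not directory.is_dir():
        raise FileNotFoundError(f"plugins directory does not exist: {plugins_dir}")
    return sorted(p.name for p in directory.glob(pattern))


# Usage, pointed at the directory from the "plugins:" line of the startup log:
# print(find_plugin_jars("/plugins") or "no Streams jar found")
```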
