Description
Hi all,
We are testing the possibility of using Redpanda Connect (RPC) in our system to run some transformations within the same Kafka cluster.
As part of these tests we ran a scaling test in which we read from a single topic and write to another topic, decoding and then re-encoding the Avro messages. As long as the topic is processed by a single RPC instance everything works fine, but as soon as we add more RPC instances, the older RPC instance stops processing the partitions assigned to it. The last logs from the older instance are below; I'm not sure whether this certificate thing is related to it.
Beyond that, I see no other potential reason why this happens: there are no error logs in the RPC instances, the consumer group on Kafka is rebalanced properly and the new RPC instance processes its partitions, but somehow the old RPC instance does not.
Tested in standalone as well as in streams mode; the problem occurs in both cases. We use helm chart deployment 3.0.3, and our Kafka cluster is managed by Strimzi, version 3.8.0.
Config below. I set checkpoint_limit to 1 and the sleep processor to 60s on purpose, to inspect the behavior of the setup during scaling, so that one RPC instance produces exactly one message per partition each minute.
Config
config:
  http:
    enabled: true
    address: 0.0.0.0:4195
    root_path: /
    debug_endpoints: true
    cert_file: ""
    key_file: ""
    cors:
      enabled: false
      allowed_origins: [ ]
    basic_auth:
      enabled: false
      realm: restricted
      username: ""
      password_hash: ""
      algorithm: sha256
      salt: ""
  logger:
    level: DEBUG
    format: logfmt
    add_timestamp: true
    static_fields:
      '@service': redpanda-connect
image:
  repository: "docker.redpanda.com/redpandadata/connect"
deployment:
  replicaCount: 1
streams:
  enabled: true
  streamsConfigMap: "connect-streams"
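For context on how the extra instances were added: assuming the scaling step was done through the chart's replica count rather than a separate deployment (an assumption, not stated explicitly above), the values override for the second instance would be just:
# assumed values override used when adding a second RPC instance
deployment:
  replicaCount: 2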
Streams config
input:
  label: "input-kafka"
  kafka:
    addresses: [ "brokers:9092" ]
    topics: [ "input-topic" ]
    target_version: 3.8.0
    consumer_group: "red-panda-connect-3"
    checkpoint_limit: 1
    auto_replay_nacks: true
    sasl:
      user: "admin"
      password: "password"
      mechanism: "SCRAM-SHA-512"
    tls:
      enabled: true
      skip_cert_verify: true
pipeline:
  processors:
    - label: "sleep"
      sleep:
        duration: "60s"
    - try:
        - label: avro_decoder
          schema_registry_decode:
            url: "http://schema-registry"
        - label: avro_encoder
          schema_registry_encode:
            url: "http://schema-registry"
            subject: "output-topic-value"
            refresh_period: 1m
        - log:
            message: "Processor processed single message"
    - catch:
        - log:
            message: "Processor failed due to: ${!error()}"
output:
  label: ""
  kafka:
    addresses: [ "brokers:9092" ]
    topic: "output-topic"
    partitioner: murmur2_hash
    target_version: 3.8.0
    compression: none
    sasl:
      user: "admin"
      password: "password"
      mechanism: "SCRAM-SHA-512"
    tls:
      enabled: true
      skip_cert_verify: true
    metadata:
      exclude_prefixes: [ ]
    max_in_flight: 64
    custom_topic_creation:
      enabled: true
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
Activity
mihaitodor commented on May 13, 2025
Hey @Pavelsky89, the https://m.rp.vectorized.io/connect/telemetry error is just the telemetry that we have in place by default. Maybe you have a firewall which blocks it. However, it shouldn't stop the message flow if it can't connect. Otherwise, I don't see anything wrong with your setup, but the old kafka input and output don't offer much in terms of error logs when things go wrong. Would you mind switching to the new redpanda input and output (or kafka_franz if record order isn't important)? That should provide more detailed error logs if things go wrong.
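For anyone trying the suggested switch, a minimal sketch of the input and output swapped to the newer redpanda connectors is below. Field names such as seed_brokers and the list-style sasl block follow the redpanda/kafka_franz connector documentation; broker addresses, topics and credentials are copied from the config above, and the sketch is untested against the setup in this issue. The pipeline section stays unchanged.
# sketch: same topics/credentials as above, only the connector names and field layout change
input:
  label: "input-kafka"
  redpanda:
    seed_brokers: [ "brokers:9092" ]
    topics: [ "input-topic" ]
    consumer_group: "red-panda-connect-3"
    tls:
      enabled: true
      skip_cert_verify: true
    sasl:
      - mechanism: SCRAM-SHA-512
        username: "admin"
        password: "password"
output:
  label: ""
  redpanda:
    seed_brokers: [ "brokers:9092" ]
    topic: "output-topic"
    partitioner: murmur2_hash
    max_in_flight: 64
    tls:
      enabled: true
      skip_cert_verify: true
    sasl:
      - mechanism: SCRAM-SHA-512
        username: "admin"
        password: "password"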
Pavelsky89 commented on May 14, 2025
Thanks for the reply. I tried the redpanda input and output, but it did not even start consuming the input topics: all it did was register the consumer group, then wait about 20 minutes before polling some records from one of the partitions, and then wait again.
The next redeployment I did did not read anything even after 1 hour, so I did not wait longer. The last logs seen from the second attempt are below.
Anyway, getting back to the kafka input and output, it appears that the problematic setting was the checkpoint_limit I used. After I removed it, scaling started working properly. Any idea why this fixed the problem?
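For reference, the change that made scaling behave was simply dropping the explicit checkpoint_limit from the input so it falls back to the connector's default; a sketch of the adjusted input (sasl and tls sections unchanged from the original config):
input:
  label: "input-kafka"
  kafka:
    addresses: [ "brokers:9092" ]
    topics: [ "input-topic" ]
    target_version: 3.8.0
    consumer_group: "red-panda-connect-3"
    # checkpoint_limit: 1  # removed; with the default limit the older instance kept consuming after rebalance
    auto_replay_nacks: true
    # sasl and tls as in the original config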