Month: October 2020

  • Deleting kafka topics from a consumer group

    Kafka consumer groups let you keep track of the latest offsets consumed for a given topic/partition. We ran into an issue recently when we started monitoring the lag for a given consumer group using kafka-lag-exporter, though: if your consumer group has ever committed an offset for a given topic, it stays there as long as the consumer group exists.

    We tried deleting it using the kafka-consumer-groups command line tool, but we got a message saying that the operation wasn’t supported by our broker (we are using MSK).

    So what to do? Well, I first started by looking into poking around the __consumer_offsets topic, and then noped out of there when I saw that it stores data and keys in some binary format that you need to use a Java class to parse.

    The next idea I had was to delete the consumer group, and recreate it, leaving out the offensive topic(s). But we subscribed to a lot of topics! Well, a little bash can go a long way.

    #!/usr/bin/env bash
    set -o errexit
    set -o nounset
    
    export kafka_server=your-kafka-server:9092
    export consumer_group=your-consumer-group
    export skiptopics=("old-unused-topic1" "old-unused-topic2")
    
    containsElement () {
      local e match="$1"
      shift
      for e; do [[ "$e" == "$match" ]] && return 0; done
      return 1
    }
    
    current_offsets=$(docker run --rm --volume ~/kafka-ssl.properties:/config.properties --entrypoint bin/kafka-consumer-groups.sh solsson/kafka --bootstrap-server $kafka_server --group $consumer_group --describe | tail -n +3 | awk '{ print $2, $4}')
    
    echo "Current offsets:"
    echo "$current_offsets"
    
    ( set -o xtrace; docker run --rm --volume ~/kafka-ssl.properties:/config.properties --entrypoint bin/kafka-consumer-groups.sh solsson/kafka --bootstrap-server $kafka_server --group $consumer_group --delete )
    
    while IFS= read -r line; do
      arr=($line)
      topic="${arr[0]}"
      if containsElement "$topic" "${skiptopics[@]}"; then
        continue
      fi
      offset="${arr[1]}"
      ( set -o xtrace; docker run --rm --volume ~/kafka-ssl.properties:/config.properties --entrypoint bin/kafka-consumer-groups.sh solsson/kafka --bootstrap-server $kafka_server --group $consumer_group --reset-offsets --topic $topic --to-offset $offset --execute )
    done < <(printf '%s\n' "$current_offsets")