CVE-2023-34040 Spring Kafka Deserialization Remote Code Execution
=================================================================
BY PYN3RD
2023-09-15 (Updated: 2023-09-18)
##### 0x01 Preface
The VMware security bulletin gives an explicit description of the Spring Kafka deserialization vulnerability.
![https://images.seebug.org/1695276921562-w331s](https://images.seebug.org/1695276921562-w331s)
Reference
[https://spring.io/security/cve-2023-34040](https://spring.io/security/cve-2023-34040)
According to the bulletin, an application is vulnerable when all of the following conditions hold:
1. An `ErrorHandlingDeserializer` is not configured for the key and/or value of the Kafka record.
2. The boolean container properties `checkDeserExWhenKeyNull` and/or `checkDeserExWhenValueNull` are explicitly set to **true**.
3. Untrusted sources are allowed to publish to a Kafka topic.
##### 0x02 Concepts of Kafka
Before diving into the vulnerability, let us briefly review some relevant concepts of the Kafka service.
![https://images.seebug.org/1695276978890-w331s](https://images.seebug.org/1695276978890-w331s)
* Producer: The object that publishes records is called the **Kafka topic producer**.
* Topic: The Kafka service classifies records, and each category is called a **Topic**.
* Broker: Published messages are stored on a group of servers called the Kafka cluster. Each server in the cluster is a **Broker**. A consumer fetches data from the **Broker** and can consume more than one topic.
* Consumer: The object that subscribes to and processes published messages is called the **Kafka topic consumer**. Messages are consumed per topic.
Moreover, it is necessary to review the structure of a Kafka record.
![https://images.seebug.org/1695276981393-w331s](https://images.seebug.org/1695276981393-w331s)
A Kafka record, also called a **Message** or **Event**, consists of a **Header** and a **Body**. The header data is essentially **Metadata**, including basic elements such as Topic, Partition and Timestamp, stored as key/value pairs. The body data is usually the relevant business data, also stored in a key/value structure.
##### Preparation
A ZooKeeper server is required before deploying the Kafka service.
1. Install the ZooKeeper server with Docker
docker run -d --name zookeeper -p 2181:2181 -t zookeeper:latest
2. Deploy the Kafka server with Docker
```
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=192.168.5.102:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.5.102:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  -e TZ="Asia/Shanghai" \
  wurstmeister/kafka:latest
```
3. Import an affected Spring Kafka dependency into the Spring Boot project
Affected versions:
* 2.8.1 to 2.9.10
* 3.0.0 to 3.0.9
```
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>
```
4. Update the configuration in `application.yaml`
![https://images.seebug.org/1695276984822-w331s](https://images.seebug.org/1695276984822-w331s)
5. Classes for the demonstration
1) Kafka Producer Class
```
package com.example.SpringKafkaDemo.producer;

import com.example.SpringKafkaDemo.model.KafkaMessage;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;

@RestController
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/message/send")
    public String sendMessage(@RequestBody KafkaMessage message) {
        String topic = message.getTopic();
        String data = message.getData();
        HashMap<String, String> headers = message.getHeaders();
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, data);
        for (String s : headers.keySet()) {
            // The exception header arrives hex-encoded and is attached as raw bytes.
            if (s.equals("springDeserializerExceptionKey")) {
                String exceptData = headers.get(s);
                byte[] exceptHandler = KafkaProducer.hexStringtoBytes(exceptData);
                producerRecord.headers().add(s, exceptHandler);
                continue;
            }
            // Every other header is attached as the bytes of its string value.
            producerRecord.headers().add(s, headers.get(s).getBytes());
        }
        kafkaTemplate.send(producerRecord);
        String jsonString = "{\"code\":\"200\", \"status\":\"success\"}";
        return jsonString;
    }

    // Decodes a hex string (two characters per byte) into a byte array.
    private static byte[] hexStringtoBytes(String hexString) {
        byte[] exceptionMessage = new byte[hexString.length() / 2];
        for (int i = 0; i < exceptionMessage.length; i++) {
            exceptionMessage[i] = (byte) Integer.parseInt(hexString.substring(i * 2, i * 2 + 2), 16);
        }
        return exceptionMessage;
    }
}
```
By the way, the producer relies on a Java design pattern here, the **Template Method Pattern**. In this demonstration, I inject a template named `kafkaTemplate`.
The relevant code fragment:
```
private KafkaTemplate<String, String> kafkaTemplate;
```
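The producer imports `com.example.SpringKafkaDemo.model.KafkaMessage`, which is not shown in this article. Below is a minimal sketch of what such a model class might look like, assuming only the three accessors the producer calls (`getTopic`, `getData`, `getHeaders`). The field names, the null handling in `setHeaders` and the JSON shape in the comment are assumptions, not the original class.

```java
import java.util.HashMap;

// Hypothetical model class. In the demo project it would be declared public, in its own
// file, under the package com.example.SpringKafkaDemo.model (omitted here so that the
// sketch compiles standalone).
//
// Assumed JSON request body for POST /message/send:
//   {"topic": "my-topic", "data": "hello", "headers": {"some-header": "some-value"}}
class KafkaMessage {
    private String topic;                                       // target Kafka topic
    private String data;                                        // record value
    private HashMap<String, String> headers = new HashMap<>();  // header name -> header value

    public KafkaMessage() {
        // A no-argument constructor lets a JSON mapper instantiate the class.
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    public HashMap<String, String> getHeaders() {
        return headers;
    }

    public void setHeaders(HashMap<String, String> headers) {
        // Fall back to an empty map so that iterating over the headers never hits null.
        this.headers = (headers != null) ? headers : new HashMap<>();
    }
}
```

Typing `headers` as `HashMap<String, String>` matches the type the producer assigns from `message.getHeaders()`.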
2) Kafka Consumer Class
```
package com.example.SpringKafkaDemo.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}
```
3) Config Class for the Consumer
```
package com.example.SpringKafkaDemo.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);
        factory.getContainerProperties().setCheckDeserExWhenValueNull(true);
        return factory;
    }
}
```
In accordance with the vulnerability description in the official bulletin, we set both the `checkDeserExWhenKeyNull` and `checkDeserExWhenValueNull` properties to **true**.
```
factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);
factory.getContainerProperties().setCheckDeserExWhenValueNull(true);
```
Set a breakpoint at the `getExceptionFromHeader` function and then start the server.
![https://images.seebug.org/1695276987948-w331s](https://images.seebug.org/1695276987948-w331s)
Step into the `invokeIfHaveRecords` function, where the record object is deserialized.
![https://images.seebug.org/1695276992408-w331s](https://images.seebug.org/1695276992408-w331s)
![https://images.seebug.org/1695276995995-w331s](https://images.seebug.org/1695276995995-w331s)
Back to the `getExceptionFromHeader` function.
![https://images.seebug.org/1695277000119-w331s](https://images.seebug.org/1695277000119-w331s)
This function looks up the header named by the variable `headerName` (here `springDeserializerExceptionKey`) in `record.headers()` and stores the result in the variable `header`.
The header's value is then passed to the `byteArrayToDeserializationException` function.
![https://images.seebug.org/1695277003464-w331s](https://images.seebug.org/1695277003464-w331s)
Step into the `byteArrayToDeserializationException` function.
![https://images.seebug.org/1695277006290-w331s](https://images.seebug.org/1695277006290-w331s)
The `resolveClass` function is overridden to restrict the deserialization of arbitrary Java classes. The same way of preventing Java deserialization vulnerabilities can be found in many projects, such as Apache Shiro and Fastjson.
![https://images.seebug.org/1695277009597-w331s](https://images.seebug.org/1695277009597-w331s)
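The restriction technique mentioned above, overriding `resolveClass`, can be sketched in plain Java as follows. This is a generic illustration with made-up class names (`AllowListObjectInputStream`, `AllowListDemo`, `Allowed`, `NotAllowed`), not the code of Spring Kafka, Shiro or Fastjson. In this sketch every class descriptor in the stream is checked against the list.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Set;

// Hypothetical stream that only resolves classes whose names are on an allow list.
class AllowListObjectInputStream extends ObjectInputStream {
    private final Set<String> allowedClassNames;

    AllowListObjectInputStream(InputStream in, Set<String> allowedClassNames) throws IOException {
        super(in);
        this.allowedClassNames = allowedClassNames;
    }

    // Called for each class descriptor in the stream; anything off the list is rejected.
    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        if (!allowedClassNames.contains(desc.getName())) {
            throw new InvalidClassException(desc.getName(), "class is not on the allow list");
        }
        return super.resolveClass(desc);
    }
}

class AllowListDemo {
    static class Allowed implements Serializable {
        int value = 1;
    }

    static class NotAllowed implements Serializable {
        int value = 2;
    }

    static byte[] serialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if the data deserializes without touching a class outside the allow list.
    static boolean isAccepted(byte[] data, Set<String> allowed) {
        try (ObjectInputStream in = new AllowListObjectInputStream(new ByteArrayInputStream(data), allowed)) {
            in.readObject();
            return true;
        } catch (InvalidClassException e) {
            return false;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Set<String> allowed = Set.of(Allowed.class.getName());
        System.out.println(isAccepted(serialize(new Allowed()), allowed));    // true
        System.out.println(isAccepted(serialize(new NotAllowed()), allowed)); // false
    }
}
```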
Apparently, only an object of the class `org.springframework.kafka.support.serializer.DeserializationException` passes this check.
![https://images.seebug.org/1695277013113-w331s](https://images.seebug.org/1695277013113-w331s)
Step into the `DeserializationException` constructor, which takes four arguments. One of them is `cause`, which can hold an instance of another class.
![https://images.seebug.org/1695277017699-w331s](https://images.seebug.org/1695277017699-w331s)
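Why the `cause` argument matters can be sketched in plain Java. The example assumes that the overridden `resolveClass` validates only the first (top-level) class descriptor read from the stream; that is my reading of the affected versions, not a copy of the library code. All class names below (`FirstClassCheckDemo`, `PermittedException`, `UnexpectedCause`) are hypothetical stand-ins, and the nested class does nothing except record that its `readObject` was invoked.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.UncheckedIOException;

class FirstClassCheckDemo {

    // Stand-in for the single permitted top-level type (not the Spring Kafka class).
    static class PermittedException extends RuntimeException {
        PermittedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Harmless stand-in for an unexpected class carried in the cause field.
    static class UnexpectedCause extends Exception {
        static boolean deserialized = false;

        // Runs during deserialization; here it only records that it was called.
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            deserialized = true;
        }
    }

    static byte[] serialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deserializes data while validating only the first class descriptor in the stream.
    static Object readCheckingFirstClassOnly(byte[] data, Class<?> permitted) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            private boolean first = true;

            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
                if (first) {
                    first = false;
                    if (!desc.getName().equals(permitted.getName())) {
                        throw new InvalidClassException(desc.getName(), "unexpected top-level class");
                    }
                }
                return super.resolveClass(desc); // later descriptors are not checked
            }
        }) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(new PermittedException("boom", new UnexpectedCause()));
        Object result = readCheckingFirstClassOnly(data, PermittedException.class);
        System.out.println(result.getClass().getSimpleName()); // PermittedException
        System.out.println(UnexpectedCause.deserialized);      // true
    }
}
```

Under that assumption the top-level object passes the check, while the object stored in its `cause` field is deserialized without any class validation.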
Write a malicious class that inherits from the parent class `Throwable`, so that it can be supplied as the `cause`.
![https://images.seebug.org/1695277021147-w331s](https://images.seebug.org/1695277021147-w331s)
Finally, fill the value of the `springDeserializerExceptionKey` key in the JSON data with the generated Java serialized data, encoded as a hex string. The remote code execution is triggered after the HTTP request is sent.
![https://images.seebug.org/1695277024360-w331s](https://images.seebug.org/1695277024360-w331s)
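The demo producer decodes the header value from a hex string (`hexStringtoBytes`), so serialized bytes have to be hex-encoded before they are placed in the JSON body. Below is a minimal sketch of that encoding step and its inverse, using a harmless `String` as the serialized object. The class and method names (`HexHeaderDemo`, `toHex`, `fromHex`) are illustrative and not part of the original project.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

class HexHeaderDemo {

    // Serializes an object with standard Java serialization.
    static byte[] serialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Encodes each byte as two lowercase hex characters.
    static String toHex(byte[] bytes) {
        StringBuilder hex = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            hex.append(Character.forDigit((b >> 4) & 0xF, 16));
            hex.append(Character.forDigit(b & 0xF, 16));
        }
        return hex.toString();
    }

    // Inverse of toHex; mirrors the decoding logic used by the demo producer.
    static byte[] fromHex(String hex) {
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16);
        }
        return bytes;
    }

    public static void main(String[] args) {
        byte[] serialized = serialize("hi");
        String hex = toHex(serialized);
        System.out.println(hex);                                     // aced00057400026869
        System.out.println(Arrays.equals(serialized, fromHex(hex))); // true
    }
}
```

Every Java serialization stream starts with the magic number `aced` followed by the version `0005`, which makes such header values easy to recognize in a request.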