Skip to content

Commit

Permalink
spring-projectsGH-2321: Support Inbound Header Mapping Matchers
Browse files Browse the repository at this point in the history
Resolves spring-projects#2321

**cherry-pick to 2.9.x, 2.8.x**
  • Loading branch information
garyrussell authored and artembilan committed Jun 28, 2022
1 parent 2c5c88c commit 2fcb082
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 34 deletions.
31 changes: 31 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4762,6 +4762,8 @@ public interface KafkaHeaderMapper {
----
====

The `SimpleKafkaHeaderMapper` maps raw headers as `byte[]`, with configuration options for conversion to `String` values.

The `DefaultKafkaHeaderMapper` maps the key to the `MessageHeaders` header name and, in order to support rich header types for outbound messages, JSON conversion is performed.
A "`special`" header (with a key of `spring_json_header_types`) contains a JSON map of `<key>:<type>`.
This header is used on the inbound side to provide appropriate conversion of each header value to the original type.
Expand Down Expand Up @@ -4850,6 +4852,35 @@ public void testSpecificStringConvert() {
----
====

Both header mappers map all inbound headers, by default.
Starting with version 2.8.8, the patterns, can also applied to inbound mapping.
To create a mapper for inbound mapping, use one of the static methods on the respective mapper:

====
[source, java]
----
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
----
====

For example:

====
[source, java]
----
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
----
====

This will exclude all headers beginning with `abc` and include all others.

By default, the `DefaultKafkaHeaderMapper` is used in the `MessagingMessageConverter` and `BatchMessagingMessageConverter`, as long as Jackson is on the class path.

With the batch converter, the converted headers are available in the `KafkaHeaders.BATCH_CONVERTED_HEADERS` as a `List<Map<String, Object>>` where the map in a position of the list corresponds to the data position in the payload.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,31 +59,53 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {
this.rawMappedHeaders.put(KafkaHeaders.LISTENER_INFO, true);
}

private final boolean outbound;

private boolean mapAllStringsOut;

private Charset charset = StandardCharsets.UTF_8;

/**
* Construct a mapper that will match the supplied patterns (outbound) and all headers
* (inbound). For outbound mapping, certain internal framework headers are never
* mapped.
* @param patterns the patterns.
*/
public AbstractKafkaHeaderMapper(String... patterns) {
this(true, patterns);
}

/**
* Construct a mapper that will match the supplied patterns (outbound) and all headers
* (inbound). For outbound mapping, certain internal framework headers are never
* mapped.
* @param outbound true for an outbound mapper.
* @param patterns the patterns.
*/
protected AbstractKafkaHeaderMapper(boolean outbound, String... patterns) {
Assert.notNull(patterns, "'patterns' must not be null");
this.matchers.add(new NeverMatchHeaderMatcher(
KafkaHeaders.ACKNOWLEDGMENT,
KafkaHeaders.CONSUMER,
KafkaHeaders.KEY,
KafkaHeaders.OFFSET,
KafkaHeaders.PARTITION,
KafkaHeaders.RAW_DATA,
KafkaHeaders.RECEIVED_KEY,
KafkaHeaders.RECEIVED_PARTITION,
KafkaHeaders.RECEIVED_TIMESTAMP,
KafkaHeaders.RECEIVED_TOPIC,
KafkaHeaders.TIMESTAMP,
KafkaHeaders.TIMESTAMP_TYPE,
KafkaHeaders.BATCH_CONVERTED_HEADERS,
KafkaHeaders.NATIVE_HEADERS,
KafkaHeaders.TOPIC,
KafkaHeaders.DELIVERY_ATTEMPT,
KafkaHeaders.LISTENER_INFO,
KafkaHeaders.GROUP_ID));
this.outbound = outbound;
if (outbound) {
this.matchers.add(new NeverMatchHeaderMatcher(
KafkaHeaders.ACKNOWLEDGMENT,
KafkaHeaders.CONSUMER,
KafkaHeaders.KEY,
KafkaHeaders.OFFSET,
KafkaHeaders.PARTITION,
KafkaHeaders.RAW_DATA,
KafkaHeaders.RECEIVED_KEY,
KafkaHeaders.RECEIVED_PARTITION,
KafkaHeaders.RECEIVED_TIMESTAMP,
KafkaHeaders.RECEIVED_TOPIC,
KafkaHeaders.TIMESTAMP,
KafkaHeaders.TIMESTAMP_TYPE,
KafkaHeaders.BATCH_CONVERTED_HEADERS,
KafkaHeaders.NATIVE_HEADERS,
KafkaHeaders.TOPIC,
KafkaHeaders.DELIVERY_ATTEMPT,
KafkaHeaders.LISTENER_INFO,
KafkaHeaders.GROUP_ID));
}
for (String pattern : patterns) {
this.matchers.add(new SimplePatternBasedHeaderMatcher(pattern));
}
Expand Down Expand Up @@ -168,6 +190,27 @@ protected boolean matches(String header, Object value) {
}

protected boolean matches(String header) {
Assert.state(this.outbound, "This mapper cannot be used for outbound mapping");
return doesMatch(header);
}

/**
* Matches header names for inbound mapping when configured as an inbound mapper.
* @param header the header name.
* @return true if it can be mapped.
* @since 2.8.8
*/
protected boolean matchesForInbound(String header) {
if (this.outbound) {
return true;
}
if (this.matchers.size() == 0) {
return true;
}
return doesMatch(header);
}

private boolean doesMatch(String header) {
for (HeaderMatcher matcher : this.matchers) {
if (matcher.matchHeader(header)) {
return !matcher.isNegated();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,39 @@ public DefaultKafkaHeaderMapper(String... patterns) {
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) {
super(patterns);
this(true, objectMapper, patterns);
}

private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, String... patterns) {
super(outbound, patterns);
Assert.notNull(objectMapper, "'objectMapper' must not be null");
Assert.noNullElements(patterns, "'patterns' must not have null elements");
this.objectMapper = objectMapper;
this.objectMapper
.registerModule(new SimpleModule().addDeserializer(MimeType.class, new MimeTypeJsonDeserializer()));
}

/**
* Create an instance for inbound mapping only with pattern matching.
* @param patterns the patterns to match.
* @return the header mapper.
* @since 2.8.8
*/
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
return new DefaultKafkaHeaderMapper(false, JacksonUtils.enhancedObjectMapper(), patterns);
}

/**
* Create an instance for inbound mapping only with pattern matching.
* @param objectMapper the object mapper.
* @param patterns the patterns to match.
* @return the header mapper.
* @since 2.8.8
*/
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
return new DefaultKafkaHeaderMapper(false, objectMapper, patterns);
}

/**
* Return the object mapper.
* @return the mapper.
Expand Down Expand Up @@ -288,19 +313,20 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
public void toHeaders(Headers source, final Map<String, Object> headers) {
final Map<String, String> jsonTypes = decodeJsonTypes(source);
source.forEach(header -> {
if (header.key().equals(KafkaHeaders.DELIVERY_ATTEMPT)) {
headers.put(header.key(), ByteBuffer.wrap(header.value()).getInt());
String headerName = header.key();
if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT) && matchesForInbound(headerName)) {
headers.put(headerName, ByteBuffer.wrap(header.value()).getInt());
}
else if (header.key().equals(KafkaHeaders.LISTENER_INFO)) {
headers.put(header.key(), new String(header.value(), getCharset()));
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) {
headers.put(headerName, new String(header.value(), getCharset()));
}
else if (!(header.key().equals(JSON_TYPES))) {
if (jsonTypes != null && jsonTypes.containsKey(header.key())) {
String requestedType = jsonTypes.get(header.key());
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
if (jsonTypes != null && jsonTypes.containsKey(headerName)) {
String requestedType = jsonTypes.get(headerName);
populateJsonValueHeader(header, requestedType, headers);
}
else {
headers.put(header.key(), headerValueToAddIn(header));
headers.put(headerName, headerValueToAddIn(header));
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,21 @@ public SimpleKafkaHeaderMapper() {
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
public SimpleKafkaHeaderMapper(String... patterns) {
super(patterns);
this(true, patterns);
}

private SimpleKafkaHeaderMapper(boolean outbound, String... patterns) {
super(outbound, patterns);
}

/**
* Create an instance for inbound mapping only with pattern matching.
* @param patterns the patterns to match.
* @return the header mapper.
* @since 2.8.8
*/
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
return new SimpleKafkaHeaderMapper(false, patterns);
}

@Override
Expand All @@ -91,11 +105,14 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
source.forEach(header -> {
if (header.key().equals(KafkaHeaders.DELIVERY_ATTEMPT)) {
target.put(header.key(), ByteBuffer.wrap(header.value()).getInt());
}
else {
target.put(header.key(), headerValueToAddIn(header));
String headerName = header.key();
if (matchesForInbound(headerName)) {
if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) {
target.put(headerName, ByteBuffer.wrap(header.value()).getInt());
}
else {
target.put(headerName, headerValueToAddIn(header));
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,26 @@ void listenerInfo() {
assertThat(headers.lastHeader(KafkaHeaders.LISTENER_INFO)).isNull();
}

@Test
void inboundJson() {
DefaultKafkaHeaderMapper outboundMapper = new DefaultKafkaHeaderMapper();
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!fo*", "*");
HashMap<String, Object> map = new HashMap<>();
map.put("foo", "bar");
map.put("foa", "bar");
map.put("baz", "qux");
MessageHeaders msgHeaders = new MessageHeaders(map);
Headers headers = new RecordHeaders();
outboundMapper.fromHeaders(msgHeaders, headers);
headers.add(KafkaHeaders.DELIVERY_ATTEMPT, new byte[] { 0, 0, 0, 1 });
map.clear();
inboundMapper.toHeaders(headers, map);
assertThat(map).doesNotContainKey("foo")
.doesNotContainKey("foa")
.containsKey(KafkaHeaders.DELIVERY_ATTEMPT)
.containsKey("baz");
}

public static final class Foo {

private String bar = "bar";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.kafka.support;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.assertj.core.api.Assertions.entry;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -159,4 +160,30 @@ void listenerInfo() {
assertThat(headers.lastHeader(KafkaHeaders.LISTENER_INFO)).isNull();
}

@Test
void inboundMappingNoPatterns() {
SimpleKafkaHeaderMapper inboundMapper = SimpleKafkaHeaderMapper.forInboundOnlyWithMatchers();
Headers headers = new RecordHeaders();
headers.add("foo", "bar".getBytes());
headers.add(KafkaHeaders.DELIVERY_ATTEMPT, new byte[] { 0, 0, 0, 1 });
Map<String, Object> mapped = new HashMap<>();
inboundMapper.toHeaders(headers, mapped);
assertThat(mapped).containsKey("foo")
.containsKey(KafkaHeaders.DELIVERY_ATTEMPT);
assertThatIllegalStateException()
.isThrownBy(() -> inboundMapper.fromHeaders(new MessageHeaders(mapped), headers));
}

@Test
void inboundMappingWithPatterns() {
SimpleKafkaHeaderMapper inboundMapper = SimpleKafkaHeaderMapper.forInboundOnlyWithMatchers("!foo", "*");
Headers headers = new RecordHeaders();
headers.add("foo", "bar".getBytes());
headers.add(KafkaHeaders.DELIVERY_ATTEMPT, new byte[] { 0, 0, 0, 1 });
Map<String, Object> mapped = new HashMap<>();
inboundMapper.toHeaders(headers, mapped);
assertThat(mapped).doesNotContainKey("foo")
.containsKey(KafkaHeaders.DELIVERY_ATTEMPT);
}

}

0 comments on commit 2fcb082

Please sign in to comment.