Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) Adding support for geo-dr and offsets. #43834

Open
wants to merge 66 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
455d8bf
Add ReplicationSegment into Checkpoint
conniey Nov 27, 2023
6c34fea
Add replicationSegment into PartitionProperties
conniey Nov 27, 2023
0bf35bc
Add replicationSegment into SystemProperties
conniey Nov 27, 2023
2ea610a
Add x-opt-replication segment.
conniey Nov 28, 2023
60e7a86
Populate PartitionProperties when results are deserialised.
conniey Nov 29, 2023
3266b8d
Add "sequence number epoch" constants to management channel.
conniey Dec 1, 2023
370297c
Add replication segment getter.
conniey Dec 1, 2023
fc61dd3
Adding comment for message constant.
conniey Dec 6, 2023
8d13a59
Fix partition properties test.
conniey Dec 6, 2023
3b94bb2
Add replicationSegment to EventPosition.
conniey Dec 6, 2023
bb6fcde
Add replicationSegment to LastEnqueuedEventProperties
conniey Dec 6, 2023
ae4b61c
Fix build breaks adding LastEnqueuedProperties ctor.
conniey Dec 11, 2023
1bde2d1
Add replication capabilities and update expression logic.
conniey Jan 9, 2024
8ec9b9f
Add tests for EventPosition.
conniey Jan 10, 2024
cda00f6
Add tests for event position expression
conniey Jan 17, 2024
ee53426
Add javadoc to getExpression.
conniey Jan 17, 2024
17cc765
Add replication segment when generating EventData
conniey Jan 17, 2024
264af29
Remove unused symbols.
conniey Jan 17, 2024
b6a4b37
Update system properties test in EventData
conniey Jan 17, 2024
e12f0a0
Split lines for easier reading.
conniey Jan 17, 2024
3262cff
Add tests to deserialize EventData.
conniey Jan 17, 2024
02d5da7
Change from long to int.
conniey Jan 25, 2024
adfc783
Consider symbol and string key.
conniey Jan 25, 2024
ad97dfa
Move default replication segment into ClientConstants
conniey Jan 29, 2024
9d3a900
Modify preference for checkpoint position to use sequence number first.
conniey Jan 29, 2024
05f350c
Update BlobCheckpointStore to persist replication segment.
conniey Jan 29, 2024
8244d5f
Add test case for replication segment.
conniey Jan 29, 2024
d056c1d
Updated CHANGELOG.
conniey Jan 30, 2024
dff776e
Update CHANGELOG.
conniey Jan 30, 2024
85f03ca
use unreleased version of azure-core-amqp
conniey Sep 25, 2024
003e8c1
Fixing build breaks from LastEnqueuedEventProperties ctor change.
conniey Sep 25, 2024
2b0f676
Update Checkpoint to use OffsetString
conniey Nov 19, 2024
b02335d
Update EventData to return getOffsetString
conniey Nov 22, 2024
81d7266
Extract OffsetString in serializer.
conniey Nov 26, 2024
f629b9f
Fix test breaks using OffsetString
conniey Nov 26, 2024
ff6258d
Fix test breaks using OffsetString
conniey Nov 26, 2024
17a980e
Update EventPosition with offset string.
conniey Nov 26, 2024
57cf6e1
Update LastEnqueuedEventProperties
conniey Nov 26, 2024
fb7df54
Adding OffsetString in SystemProperties
conniey Nov 27, 2024
c7814b7
Fix build breaks in SystemPropertiesTest
conniey Nov 27, 2024
02b5668
Add deprecated annotation.
conniey Dec 2, 2024
d3fef20
Fix EventHubMessageSerializer class/test
conniey Jan 14, 2025
40dd323
Migrates EventHubPartitionAsyncConsumer to use offsetString
conniey Jan 14, 2025
13e3d9c
Fix EventHubReactorSession
conniey Jan 14, 2025
4c5d5e1
UpdateEventContextBatch test.
conniey Jan 14, 2025
9937ad4
Update offset in Utils to string.
conniey Jan 14, 2025
c62aa22
Update SystemProperties
conniey Jan 15, 2025
e943bf9
PartitionProperties formatting update
conniey Jan 15, 2025
5335ea1
Update creating EventPosition if cannot be parsed as number.
conniey Jan 15, 2025
f832027
Update assertions in EventData
conniey Jan 15, 2025
f38b062
Fix spacing in ManagementChannel
conniey Jan 15, 2025
98b4ea5
Update constructor on LastEnqueuedEventProperties
conniey Jan 15, 2025
544be48
Calculate offset rather than set it
conniey Jan 17, 2025
8b17017
Suppressing deprecations.
conniey Jan 17, 2025
2aad27d
ToString
conniey Jan 17, 2025
303aa08
Update logic to get default initial position.
conniey Jan 17, 2025
e9ea8b7
Adding test cases for partiton pump manager.
conniey Jan 17, 2025
602686f
Get message with string offsets.
conniey Jan 17, 2025
e139c95
Format with replication segment.
conniey Jan 17, 2025
857f75c
Add additional asserts to remove warnings.
conniey Jan 17, 2025
8bc65be
Fixing test breaks.
conniey Jan 17, 2025
a43ac3b
Formatting.
conniey Jan 17, 2025
69da303
Formatting changes.
conniey Jan 17, 2025
398cdab
Update EventDataBatch to persist all information.
conniey Jan 17, 2025
0ca163c
Fix mocking.
conniey Jan 17, 2025
1891cf6
Fix mocking.
conniey Jan 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,12 @@ io.clientcore:http-stress;1.0.0-beta.1;1.0.0-beta.1
# In the pom, the version update tag after the version should name the unreleased package and the dependency version:
# <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->

unreleased_com.azure:azure-core-amqp;2.10.0-beta.1

unreleased_com.azure:azure-messaging-servicebus;7.18.0-beta.2
unreleased_com.azure:azure-messaging-eventhubs;5.20.0-beta.1
unreleased_com.azure:azure-messaging-eventhubs-checkpointstore-blob;1.21.0-beta.1

unreleased_com.azure:azure-monitor-opentelemetry-exporter;1.0.0-beta.31
unreleased_com.azure:azure-monitor-opentelemetry-autoconfigure;1.0.0-beta.1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,11 @@ public enum AmqpMessageConstant {
/**
* The state of message.
*/
MESSAGE_STATE_ANNOTATION_NAME("x-opt-message-state");
MESSAGE_STATE_ANNOTATION_NAME("x-opt-message-state"),
/**
* The replication segment for the message.
*/
REPLICATION_SEGMENT_ANNOTATION_NAME("x-opt-sequence-number-epoch");

private static final Map<String, AmqpMessageConstant> RESERVED_CONSTANTS_MAP = new HashMap<>();
private final String constant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 1.21.0-beta.1 (Unreleased)

- Added feature to persist/rehydrate replication segment field to/from Azure Storage Blobs.

### Features Added

### Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* @see EventProcessorClient
*/
public class BlobCheckpointStore implements CheckpointStore {

static final String REPLICATION_SEGMENT = "replicationsegment";
static final String SEQUENCE_NUMBER = "sequencenumber";
static final String OFFSET = "offset";
static final String OWNER_ID = "ownerid";
Expand All @@ -57,6 +57,7 @@ public class BlobCheckpointStore implements CheckpointStore {
private static final String SEQUENCE_NUMBER_LOG_KEY = "sequenceNumber";
private static final String BLOB_NAME_LOG_KEY = "blobName";
private static final String OFFSET_LOG_KEY = "offset";
private static final String REPLICATION_SEGMENT_LOG_KEY = "replicationSegment";

/**
* An empty string.
Expand Down Expand Up @@ -137,10 +138,12 @@ private Mono<Checkpoint> convertToCheckpoint(BlobItem blobItem) {
.addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName())
.addKeyValue(SEQUENCE_NUMBER_LOG_KEY, metadata.get(SEQUENCE_NUMBER))
.addKeyValue(OFFSET_LOG_KEY, metadata.get(OFFSET))
.addKeyValue(REPLICATION_SEGMENT_LOG_KEY, metadata.get(REPLICATION_SEGMENT))
.log(Messages.CHECKPOINT_INFO);

Long sequenceNumber = null;
Long offset = null;
Integer replicationSegment = null;
if (!CoreUtils.isNullOrEmpty(metadata.get(SEQUENCE_NUMBER))) {
sequenceNumber = Long.parseLong(metadata.get(SEQUENCE_NUMBER));
}
Expand All @@ -149,13 +152,19 @@ private Mono<Checkpoint> convertToCheckpoint(BlobItem blobItem) {
offset = Long.parseLong(metadata.get(OFFSET));
}

Checkpoint checkpoint = new Checkpoint().setFullyQualifiedNamespace(names[0])
if (!CoreUtils.isNullOrEmpty(metadata.get(REPLICATION_SEGMENT))) {
replicationSegment = Integer.parseInt(metadata.get(REPLICATION_SEGMENT));
}

Checkpoint checkpoint = new Checkpoint()
.setFullyQualifiedNamespace(names[0])
.setEventHubName(names[1])
.setConsumerGroup(names[2])
// names[3] is "checkpoint"
.setPartitionId(names[4])
.setSequenceNumber(sequenceNumber)
.setOffset(offset);
.setOffset(offset)
.setReplicationSegment(replicationSegment);

return Mono.just(checkpoint);
}
Expand Down Expand Up @@ -238,20 +247,29 @@ public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
"Both sequence number and offset cannot be null when updating a checkpoint")));
}

String partitionId = checkpoint.getPartitionId();
String blobName = getBlobName(checkpoint.getFullyQualifiedNamespace(), checkpoint.getEventHubName(),
final String partitionId = checkpoint.getPartitionId();
final String blobName = getBlobName(checkpoint.getFullyQualifiedNamespace(), checkpoint.getEventHubName(),
checkpoint.getConsumerGroup(), partitionId, CHECKPOINT_PATH);

if (!blobClients.containsKey(blobName)) {
blobClients.put(blobName, blobContainerAsyncClient.getBlobAsyncClient(blobName));
}

Map<String, String> metadata = new HashMap<>();
String sequenceNumber
= checkpoint.getSequenceNumber() == null ? null : String.valueOf(checkpoint.getSequenceNumber());

String offset = checkpoint.getOffset() == null ? null : String.valueOf(checkpoint.getOffset());
final String sequenceNumber = checkpoint.getSequenceNumber() != null
? String.valueOf(checkpoint.getSequenceNumber())
: null;
final String offset = checkpoint.getOffset() != null
? String.valueOf(checkpoint.getOffset())
: null;
final String replicationSegment = checkpoint.getReplicationSegment() != null
? String.valueOf(checkpoint.getReplicationSegment())
: null;

final Map<String, String> metadata = new HashMap<>();
metadata.put(SEQUENCE_NUMBER, sequenceNumber);
metadata.put(OFFSET, offset);
metadata.put(REPLICATION_SEGMENT, replicationSegment);

BlobAsyncClient blobAsyncClient = blobClients.get(blobName);

return blobAsyncClient.exists().flatMap(exists -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.jupiter.api.condition.DisabledOnJre;
import org.junit.jupiter.api.condition.JRE;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
Expand All @@ -44,15 +45,18 @@
import static com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore.OFFSET;
import static com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore.OWNERSHIP_PATH;
import static com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore.OWNER_ID;
import static com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore.REPLICATION_SEGMENT;
import static com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore.SEQUENCE_NUMBER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
Expand All @@ -70,11 +74,14 @@ public class BlobCheckpointStoreTests {
@Mock
private BlobAsyncClient blobAsyncClient;

@Captor
private ArgumentCaptor<Map<String, String>> metadataArgumentCaptor;

private AutoCloseable autoCloseable;

@BeforeEach
public void beforeEach() {
this.autoCloseable = MockitoAnnotations.openMocks(this);
autoCloseable = MockitoAnnotations.openMocks(this);
}

@AfterEach
Expand Down Expand Up @@ -164,11 +171,11 @@ public void testListCheckpoint() {
final String checkpointPrefix = prefix + CHECKPOINT_PATH;

final BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
final BlobItem blobItem = getCheckpointBlobItem("230", "1", checkpointPrefix + "0"); // valid blob
final BlobItem blobItem = getCheckpointBlobItem("230", "1", checkpointPrefix + "0", "35"); // valid blob
final BlobItem blobItem2 = new BlobItem().setName(checkpointPrefix + "1"); // valid blob but not a valid checkpoint.
final BlobItem blobItem3 = getCheckpointBlobItem("233", "3", prefix + "1"); // invalid name
final PagedFlux<BlobItem> response
= new PagedFlux<>(() -> Mono.just(new PagedResponseBase<HttpHeaders, BlobItem>(null, 200, null,
final BlobItem blobItem3 = getCheckpointBlobItem("233", "3", prefix + "1", null); // invalid name
final PagedFlux<BlobItem> response = new PagedFlux<>(() -> Mono.just(
new PagedResponseBase<HttpHeaders, BlobItem>(null, 200, null,
Arrays.asList(blobItem, blobItem2, blobItem3), null, null)));

when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenAnswer(invocation -> {
Expand All @@ -192,6 +199,7 @@ public void testListCheckpoint() {
assertEquals(consumerGroup, checkpoint.getConsumerGroup());
assertEquals(1L, checkpoint.getSequenceNumber());
assertEquals(230L, checkpoint.getOffset());
assertEquals(35, checkpoint.getReplicationSegment());
})
.verifyComplete();
}
Expand Down Expand Up @@ -227,25 +235,41 @@ public void testUpdateCheckpoint() {
.setEventHubName(eventHubName)
.setConsumerGroup(consumerGroup)
.setPartitionId(partitionId)
.setReplicationSegment(16)
.setSequenceNumber(2L)
.setOffset(100L);

final BlobItem blobItem = getCheckpointBlobItem("230", "1", blobName);
final PagedFlux<BlobItem> response
= new PagedFlux<>(() -> Mono.just(new PagedResponseBase<HttpHeaders, BlobItem>(null, 200, null,
final BlobItem blobItem = getCheckpointBlobItem("230", "1", blobName, "25");
final PagedFlux<BlobItem> response = new PagedFlux<>(() -> Mono.just(
new PagedResponseBase<HttpHeaders, BlobItem>(null, 200, null,
Collections.singletonList(blobItem), null, null)));

when(blobContainerAsyncClient.getBlobAsyncClient(blobName)).thenReturn(blobAsyncClient);
when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response);
when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient);
when(blobAsyncClient.exists()).thenReturn(Mono.just(true));

when(blobAsyncClient.setMetadata(ArgumentMatchers.<Map<String, String>>any())).thenReturn(Mono.empty());
when(blobAsyncClient.setMetadata(ArgumentMatchers.any())).thenReturn(Mono.empty());

final BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);

// Act & Assert
StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete();
StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint))
.verifyComplete();

verify(blobAsyncClient).setMetadata(metadataArgumentCaptor.capture());

final Map<String, String> actual = metadataArgumentCaptor.getValue();

assertEquals(3, actual.size());

assertTrue(actual.containsKey(REPLICATION_SEGMENT));
assertEquals(String.valueOf(checkpoint.getReplicationSegment()), actual.get(REPLICATION_SEGMENT));

assertTrue(actual.containsKey(OFFSET));
assertEquals(String.valueOf(checkpoint.getOffset()), actual.get(OFFSET));

assertTrue(actual.containsKey(SEQUENCE_NUMBER));
assertEquals(String.valueOf(checkpoint.getSequenceNumber()), actual.get(SEQUENCE_NUMBER));
}

/**
Expand All @@ -271,15 +295,16 @@ public void testUpdateCheckpointForNewPartition() {
.setConsumerGroup("cg")
.setPartitionId("0")
.setSequenceNumber(2L)
.setOffset(100L);
.setOffset(100L)
.setReplicationSegment(15);
final String legacyPrefix = getLegacyPrefix(checkpoint.getFullyQualifiedNamespace(),
checkpoint.getEventHubName(), checkpoint.getConsumerGroup());
final String blobName = legacyPrefix + CHECKPOINT_PATH + checkpoint.getPartitionId();

HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add(HttpHeaderName.ETAG, "etag2");

BlobItem blobItem = getCheckpointBlobItem("230", "1", blobName);
BlobItem blobItem = getCheckpointBlobItem("230", "1", blobName, "20");

PagedFlux<BlobItem> response
= new PagedFlux<BlobItem>(() -> Mono.just(new PagedResponseBase<HttpHeaders, BlobItem>(null, 200, null,
Expand All @@ -291,12 +316,31 @@ public void testUpdateCheckpointForNewPartition() {
when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient);
when(blobAsyncClient.exists()).thenReturn(Mono.just(false));

when(blockBlobAsyncClient.uploadWithResponse(ArgumentMatchers.<Flux<ByteBuffer>>any(), eq(0L), isNull(),
anyMap(), isNull(), isNull(), isNull()))
.thenReturn(Mono.just(new ResponseBase<>(null, 200, httpHeaders, null, null)));
when(blockBlobAsyncClient.uploadWithResponse(ArgumentMatchers.any(), eq(0L),
isNull(), anyMap(), isNull(), isNull(), isNull()))
.thenReturn(Mono.just(new ResponseBase<>(null, 200, httpHeaders, null, null)));

BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete();

// Act & Assert
StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint))
.verifyComplete();

verify(blockBlobAsyncClient).uploadWithResponse(any(), eq(0L), isNull(),
metadataArgumentCaptor.capture(), isNull(), isNull(), isNull());

final Map<String, String> actual = metadataArgumentCaptor.getValue();

assertEquals(3, actual.size());

assertTrue(actual.containsKey(REPLICATION_SEGMENT));
assertEquals(String.valueOf(checkpoint.getReplicationSegment()), actual.get(REPLICATION_SEGMENT));

assertTrue(actual.containsKey(OFFSET));
assertEquals(String.valueOf(checkpoint.getOffset()), actual.get(OFFSET));

assertTrue(actual.containsKey(SEQUENCE_NUMBER));
assertEquals(String.valueOf(checkpoint.getSequenceNumber()), actual.get(SEQUENCE_NUMBER));
}

/**
Expand Down Expand Up @@ -443,11 +487,20 @@ private static BlobItem getOwnershipBlobItem(String owner, String etag, String b
return new BlobItem().setName(blobName).setMetadata(metadata).setProperties(properties);
}

private static BlobItem getCheckpointBlobItem(String offset, String sequenceNumber, String blobName) {
Map<String, String> metadata = new HashMap<>();
private static BlobItem getCheckpointBlobItem(String offset, String sequenceNumber, String blobName,
String replicationSegment) {

final Map<String, String> metadata = new HashMap<>();
metadata.put(SEQUENCE_NUMBER, sequenceNumber);
metadata.put(OFFSET, offset);
return new BlobItem().setName(blobName).setMetadata(metadata);

if (replicationSegment != null) {
metadata.put(REPLICATION_SEGMENT, replicationSegment);
}

return new BlobItem()
.setName(blobName)
.setMetadata(metadata);
}

private static String getLegacyPrefix(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
Expand Down
7 changes: 6 additions & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

### Features Added

Setting the v2 stack as the default. ([43725](https://github.com/Azure/azure-sdk-for-java/pull/43725))
- Setting the v2 stack as the default. ([43725](https://github.com/Azure/azure-sdk-for-java/pull/43725)).
- Added support geo-replication feature.
- Added `Checkpoint.getReplicationSegment()`.
- Added `getBeginningReplicationSegment()` and `getLastEnqueuedReplicationSegment()` to `PartitionProperties`.
- Added `LastEnqueuedEventProperties.getReplicationSegment()`.
- Added overloads to `EventPosition.fromSequenceNumber` that takes replication segment.

### Breaking Changes

Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.9.12</version> <!-- {x-version-update;com.azure:azure-core-amqp;dependency} -->
<version>2.10.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
</dependency>

<!-- Test dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,24 @@ public EventData setBodyAsBinaryData(BinaryData binaryData) {
*
* @return The offset within the Event Hub partition of the received event. {@code null} if the {@link EventData}
* was not received from Event Hubs service.
* @deprecated This value is obsolete and should no longer be used. Please use {@link #getOffsetString()} instead.
*/
@Deprecated
public Long getOffset() {
return systemProperties.getOffset();
}

/**
* Gets the offset of the event when it was received from the associated Event Hub partition. This is only present
* on a <b>received</b> {@link EventData}.
*
* @return The offset within the Event Hub partition of the received event. {@code null} if the {@link EventData}
* was not received from Event Hubs service.
*/
public String getOffsetString() {
return systemProperties.getOffsetString();
}

/**
* Gets the partition hashing key if it was set when originally publishing the event. If it exists, this value was
* used to compute a hash to select a partition to send the message to. This is only present on a <b>received</b>
Expand Down Expand Up @@ -397,6 +410,15 @@ public EventData setMessageId(String messageId) {
return this;
}

/**
* Gets the replication segment for the event.
*
* @return The replication segment. -1 or null if geo-disaster recovery is not enabled.
*/
public Integer getReplicationSegment() {
return systemProperties.getReplicationSegment();
}

/**
* True if the object is an {@link EventData} and the binary contents of {@link #getBody()} are equal.
*/
Expand Down
Loading
Loading