Skip to content

Commit

Permalink
fix(txmgr): ErrOffsetsLoadInProgress is retriable
Browse files Browse the repository at this point in the history
Also update the errors.go message to match
Errors.COORDINATOR_LOAD_IN_PROGRESS from Kafka

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Nov 2, 2023
1 parent 2e077cf commit afc088e
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 96 deletions.
186 changes: 93 additions & 93 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,98 +173,98 @@ type KError int16

// Numeric error codes returned by the Kafka server.
const (
ErrNoError KError = 0
ErrUnknown KError = -1
ErrOffsetOutOfRange KError = 1
ErrInvalidMessage KError = 2
ErrUnknownTopicOrPartition KError = 3
ErrInvalidMessageSize KError = 4
ErrLeaderNotAvailable KError = 5
ErrNotLeaderForPartition KError = 6
ErrRequestTimedOut KError = 7
ErrBrokerNotAvailable KError = 8
ErrReplicaNotAvailable KError = 9
ErrMessageSizeTooLarge KError = 10
ErrStaleControllerEpochCode KError = 11
ErrOffsetMetadataTooLarge KError = 12
ErrNetworkException KError = 13
ErrOffsetsLoadInProgress KError = 14
ErrConsumerCoordinatorNotAvailable KError = 15
ErrNotCoordinatorForConsumer KError = 16
ErrInvalidTopic KError = 17
ErrMessageSetSizeTooLarge KError = 18
ErrNotEnoughReplicas KError = 19
ErrNotEnoughReplicasAfterAppend KError = 20
ErrInvalidRequiredAcks KError = 21
ErrIllegalGeneration KError = 22
ErrInconsistentGroupProtocol KError = 23
ErrInvalidGroupId KError = 24
ErrUnknownMemberId KError = 25
ErrInvalidSessionTimeout KError = 26
ErrRebalanceInProgress KError = 27
ErrInvalidCommitOffsetSize KError = 28
ErrTopicAuthorizationFailed KError = 29
ErrGroupAuthorizationFailed KError = 30
ErrClusterAuthorizationFailed KError = 31
ErrInvalidTimestamp KError = 32
ErrUnsupportedSASLMechanism KError = 33
ErrIllegalSASLState KError = 34
ErrUnsupportedVersion KError = 35
ErrTopicAlreadyExists KError = 36
ErrInvalidPartitions KError = 37
ErrInvalidReplicationFactor KError = 38
ErrInvalidReplicaAssignment KError = 39
ErrInvalidConfig KError = 40
ErrNotController KError = 41
ErrInvalidRequest KError = 42
ErrUnsupportedForMessageFormat KError = 43
ErrPolicyViolation KError = 44
ErrOutOfOrderSequenceNumber KError = 45
ErrDuplicateSequenceNumber KError = 46
ErrInvalidProducerEpoch KError = 47
ErrInvalidTxnState KError = 48
ErrInvalidProducerIDMapping KError = 49
ErrInvalidTransactionTimeout KError = 50
ErrConcurrentTransactions KError = 51
ErrTransactionCoordinatorFenced KError = 52
ErrTransactionalIDAuthorizationFailed KError = 53
ErrSecurityDisabled KError = 54
ErrOperationNotAttempted KError = 55
ErrKafkaStorageError KError = 56
ErrLogDirNotFound KError = 57
ErrSASLAuthenticationFailed KError = 58
ErrUnknownProducerID KError = 59
ErrReassignmentInProgress KError = 60
ErrDelegationTokenAuthDisabled KError = 61
ErrDelegationTokenNotFound KError = 62
ErrDelegationTokenOwnerMismatch KError = 63
ErrDelegationTokenRequestNotAllowed KError = 64
ErrDelegationTokenAuthorizationFailed KError = 65
ErrDelegationTokenExpired KError = 66
ErrInvalidPrincipalType KError = 67
ErrNonEmptyGroup KError = 68
ErrGroupIDNotFound KError = 69
ErrFetchSessionIDNotFound KError = 70
ErrInvalidFetchSessionEpoch KError = 71
ErrListenerNotFound KError = 72
ErrTopicDeletionDisabled KError = 73
ErrFencedLeaderEpoch KError = 74
ErrUnknownLeaderEpoch KError = 75
ErrUnsupportedCompressionType KError = 76
ErrStaleBrokerEpoch KError = 77
ErrOffsetNotAvailable KError = 78
ErrMemberIdRequired KError = 79
ErrPreferredLeaderNotAvailable KError = 80
ErrGroupMaxSizeReached KError = 81
ErrFencedInstancedId KError = 82
ErrEligibleLeadersNotAvailable KError = 83
ErrElectionNotNeeded KError = 84
ErrNoReassignmentInProgress KError = 85
ErrGroupSubscribedToTopic KError = 86
ErrInvalidRecord KError = 87
ErrUnstableOffsetCommit KError = 88
ErrThrottlingQuotaExceeded KError = 89
ErrProducerFenced KError = 90
ErrUnknown KError = -1 // Errors.NONE
ErrNoError KError = 0 // Errors.UNKNOWN_SERVER_ERROR
ErrOffsetOutOfRange KError = 1 // Errors.OFFSET_OUT_OF_RANGE
ErrInvalidMessage KError = 2 // Errors.CORRUPT_MESSAGE
ErrUnknownTopicOrPartition KError = 3 // Errors.UNKNOWN_TOPIC_OR_PARTITION
ErrInvalidMessageSize KError = 4 // Errors.INVALID_FETCH_SIZE
ErrLeaderNotAvailable KError = 5 // Errors.LEADER_NOT_AVAILABLE
ErrNotLeaderForPartition KError = 6 // Errors.NOT_LEADER_OR_FOLLOWER
ErrRequestTimedOut KError = 7 // Errors.REQUEST_TIMED_OUT
ErrBrokerNotAvailable KError = 8 // Errors.BROKER_NOT_AVAILABLE
ErrReplicaNotAvailable KError = 9 // Errors.REPLICA_NOT_AVAILABLE
ErrMessageSizeTooLarge KError = 10 // Errors.MESSAGE_TOO_LARGE
ErrStaleControllerEpochCode KError = 11 // Errors.STALE_CONTROLLER_EPOCH
ErrOffsetMetadataTooLarge KError = 12 // Errors.OFFSET_METADATA_TOO_LARGE
ErrNetworkException KError = 13 // Errors.NETWORK_EXCEPTION
ErrOffsetsLoadInProgress KError = 14 // Errors.COORDINATOR_LOAD_IN_PROGRESS
ErrConsumerCoordinatorNotAvailable KError = 15 // Errors.COORDINATOR_NOT_AVAILABLE
ErrNotCoordinatorForConsumer KError = 16 // Errors.NOT_COORDINATOR
ErrInvalidTopic KError = 17 // Errors.INVALID_TOPIC_EXCEPTION
ErrMessageSetSizeTooLarge KError = 18 // Errors.RECORD_LIST_TOO_LARGE
ErrNotEnoughReplicas KError = 19 // Errors.NOT_ENOUGH_REPLICAS
ErrNotEnoughReplicasAfterAppend KError = 20 // Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND
ErrInvalidRequiredAcks KError = 21 // Errors.INVALID_REQUIRED_ACKS
ErrIllegalGeneration KError = 22 // Errors.ILLEGAL_GENERATION
ErrInconsistentGroupProtocol KError = 23 // Errors.INCONSISTENT_GROUP_PROTOCOL
ErrInvalidGroupId KError = 24 // Errors.INVALID_GROUP_ID
ErrUnknownMemberId KError = 25 // Errors.UNKNOWN_MEMBER_ID
ErrInvalidSessionTimeout KError = 26 // Errors.INVALID_SESSION_TIMEOUT
ErrRebalanceInProgress KError = 27 // Errors.REBALANCE_IN_PROGRESS
ErrInvalidCommitOffsetSize KError = 28 // Errors.INVALID_COMMIT_OFFSET_SIZE
ErrTopicAuthorizationFailed KError = 29 // Errors.TOPIC_AUTHORIZATION_FAILED
ErrGroupAuthorizationFailed KError = 30 // Errors.GROUP_AUTHORIZATION_FAILED
ErrClusterAuthorizationFailed KError = 31 // Errors.CLUSTER_AUTHORIZATION_FAILED
ErrInvalidTimestamp KError = 32 // Errors.INVALID_TIMESTAMP
ErrUnsupportedSASLMechanism KError = 33 // Errors.UNSUPPORTED_SASL_MECHANISM
ErrIllegalSASLState KError = 34 // Errors.ILLEGAL_SASL_STATE
ErrUnsupportedVersion KError = 35 // Errors.UNSUPPORTED_VERSION
ErrTopicAlreadyExists KError = 36 // Errors.TOPIC_ALREADY_EXISTS
ErrInvalidPartitions KError = 37 // Errors.INVALID_PARTITIONS
ErrInvalidReplicationFactor KError = 38 // Errors.INVALID_REPLICATION_FACTOR
ErrInvalidReplicaAssignment KError = 39 // Errors.INVALID_REPLICA_ASSIGNMENT
ErrInvalidConfig KError = 40 // Errors.INVALID_CONFIG
ErrNotController KError = 41 // Errors.NOT_CONTROLLER
ErrInvalidRequest KError = 42 // Errors.INVALID_REQUEST
ErrUnsupportedForMessageFormat KError = 43 // Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT
ErrPolicyViolation KError = 44 // Errors.POLICY_VIOLATION
ErrOutOfOrderSequenceNumber KError = 45 // Errors.OUT_OF_ORDER_SEQUENCE_NUMBER
ErrDuplicateSequenceNumber KError = 46 // Errors.DUPLICATE_SEQUENCE_NUMBER
ErrInvalidProducerEpoch KError = 47 // Errors.INVALID_PRODUCER_EPOCH
ErrInvalidTxnState KError = 48 // Errors.INVALID_TXN_STATE
ErrInvalidProducerIDMapping KError = 49 // Errors.INVALID_PRODUCER_ID_MAPPING
ErrInvalidTransactionTimeout KError = 50 // Errors.INVALID_TRANSACTION_TIMEOUT
ErrConcurrentTransactions KError = 51 // Errors.CONCURRENT_TRANSACTIONS
ErrTransactionCoordinatorFenced KError = 52 // Errors.TRANSACTION_COORDINATOR_FENCED
ErrTransactionalIDAuthorizationFailed KError = 53 // Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
ErrSecurityDisabled KError = 54 // Errors.SECURITY_DISABLED
ErrOperationNotAttempted KError = 55 // Errors.OPERATION_NOT_ATTEMPTED
ErrKafkaStorageError KError = 56 // Errors.KAFKA_STORAGE_ERROR
ErrLogDirNotFound KError = 57 // Errors.LOG_DIR_NOT_FOUND
ErrSASLAuthenticationFailed KError = 58 // Errors.SASL_AUTHENTICATION_FAILED
ErrUnknownProducerID KError = 59 // Errors.UNKNOWN_PRODUCER_ID
ErrReassignmentInProgress KError = 60 // Errors.REASSIGNMENT_IN_PROGRESS
ErrDelegationTokenAuthDisabled KError = 61 // Errors.DELEGATION_TOKEN_AUTH_DISABLED
ErrDelegationTokenNotFound KError = 62 // Errors.DELEGATION_TOKEN_NOT_FOUND
ErrDelegationTokenOwnerMismatch KError = 63 // Errors.DELEGATION_TOKEN_OWNER_MISMATCH
ErrDelegationTokenRequestNotAllowed KError = 64 // Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
ErrDelegationTokenAuthorizationFailed KError = 65 // Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
ErrDelegationTokenExpired KError = 66 // Errors.DELEGATION_TOKEN_EXPIRED
ErrInvalidPrincipalType KError = 67 // Errors.INVALID_PRINCIPAL_TYPE
ErrNonEmptyGroup KError = 68 // Errors.NON_EMPTY_GROUP
ErrGroupIDNotFound KError = 69 // Errors.GROUP_ID_NOT_FOUND
ErrFetchSessionIDNotFound KError = 70 // Errors.FETCH_SESSION_ID_NOT_FOUND
ErrInvalidFetchSessionEpoch KError = 71 // Errors.INVALID_FETCH_SESSION_EPOCH
ErrListenerNotFound KError = 72 // Errors.LISTENER_NOT_FOUND
ErrTopicDeletionDisabled KError = 73 // Errors.TOPIC_DELETION_DISABLED
ErrFencedLeaderEpoch KError = 74 // Errors.FENCED_LEADER_EPOCH
ErrUnknownLeaderEpoch KError = 75 // Errors.UNKNOWN_LEADER_EPOCH
ErrUnsupportedCompressionType KError = 76 // Errors.UNSUPPORTED_COMPRESSION_TYPE
ErrStaleBrokerEpoch KError = 77 // Errors.STALE_BROKER_EPOCH
ErrOffsetNotAvailable KError = 78 // Errors.OFFSET_NOT_AVAILABLE
ErrMemberIdRequired KError = 79 // Errors.MEMBER_ID_REQUIRED
ErrPreferredLeaderNotAvailable KError = 80 // Errors.PREFERRED_LEADER_NOT_AVAILABLE
ErrGroupMaxSizeReached KError = 81 // Errors.GROUP_MAX_SIZE_REACHED
ErrFencedInstancedId KError = 82 // Errors.FENCED_INSTANCE_ID
ErrEligibleLeadersNotAvailable KError = 83 // Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE
ErrElectionNotNeeded KError = 84 // Errors.ELECTION_NOT_NEEDED
ErrNoReassignmentInProgress KError = 85 // Errors.NO_REASSIGNMENT_IN_PROGRESS
ErrGroupSubscribedToTopic KError = 86 // Errors.GROUP_SUBSCRIBED_TO_TOPIC
ErrInvalidRecord KError = 87 // Errors.INVALID_RECORD
ErrUnstableOffsetCommit KError = 88 // Errors.UNSTABLE_OFFSET_COMMIT
ErrThrottlingQuotaExceeded KError = 89 // Errors.THROTTLING_QUOTA_EXCEEDED
ErrProducerFenced KError = 90 // Errors.PRODUCER_FENCED
)

func (err KError) Error() string {
Expand Down Expand Up @@ -302,7 +302,7 @@ func (err KError) Error() string {
case ErrNetworkException:
return "kafka server: The server disconnected before a response was received"
case ErrOffsetsLoadInProgress:
return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition"
return "kafka server: The coordinator is still loading offsets and cannot currently process requests"
case ErrConsumerCoordinatorNotAvailable:
return "kafka server: Offset's topic has not yet been created"
case ErrNotCoordinatorForConsumer:
Expand Down
40 changes: 40 additions & 0 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -1467,3 +1467,43 @@ func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeade
}
return res
}

// MockInitProducerIDResponse is an `InitPorducerIDResponse` builder.
type MockInitProducerIDResponse struct {
producerID int64
producerEpoch int16
err KError
t TestReporter
}

func NewMockInitProducerIDResponse(t TestReporter) *MockInitProducerIDResponse {
return &MockInitProducerIDResponse{
t: t,
}
}

func (m *MockInitProducerIDResponse) SetProducerID(id int) *MockInitProducerIDResponse {
m.producerID = int64(id)
return m
}

func (m *MockInitProducerIDResponse) SetProducerEpoch(epoch int) *MockInitProducerIDResponse {
m.producerEpoch = int16(epoch)
return m
}

func (m *MockInitProducerIDResponse) SetError(err KError) *MockInitProducerIDResponse {
m.err = err
return m
}

func (m *MockInitProducerIDResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*InitProducerIDRequest)
res := &InitProducerIDResponse{
Version: req.Version,
Err: m.err,
ProducerID: m.producerID,
ProducerEpoch: m.producerEpoch,
}
return res
}
5 changes: 2 additions & 3 deletions transaction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,8 @@ func (t *transactionManager) initProducerId() (int64, int16, error) {
return response.ProducerID, response.ProducerEpoch, false, nil
}
switch response.Err {
case ErrConsumerCoordinatorNotAvailable:
fallthrough
case ErrNotCoordinatorForConsumer:
// Retriable errors
case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer, ErrOffsetsLoadInProgress:
if t.isTransactional() {
_ = coordinator.Close()
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
Expand Down
47 changes: 47 additions & 0 deletions transaction_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,53 @@ func TestTxnmgrInitProducerIdTxn(t *testing.T) {
require.Equal(t, ProducerTxnFlagReady, txmng.status)
}

// TestTxnmgrInitProducerIdTxnCoordinatorLoading ensure we retry initProducerId when either FindCoordinator or InitProducerID returns ErrOffsetsLoadInProgress
func TestTxnmgrInitProducerIdTxnCoordinatorLoading(t *testing.T) {
config := NewTestConfig()
config.Producer.Idempotent = true
config.Producer.Transaction.ID = "txid-group"
config.Version = V0_11_0_0
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1

broker := NewMockBroker(t, 1)
defer broker.Close()

broker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(broker.BrokerID()).
SetBroker(broker.Addr(), broker.BrokerID()),
"FindCoordinatorRequest": NewMockSequence(
NewMockFindCoordinatorResponse(t).
SetError(CoordinatorTransaction, "txid-group", ErrOffsetsLoadInProgress),
NewMockFindCoordinatorResponse(t).
SetError(CoordinatorTransaction, "txid-group", ErrOffsetsLoadInProgress),
NewMockFindCoordinatorResponse(t).
SetCoordinator(CoordinatorTransaction, "txid-group", broker),
),
"InitProducerIDRequest": NewMockSequence(
NewMockInitProducerIDResponse(t).
SetError(ErrOffsetsLoadInProgress),
NewMockInitProducerIDResponse(t).
SetError(ErrOffsetsLoadInProgress),
NewMockInitProducerIDResponse(t).
SetProducerID(1).
SetProducerEpoch(0),
),
})

client, err := NewClient([]string{broker.Addr()}, config)
require.NoError(t, err)
defer client.Close()

txmng, err := newTransactionManager(config, client)
require.NoError(t, err)

require.Equal(t, int64(1), txmng.producerID)
require.Equal(t, int16(0), txmng.producerEpoch)
require.Equal(t, ProducerTxnFlagReady, txmng.status)
}

func TestMaybeAddPartitionToCurrentTxn(t *testing.T) {
type testCase struct {
initialFlags ProducerTxnStatusFlag
Expand Down

0 comments on commit afc088e

Please sign in to comment.