Skip to content

Commit

Permalink
fix(admin): use named return on retryOnError func
Browse files Browse the repository at this point in the history
Use a named return for the defer func to inspect to avoid subtle bugs

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Jan 7, 2025
1 parent ece2706 commit be0589b
Showing 1 changed file with 30 additions and 34 deletions.
64 changes: 30 additions & 34 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,25 +1066,24 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
var response *OffsetFetchResponse
request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions)
err := ca.retryOnError(isRetriableGroupCoordinatorError, func() error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
defer func() {
if err != nil && isRetriableGroupCoordinatorError(err) {
_ = ca.client.RefreshCoordinator(group)
}
}()

coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

response, err = coordinator.FetchOffset(request)
if err != nil {
return err
}
if !errors.Is(response.Err, ErrNoError) {
err = response.Err
return err
return response.Err
}

return nil
Expand All @@ -1102,29 +1101,27 @@ func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, pa
},
}

return ca.retryOnError(isRetriableGroupCoordinatorError, func() error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
defer func() {
if err != nil && isRetriableGroupCoordinatorError(err) {
_ = ca.client.RefreshCoordinator(group)
}
}()

coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

response, err = coordinator.DeleteOffsets(request)
if err != nil {
return err
}
if !errors.Is(response.ErrorCode, ErrNoError) {
err = response.ErrorCode
return err
return response.ErrorCode
}
if !errors.Is(response.Errors[topic][partition], ErrNoError) {
err = response.Errors[topic][partition]
return err
return response.Errors[topic][partition]
}

return nil
Expand All @@ -1140,18 +1137,18 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
request.Version = 1
}

return ca.retryOnError(isRetriableGroupCoordinatorError, func() error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
defer func() {
if err != nil && isRetriableGroupCoordinatorError(err) {
_ = ca.client.RefreshCoordinator(group)
}
}()

coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

response, err = coordinator.DeleteGroups(request)
if err != nil {
return err
Expand All @@ -1163,8 +1160,7 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
}

if !errors.Is(groupErr, ErrNoError) {
err = groupErr
return err
return groupErr
}

return nil
Expand Down Expand Up @@ -1359,28 +1355,28 @@ func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanc
GroupInstanceId: &groupInstanceId,
})
}
err := ca.retryOnError(isRetriableGroupCoordinatorError, func() error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
defer func() {
if err != nil && isRetriableGroupCoordinatorError(err) {
_ = ca.client.RefreshCoordinator(group)
}
}()

coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

response, err = coordinator.LeaveGroup(request)
if err != nil {
return err
}
if !errors.Is(response.Err, ErrNoError) {
err = response.Err
return err
return response.Err
}

return nil
})

return response, err
}

0 comments on commit be0589b

Please sign in to comment.