diff --git a/apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go b/apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go index 1e29c520d..02a5b7e6c 100644 --- a/apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go +++ b/apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go @@ -275,6 +275,9 @@ type KafkaConfig struct { // (Deprecated) (Unused) Suffix string `json:"suffix,omitempty"` + + // Sets the replica count for ephem-msk mode for kafka connect + KafkaConnectReplicaCount int `json:"kafkaConnectReplicaCount,omitempty"` } // DatabaseMode details the mode of operation of the Clowder Database Provider diff --git a/config/crd/bases/cloud.redhat.com_clowdenvironments.yaml b/config/crd/bases/cloud.redhat.com_clowdenvironments.yaml index eda968386..064475a38 100644 --- a/config/crd/bases/cloud.redhat.com_clowdenvironments.yaml +++ b/config/crd/bases/cloud.redhat.com_clowdenvironments.yaml @@ -316,6 +316,10 @@ spec: enableLegacyStrimzi: description: EnableLegacyStrimzi disables TLS + user auth type: boolean + kafkaConnectReplicaCount: + description: Sets the replica count for ephem-msk mode for + kafka connect + type: integer managedPrefix: description: Managed topic prefix for the managed cluster. Only used in (*_managed_*) mode. diff --git a/controllers/cloud.redhat.com/providers/kafka/msk.go b/controllers/cloud.redhat.com/providers/kafka/msk.go index 1bfda00c2..54edf0145 100644 --- a/controllers/cloud.redhat.com/providers/kafka/msk.go +++ b/controllers/cloud.redhat.com/providers/kafka/msk.go @@ -3,6 +3,7 @@ package kafka import ( "encoding/json" "fmt" + "strconv" "strings" crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1" @@ -199,14 +200,19 @@ type genericConfig map[string]string func (s mskProvider) connectConfig(config *apiextensions.JSON) error { + replicas := 3 + if s.Env.Spec.Providers.Kafka.KafkaConnectReplicaCount != 0 { + replicas = s.Env.Spec.Providers.Kafka.KafkaConnectReplicaCount + } + connectConfig := genericConfig{ - "config.storage.replication.factor": "1", + "config.storage.replication.factor": strconv.Itoa(replicas), "config.storage.topic": fmt.Sprintf("%v-connect-cluster-configs", s.Env.Name), "connector.client.config.override.policy": "All", "group.id": "connect-cluster", - "offset.storage.replication.factor": "1", + "offset.storage.replication.factor": strconv.Itoa(replicas), "offset.storage.topic": fmt.Sprintf("%v-connect-cluster-offsets", s.Env.Name), - "status.storage.replication.factor": "1", + "status.storage.replication.factor": strconv.Itoa(replicas), "status.storage.topic": fmt.Sprintf("%v-connect-cluster-status", s.Env.Name), } diff --git a/deploy-mutate.yml b/deploy-mutate.yml index f756a402d..e9568320f 100644 --- a/deploy-mutate.yml +++ b/deploy-mutate.yml @@ -6524,6 +6524,10 @@ objects: enableLegacyStrimzi: description: EnableLegacyStrimzi disables TLS + user auth type: boolean + kafkaConnectReplicaCount: + description: Sets the replica count for ephem-msk mode for + kafka connect + type: integer managedPrefix: description: Managed topic prefix for the managed cluster. Only used in (*_managed_*) mode. diff --git a/deploy.yml b/deploy.yml index 997677bf4..d0565e5c5 100644 --- a/deploy.yml +++ b/deploy.yml @@ -6524,6 +6524,10 @@ objects: enableLegacyStrimzi: description: EnableLegacyStrimzi disables TLS + user auth type: boolean + kafkaConnectReplicaCount: + description: Sets the replica count for ephem-msk mode for + kafka connect + type: integer managedPrefix: description: Managed topic prefix for the managed cluster. Only used in (*_managed_*) mode. diff --git a/docs/antora/modules/ROOT/pages/api_reference.adoc b/docs/antora/modules/ROOT/pages/api_reference.adoc index 2e362b546..bfe7aa544 100644 --- a/docs/antora/modules/ROOT/pages/api_reference.adoc +++ b/docs/antora/modules/ROOT/pages/api_reference.adoc @@ -1009,6 +1009,7 @@ KafkaConfig configures the Clowder provider controlling the creation of Kafka in | *`connectNamespace`* __string__ | (Deprecated) The namespace that the Kafka Connect cluster is expected to reside in. This is only used in (*_app-interface_*) and (*_operator_*) modes. | *`connectClusterName`* __string__ | (Deprecated) Defines the kafka connect cluster name that is used in this environment. | *`suffix`* __string__ | (Deprecated) (Unused) +| *`kafkaConnectReplicaCount`* __integer__ | Sets the replica count for ephem-msk mode for kafka connect |=== diff --git a/tests/kuttl/test-kafka-msk/03-pods.yaml b/tests/kuttl/test-kafka-msk/03-pods.yaml index c493fd061..3a857de3c 100644 --- a/tests/kuttl/test-kafka-msk/03-pods.yaml +++ b/tests/kuttl/test-kafka-msk/03-pods.yaml @@ -20,6 +20,7 @@ spec: namespace: test-kafka-msk-sec-source clusterAnnotation: test-kafka-msk topicNamespace: test-kafka-msk-sec-source + kafkaConnectReplicaCount: 1 db: mode: none logging: