diff --git a/adapter/config/types.go b/adapter/config/types.go index 7ebabe44b1..3e3ad0cf94 100644 --- a/adapter/config/types.go +++ b/adapter/config/types.go @@ -543,6 +543,7 @@ type controlPlane struct { SyncApisOnStartUp bool SendRevisionUpdate bool EnvironmentLabels []string + ASBDataplaneTopics []ASBDataplaneTopic `toml:"asbDataplaneTopics"` DynamicEnvironments dynamicEnvironments RetryInterval time.Duration SkipSSLVerification bool @@ -552,6 +553,15 @@ type controlPlane struct { InitialFetch initialFetch } +type ASBDataplaneTopic struct { + Type string `toml:"type"` + TopicName string `toml:"topicName"` + ConnectionString string `toml:"connectionString"` + AmqpOverWebsocketsEnabled bool + ReconnectInterval time.Duration + ReconnectRetryCount int +} + type dynamicEnvironments struct { Enabled bool DataPlaneID string diff --git a/adapter/internal/messaging/azure_listener.go b/adapter/internal/messaging/azure_listener.go index 348ca26ee7..f90a63f981 100644 --- a/adapter/internal/messaging/azure_listener.go +++ b/adapter/internal/messaging/azure_listener.go @@ -23,6 +23,8 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "net" "nhooyr.io/websocket" + "os" + "strconv" "time" "github.com/wso2/product-microgateway/adapter/config" @@ -34,16 +36,89 @@ import ( const ( componentName = "adapter" subscriptionIdleTimeDuration = "P0Y0M3DT0H0M0S" + notification = "notification" + tokenRevocation = "tokenRevocation" + stepQuotaThreshold = "thresholdEvent" + stepQuotaReset = "billingCycleResetEvent" + organizationPurge = "organizationPurge" ) +var topicNames = []string{tokenRevocation, notification, stepQuotaThreshold, stepQuotaReset} + +const orgPurgeEnabled = "ORG_PURGE_ENABLED" + +func init() { + // Temporarily disable reacting organization Purge + orgPurgeEnabled, envParseErr := strconv.ParseBool(os.Getenv(orgPurgeEnabled)) + + if envParseErr == nil { + if orgPurgeEnabled { + topicNames = append(topicNames, organizationPurge) + } + } +} + // InitiateAndProcessEvents to pass event consumption func InitiateAndProcessEvents(config *config.Config) { - var err error - var reconnectRetryCount = config.ControlPlane.BrokerConnectionParameters.ReconnectRetryCount - var reconnectInterval = config.ControlPlane.BrokerConnectionParameters.ReconnectInterval + if len(config.ControlPlane.ASBDataplaneTopics) > 0 { + for _, topic := range config.ControlPlane.ASBDataplaneTopics { + subscription, err := msg.InitiateBrokerConnectionAndValidate( + topic.ConnectionString, + topic.TopicName, + getAmqpClientOptions(config), + componentName, + topic.ReconnectRetryCount, + topic.ReconnectInterval*time.Millisecond, + subscriptionIdleTimeDuration) + if err != nil { + logger.LoggerMgw.Errorf("Error while initiating broker connection for topic %s: %v", topic.TopicName, err) + health.SetControlPlaneBrokerStatus(false) + return + } + msg.InitiateConsumer(subscription, topic.Type) + startChannelConsumer(topic.Type) + logger.LoggerMgw.Infof("Broker connection initiated and lsitening on topic %s...", topic.TopicName) + } + health.SetControlPlaneBrokerStatus(true) + } else { + for _, topic := range topicNames { + connectionString := config.ControlPlane.BrokerConnectionParameters.EventListeningEndpoints[0] + reconnectRetryCount := config.ControlPlane.BrokerConnectionParameters.ReconnectRetryCount + reconnectInterval := config.ControlPlane.BrokerConnectionParameters.ReconnectInterval + + subscription, err := msg.InitiateBrokerConnectionAndValidate( + connectionString, + topic, + getAmqpClientOptions(config), + componentName, + reconnectRetryCount, + reconnectInterval*time.Millisecond, + subscriptionIdleTimeDuration) + if err != nil { + logger.LoggerMgw.Errorf("Error while initiating broker connection for topic %s: %v", topic, err) + health.SetControlPlaneBrokerStatus(false) + return + } + msg.InitiateConsumer(subscription, topic) + startChannelConsumer(topic) + logger.LoggerMgw.Infof("Broker connection initiated and lsitening on topic %s...", topic) + } + health.SetControlPlaneBrokerStatus(true) + } +} + +func startChannelConsumer(consumerType string) { + switch consumerType { + case notification: + go handleAzureNotification() + case tokenRevocation: + go handleAzureTokenRevocation() + case organizationPurge: + go handleAzureOrganizationPurge() + } +} - connectionString := config.ControlPlane.BrokerConnectionParameters.EventListeningEndpoints[0] - var clientOpts *azservicebus.ClientOptions +func getAmqpClientOptions(config *config.Config) *azservicebus.ClientOptions { if config.ControlPlane.BrokerConnectionParameters.AmqpOverWebsocketsEnabled { logger.LoggerMgw.Info("AMQP over Websockets is enabled. Initiating brokers with AMQP over Websockets.") newWebSocketConnFn := func(ctx context.Context, args azservicebus.NewWebSocketConnArgs) (net.Conn, error) { @@ -54,20 +129,9 @@ func InitiateAndProcessEvents(config *config.Config) { } return websocket.NetConn(ctx, wssConn, websocket.MessageBinary), nil } - clientOpts = &azservicebus.ClientOptions{ + return &azservicebus.ClientOptions{ NewWebSocketConn: newWebSocketConnFn, } } - - subscriptionMetaDataList, err := msg.InitiateBrokerConnectionAndValidate(connectionString, clientOpts, componentName, - reconnectRetryCount, reconnectInterval*time.Millisecond, subscriptionIdleTimeDuration) - health.SetControlPlaneBrokerStatus(err == nil) - if err == nil { - logger.LoggerMgw.Info("Service bus meta data successfully initialized.") - msg.InitiateConsumers(connectionString, clientOpts, subscriptionMetaDataList, reconnectInterval*time.Millisecond) - go handleAzureNotification() - go handleAzureTokenRevocation() - go handleAzureOrganizationPurge() - } - + return nil } diff --git a/adapter/pkg/messaging/azure_connection.go b/adapter/pkg/messaging/azure_connection.go index 950ad9bc30..8031e6e25c 100644 --- a/adapter/pkg/messaging/azure_connection.go +++ b/adapter/pkg/messaging/azure_connection.go @@ -21,7 +21,7 @@ package messaging import ( "context" "errors" - "os" + "fmt" "regexp" "strconv" "time" @@ -29,19 +29,21 @@ import ( asb "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin" "github.com/google/uuid" - "github.com/sirupsen/logrus" logger "github.com/wso2/product-microgateway/adapter/pkg/loggers" ) -// TODO: (erandi) when refactoring, refactor organization purge flow as well -var bindingKeys = []string{tokenRevocation, notification, stepQuotaThreshold, stepQuotaReset} - -const orgPurgeEnabled = "ORG_PURGE_ENABLED" - -// Subscription stores the meta data of a specific subscription +// Subscription stores the metadata of a specific subscription +// TopicName: the topic name of the subscription +// SubscriptionName: the name of the subscription +// ConnectionString: the connection string of the service bus +// ClientOptions: the client options for initiating the client +// ReconnectInterval: the interval to wait before reconnecting type Subscription struct { - topicName string - subscriptionName string + TopicName string + SubscriptionName string + ConnectionString string + ClientOptions *asb.ClientOptions + ReconnectInterval time.Duration } var ( @@ -63,107 +65,77 @@ func init() { AzureStepQuotaThresholdChannel = make(chan []byte) AzureStepQuotaResetChannel = make(chan []byte) AzureOrganizationPurgeChannel = make(chan []byte) - - // Temporarily disable reacting organization Purge - orgPurgeEnabled, envParseErr := strconv.ParseBool(os.Getenv(orgPurgeEnabled)) - - if envParseErr == nil { - if orgPurgeEnabled { - bindingKeys = append(bindingKeys, organizationPurge) - } - } } // InitiateBrokerConnectionAndValidate to initiate connection and validate azure service bus constructs to // further process -func InitiateBrokerConnectionAndValidate(connectionString string, clientOptions *asb.ClientOptions, componentName string, reconnectRetryCount int, - reconnectInterval time.Duration, subscriptionIdleTimeDuration string) ([]Subscription, error) { - subscriptionMetaDataList := make([]Subscription, 0) +func InitiateBrokerConnectionAndValidate(connectionString string, topic string, clientOptions *asb.ClientOptions, componentName string, reconnectRetryCount int, + reconnectInterval time.Duration, subscriptionIdleTimeDuration string) (*Subscription, error) { subProps := &admin.SubscriptionProperties{ AutoDeleteOnIdle: &subscriptionIdleTimeDuration, } _, err := asb.NewClientFromConnectionString(connectionString, clientOptions) if err == nil { - if logger.LoggerMsg.IsLevelEnabled(logrus.DebugLevel) { - logger.LoggerMsg.Debugf("ASB client initialized for connection url: %s", maskSharedAccessKey(connectionString)) - } + logger.LoggerMsg.Debugf("ASB client initialized for connection url: %s", maskSharedAccessKey(connectionString)) for j := 0; j < reconnectRetryCount || reconnectRetryCount == -1; j++ { - err = nil - subscriptionMetaDataList, err = retrieveSubscriptionMetadata(subscriptionMetaDataList, - connectionString, componentName, subProps) + sub, err := RetrieveSubscriptionMetadataForTopic(connectionString, topic, + clientOptions, componentName, subProps, reconnectInterval) if err != nil { logError(reconnectRetryCount, reconnectInterval, err) - subscriptionMetaDataList = nil time.Sleep(reconnectInterval) continue } - return subscriptionMetaDataList, err - } - if err != nil { - logger.LoggerMsg.Errorf("%v. Retry attempted %d times.", err, reconnectRetryCount) - return subscriptionMetaDataList, err + return sub, err } - } else { - // any error which comes to this point is because the connection url is not up to the expected format - // hence not retrying - logger.LoggerMsg.Errorf("Error occurred while trying to create ASB client using the connection url %s, err: %v", - connectionString, err) + return nil, fmt.Errorf("failed to create subscription for topic %s", topic) } - return subscriptionMetaDataList, err + logger.LoggerMsg.Errorf("Error occurred while trying to create ASB client using the connection url %s, err: %v", + maskSharedAccessKey(connectionString), err) + return nil, err } -// InitiateConsumers to pass event consumption -func InitiateConsumers(connectionString string, clientOptions *asb.ClientOptions, subscriptionMetaDataList []Subscription, reconnectInterval time.Duration) { - for _, subscriptionMetaData := range subscriptionMetaDataList { - go func(subscriptionMetaData Subscription) { - startBrokerConsumer(connectionString, clientOptions, subscriptionMetaData, reconnectInterval) - }(subscriptionMetaData) - } +// InitiateConsumer to start the broker consumer in a separate go routine +func InitiateConsumer(sub *Subscription, consumerType string) { + go startBrokerConsumer(sub, consumerType) } -func retrieveSubscriptionMetadata(metaDataList []Subscription, connectionString string, componentName string, - opts *admin.SubscriptionProperties) ([]Subscription, error) { - parentContext := context.Background() +func RetrieveSubscriptionMetadataForTopic(connectionString string, topicName string, clientOptions *asb.ClientOptions, + componentName string, opts *admin.SubscriptionProperties, reconnectInterval time.Duration) (*Subscription, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() adminClient, clientErr := admin.NewClientFromConnectionString(connectionString, nil) if clientErr != nil { logger.LoggerMsg.Errorf("Error occurred while trying to create ASB admin client using the connection url %s", connectionString) return nil, clientErr } + // Todo (vajira) : move this comment to parent method + // we are creating a unique subscription for each adapter starts. Unused subscriptions will be deleted after + // idle for three days - for _, key := range bindingKeys { - var errorValue error - subscriptionMetaData := Subscription{ - topicName: key, - subscriptionName: "", - } - // we are creating a unique subscription for each adapter starts. Unused subscriptions will be deleted after - // idle for three days - uniqueID := uuid.New() - - // in ASB, subscription names can contain letters, numbers, periods (.), hyphens (-), and - // underscores (_), up to 50 characters. Subscription names are also case-insensitive. - var subscriptionName = componentName + "_" + uniqueID.String() + "_sub" - var subscriptionCreationError error - func() { - ctx, cancel := context.WithCancel(parentContext) - defer cancel() - _, subscriptionCreationError = adminClient.CreateSubscription(ctx, key, subscriptionName, &admin.CreateSubscriptionOptions{ - Properties: opts, - }) - }() - if subscriptionCreationError != nil { - errorValue = errors.New("Error occurred while trying to create subscription " + subscriptionName + " in ASB for topic name " + - key + "." + subscriptionCreationError.Error()) - return metaDataList, errorValue - } - logger.LoggerMsg.Debugf("Subscription %s created.", subscriptionName) - subscriptionMetaData.subscriptionName = subscriptionName - subscriptionMetaData.topicName = key - metaDataList = append(metaDataList, subscriptionMetaData) + // in ASB, subscription names can contain letters, numbers, periods (.), hyphens (-), and + // underscores (_), up to 50 characters. Subscription names are also case-insensitive. + + subscriptionName := fmt.Sprintf("%s_%s_sub", componentName, uuid.New().String()) + _, err := adminClient.CreateSubscription(ctx, topicName, subscriptionName, &admin.CreateSubscriptionOptions{ + Properties: opts, + }) + + if err != nil { + return nil, errors.New("Error occurred while trying to create subscription " + subscriptionName + " in ASB for topic name " + + topicName + "." + err.Error()) } - return metaDataList, nil + + logger.LoggerMsg.Debugf("Subscription %s created.", subscriptionName) + + return &Subscription{ + TopicName: topicName, + SubscriptionName: subscriptionName, + ConnectionString: connectionString, + ClientOptions: clientOptions, + ReconnectInterval: reconnectInterval, + }, nil } func logError(reconnectRetryCount int, reconnectInterval time.Duration, errVal error) { diff --git a/adapter/pkg/messaging/azure_listener.go b/adapter/pkg/messaging/azure_listener.go index 673022eb7c..8b384401c0 100644 --- a/adapter/pkg/messaging/azure_listener.go +++ b/adapter/pkg/messaging/azure_listener.go @@ -27,27 +27,27 @@ import ( logger "github.com/wso2/product-microgateway/adapter/pkg/loggers" ) -func startBrokerConsumer(connectionString string, clientOptions *asb.ClientOptions, sub Subscription, reconnectInterval time.Duration) { - var topic = sub.topicName - var subName = sub.subscriptionName +func startBrokerConsumer(sub *Subscription, consumerType string) { + var topic = sub.TopicName + var subName = sub.SubscriptionName dataChannel := make(chan []byte) - if strings.EqualFold(topic, notification) { + if strings.EqualFold(consumerType, notification) { dataChannel = AzureNotificationChannel - } else if strings.EqualFold(topic, tokenRevocation) { + } else if strings.EqualFold(consumerType, tokenRevocation) { dataChannel = AzureRevokedTokenChannel - } else if strings.EqualFold(topic, stepQuotaThreshold) { + } else if strings.EqualFold(consumerType, stepQuotaThreshold) { dataChannel = AzureStepQuotaThresholdChannel - } else if strings.EqualFold(topic, stepQuotaReset) { + } else if strings.EqualFold(consumerType, stepQuotaReset) { dataChannel = AzureStepQuotaResetChannel - } else if strings.EqualFold(topic, organizationPurge) { + } else if strings.EqualFold(consumerType, organizationPurge) { dataChannel = AzureOrganizationPurgeChannel } parentContext := context.Background() for { // initializing the receiver client - subClient, err := asb.NewClientFromConnectionString(connectionString, clientOptions) + subClient, err := asb.NewClientFromConnectionString(sub.ConnectionString, sub.ClientOptions) if err != nil { logger.LoggerMsg.Errorf("Failed to create ASB client for %s , topic: %s. error: %v.", subName, topic, err) @@ -70,7 +70,7 @@ func startBrokerConsumer(connectionString string, clientOptions *asb.ClientOptio messages, err := receiver.ReceiveMessages(ctx, 10, nil) if err != nil { logger.LoggerMsg.Errorf("Failed to receive messages from ASB. Subscription: %s, topic: %s error: %v", subName, topic, err) - time.Sleep(reconnectInterval) + time.Sleep(sub.ReconnectInterval) continue } for _, message := range messages {