Skip to content

Commit

Permalink
[choreo] multi-tenanting ASB: Add topic wise ASB event listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
VajiraPrabuddhaka committed Oct 20, 2024
1 parent c7f4486 commit ddca93c
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 109 deletions.
10 changes: 10 additions & 0 deletions adapter/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ type controlPlane struct {
SyncApisOnStartUp bool
SendRevisionUpdate bool
EnvironmentLabels []string
ASBDataplaneTopics []ASBDataplaneTopic `toml:"asbDataplaneTopics"`
DynamicEnvironments dynamicEnvironments
RetryInterval time.Duration
SkipSSLVerification bool
Expand All @@ -552,6 +553,15 @@ type controlPlane struct {
InitialFetch initialFetch
}

type ASBDataplaneTopic struct {
Type string `toml:"type"`
TopicName string `toml:"topicName"`
ConnectionString string `toml:"connectionString"`
AmqpOverWebsocketsEnabled bool
ReconnectInterval time.Duration
ReconnectRetryCount int
}

type dynamicEnvironments struct {
Enabled bool
DataPlaneID string
Expand Down
100 changes: 82 additions & 18 deletions adapter/internal/messaging/azure_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"net"
"nhooyr.io/websocket"
"os"
"strconv"
"time"

"github.com/wso2/product-microgateway/adapter/config"
Expand All @@ -34,16 +36,89 @@ import (
const (
componentName = "adapter"
subscriptionIdleTimeDuration = "P0Y0M3DT0H0M0S"
notification = "notification"
tokenRevocation = "tokenRevocation"
stepQuotaThreshold = "thresholdEvent"
stepQuotaReset = "billingCycleResetEvent"
organizationPurge = "organizationPurge"
)

var topicNames = []string{tokenRevocation, notification, stepQuotaThreshold, stepQuotaReset}

const orgPurgeEnabled = "ORG_PURGE_ENABLED"

func init() {
// Temporarily disable reacting organization Purge
orgPurgeEnabled, envParseErr := strconv.ParseBool(os.Getenv(orgPurgeEnabled))

if envParseErr == nil {
if orgPurgeEnabled {
topicNames = append(topicNames, organizationPurge)
}
}
}

// InitiateAndProcessEvents to pass event consumption
func InitiateAndProcessEvents(config *config.Config) {
var err error
var reconnectRetryCount = config.ControlPlane.BrokerConnectionParameters.ReconnectRetryCount
var reconnectInterval = config.ControlPlane.BrokerConnectionParameters.ReconnectInterval
if len(config.ControlPlane.ASBDataplaneTopics) > 0 {
for _, topic := range config.ControlPlane.ASBDataplaneTopics {
subscription, err := msg.InitiateBrokerConnectionAndValidate(
topic.ConnectionString,
topic.TopicName,
getAmqpClientOptions(config),
componentName,
topic.ReconnectRetryCount,
topic.ReconnectInterval*time.Millisecond,
subscriptionIdleTimeDuration)
if err != nil {
logger.LoggerMgw.Errorf("Error while initiating broker connection for topic %s: %v", topic.TopicName, err)
health.SetControlPlaneBrokerStatus(false)
return
}
msg.InitiateConsumer(subscription, topic.Type)
startChannelConsumer(topic.Type)
logger.LoggerMgw.Infof("Broker connection initiated and lsitening on topic %s...", topic.TopicName)
}
health.SetControlPlaneBrokerStatus(true)
} else {
for _, topic := range topicNames {
connectionString := config.ControlPlane.BrokerConnectionParameters.EventListeningEndpoints[0]
reconnectRetryCount := config.ControlPlane.BrokerConnectionParameters.ReconnectRetryCount
reconnectInterval := config.ControlPlane.BrokerConnectionParameters.ReconnectInterval

subscription, err := msg.InitiateBrokerConnectionAndValidate(
connectionString,
topic,
getAmqpClientOptions(config),
componentName,
reconnectRetryCount,
reconnectInterval*time.Millisecond,
subscriptionIdleTimeDuration)
if err != nil {
logger.LoggerMgw.Errorf("Error while initiating broker connection for topic %s: %v", topic, err)
health.SetControlPlaneBrokerStatus(false)
return
}
msg.InitiateConsumer(subscription, topic)
startChannelConsumer(topic)
logger.LoggerMgw.Infof("Broker connection initiated and lsitening on topic %s...", topic)
}
health.SetControlPlaneBrokerStatus(true)
}
}

func startChannelConsumer(consumerType string) {
switch consumerType {
case notification:
go handleAzureNotification()
case tokenRevocation:
go handleAzureTokenRevocation()
case organizationPurge:
go handleAzureOrganizationPurge()
}
}

connectionString := config.ControlPlane.BrokerConnectionParameters.EventListeningEndpoints[0]
var clientOpts *azservicebus.ClientOptions
func getAmqpClientOptions(config *config.Config) *azservicebus.ClientOptions {
if config.ControlPlane.BrokerConnectionParameters.AmqpOverWebsocketsEnabled {
logger.LoggerMgw.Info("AMQP over Websockets is enabled. Initiating brokers with AMQP over Websockets.")
newWebSocketConnFn := func(ctx context.Context, args azservicebus.NewWebSocketConnArgs) (net.Conn, error) {
Expand All @@ -54,20 +129,9 @@ func InitiateAndProcessEvents(config *config.Config) {
}
return websocket.NetConn(ctx, wssConn, websocket.MessageBinary), nil
}
clientOpts = &azservicebus.ClientOptions{
return &azservicebus.ClientOptions{
NewWebSocketConn: newWebSocketConnFn,
}
}

subscriptionMetaDataList, err := msg.InitiateBrokerConnectionAndValidate(connectionString, clientOpts, componentName,
reconnectRetryCount, reconnectInterval*time.Millisecond, subscriptionIdleTimeDuration)
health.SetControlPlaneBrokerStatus(err == nil)
if err == nil {
logger.LoggerMgw.Info("Service bus meta data successfully initialized.")
msg.InitiateConsumers(connectionString, clientOpts, subscriptionMetaDataList, reconnectInterval*time.Millisecond)
go handleAzureNotification()
go handleAzureTokenRevocation()
go handleAzureOrganizationPurge()
}

return nil
}
134 changes: 53 additions & 81 deletions adapter/pkg/messaging/azure_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,29 @@ package messaging
import (
"context"
"errors"
"os"
"fmt"
"regexp"
"strconv"
"time"

asb "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
logger "github.com/wso2/product-microgateway/adapter/pkg/loggers"
)

// TODO: (erandi) when refactoring, refactor organization purge flow as well
var bindingKeys = []string{tokenRevocation, notification, stepQuotaThreshold, stepQuotaReset}

const orgPurgeEnabled = "ORG_PURGE_ENABLED"

// Subscription stores the meta data of a specific subscription
// Subscription stores the metadata of a specific subscription
// TopicName: the topic name of the subscription
// SubscriptionName: the name of the subscription
// ConnectionString: the connection string of the service bus
// ClientOptions: the client options for initiating the client
// ReconnectInterval: the interval to wait before reconnecting
type Subscription struct {
topicName string
subscriptionName string
TopicName string
SubscriptionName string
ConnectionString string
ClientOptions *asb.ClientOptions
ReconnectInterval time.Duration
}

var (
Expand All @@ -63,107 +65,77 @@ func init() {
AzureStepQuotaThresholdChannel = make(chan []byte)
AzureStepQuotaResetChannel = make(chan []byte)
AzureOrganizationPurgeChannel = make(chan []byte)

// Temporarily disable reacting organization Purge
orgPurgeEnabled, envParseErr := strconv.ParseBool(os.Getenv(orgPurgeEnabled))

if envParseErr == nil {
if orgPurgeEnabled {
bindingKeys = append(bindingKeys, organizationPurge)
}
}
}

// InitiateBrokerConnectionAndValidate to initiate connection and validate azure service bus constructs to
// further process
func InitiateBrokerConnectionAndValidate(connectionString string, clientOptions *asb.ClientOptions, componentName string, reconnectRetryCount int,
reconnectInterval time.Duration, subscriptionIdleTimeDuration string) ([]Subscription, error) {
subscriptionMetaDataList := make([]Subscription, 0)
func InitiateBrokerConnectionAndValidate(connectionString string, topic string, clientOptions *asb.ClientOptions, componentName string, reconnectRetryCount int,
reconnectInterval time.Duration, subscriptionIdleTimeDuration string) (*Subscription, error) {
subProps := &admin.SubscriptionProperties{
AutoDeleteOnIdle: &subscriptionIdleTimeDuration,
}
_, err := asb.NewClientFromConnectionString(connectionString, clientOptions)

if err == nil {
if logger.LoggerMsg.IsLevelEnabled(logrus.DebugLevel) {
logger.LoggerMsg.Debugf("ASB client initialized for connection url: %s", maskSharedAccessKey(connectionString))
}
logger.LoggerMsg.Debugf("ASB client initialized for connection url: %s", maskSharedAccessKey(connectionString))

for j := 0; j < reconnectRetryCount || reconnectRetryCount == -1; j++ {
err = nil
subscriptionMetaDataList, err = retrieveSubscriptionMetadata(subscriptionMetaDataList,
connectionString, componentName, subProps)
sub, err := RetrieveSubscriptionMetadataForTopic(connectionString, topic,
clientOptions, componentName, subProps, reconnectInterval)
if err != nil {
logError(reconnectRetryCount, reconnectInterval, err)
subscriptionMetaDataList = nil
time.Sleep(reconnectInterval)
continue
}
return subscriptionMetaDataList, err
}
if err != nil {
logger.LoggerMsg.Errorf("%v. Retry attempted %d times.", err, reconnectRetryCount)
return subscriptionMetaDataList, err
return sub, err
}
} else {
// any error which comes to this point is because the connection url is not up to the expected format
// hence not retrying
logger.LoggerMsg.Errorf("Error occurred while trying to create ASB client using the connection url %s, err: %v",
connectionString, err)
return nil, fmt.Errorf("failed to create subscription for topic %s", topic)
}
return subscriptionMetaDataList, err
logger.LoggerMsg.Errorf("Error occurred while trying to create ASB client using the connection url %s, err: %v",
maskSharedAccessKey(connectionString), err)
return nil, err
}

// InitiateConsumers to pass event consumption
func InitiateConsumers(connectionString string, clientOptions *asb.ClientOptions, subscriptionMetaDataList []Subscription, reconnectInterval time.Duration) {
for _, subscriptionMetaData := range subscriptionMetaDataList {
go func(subscriptionMetaData Subscription) {
startBrokerConsumer(connectionString, clientOptions, subscriptionMetaData, reconnectInterval)
}(subscriptionMetaData)
}
// InitiateConsumer to start the broker consumer in a separate go routine
func InitiateConsumer(sub *Subscription, consumerType string) {
go startBrokerConsumer(sub, consumerType)
}

func retrieveSubscriptionMetadata(metaDataList []Subscription, connectionString string, componentName string,
opts *admin.SubscriptionProperties) ([]Subscription, error) {
parentContext := context.Background()
func RetrieveSubscriptionMetadataForTopic(connectionString string, topicName string, clientOptions *asb.ClientOptions,
componentName string, opts *admin.SubscriptionProperties, reconnectInterval time.Duration) (*Subscription, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
adminClient, clientErr := admin.NewClientFromConnectionString(connectionString, nil)
if clientErr != nil {
logger.LoggerMsg.Errorf("Error occurred while trying to create ASB admin client using the connection url %s", connectionString)
return nil, clientErr
}
// Todo (vajira) : move this comment to parent method
// we are creating a unique subscription for each adapter starts. Unused subscriptions will be deleted after
// idle for three days

for _, key := range bindingKeys {
var errorValue error
subscriptionMetaData := Subscription{
topicName: key,
subscriptionName: "",
}
// we are creating a unique subscription for each adapter starts. Unused subscriptions will be deleted after
// idle for three days
uniqueID := uuid.New()

// in ASB, subscription names can contain letters, numbers, periods (.), hyphens (-), and
// underscores (_), up to 50 characters. Subscription names are also case-insensitive.
var subscriptionName = componentName + "_" + uniqueID.String() + "_sub"
var subscriptionCreationError error
func() {
ctx, cancel := context.WithCancel(parentContext)
defer cancel()
_, subscriptionCreationError = adminClient.CreateSubscription(ctx, key, subscriptionName, &admin.CreateSubscriptionOptions{
Properties: opts,
})
}()
if subscriptionCreationError != nil {
errorValue = errors.New("Error occurred while trying to create subscription " + subscriptionName + " in ASB for topic name " +
key + "." + subscriptionCreationError.Error())
return metaDataList, errorValue
}
logger.LoggerMsg.Debugf("Subscription %s created.", subscriptionName)
subscriptionMetaData.subscriptionName = subscriptionName
subscriptionMetaData.topicName = key
metaDataList = append(metaDataList, subscriptionMetaData)
// in ASB, subscription names can contain letters, numbers, periods (.), hyphens (-), and
// underscores (_), up to 50 characters. Subscription names are also case-insensitive.

subscriptionName := fmt.Sprintf("%s_%s_sub", componentName, uuid.New().String())
_, err := adminClient.CreateSubscription(ctx, topicName, subscriptionName, &admin.CreateSubscriptionOptions{
Properties: opts,
})

if err != nil {
return nil, errors.New("Error occurred while trying to create subscription " + subscriptionName + " in ASB for topic name " +
topicName + "." + err.Error())
}
return metaDataList, nil

logger.LoggerMsg.Debugf("Subscription %s created.", subscriptionName)

return &Subscription{
TopicName: topicName,
SubscriptionName: subscriptionName,
ConnectionString: connectionString,
ClientOptions: clientOptions,
ReconnectInterval: reconnectInterval,
}, nil
}

func logError(reconnectRetryCount int, reconnectInterval time.Duration, errVal error) {
Expand Down
20 changes: 10 additions & 10 deletions adapter/pkg/messaging/azure_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,27 @@ import (
logger "github.com/wso2/product-microgateway/adapter/pkg/loggers"
)

func startBrokerConsumer(connectionString string, clientOptions *asb.ClientOptions, sub Subscription, reconnectInterval time.Duration) {
var topic = sub.topicName
var subName = sub.subscriptionName
func startBrokerConsumer(sub *Subscription, consumerType string) {
var topic = sub.TopicName
var subName = sub.SubscriptionName

dataChannel := make(chan []byte)
if strings.EqualFold(topic, notification) {
if strings.EqualFold(consumerType, notification) {
dataChannel = AzureNotificationChannel
} else if strings.EqualFold(topic, tokenRevocation) {
} else if strings.EqualFold(consumerType, tokenRevocation) {
dataChannel = AzureRevokedTokenChannel
} else if strings.EqualFold(topic, stepQuotaThreshold) {
} else if strings.EqualFold(consumerType, stepQuotaThreshold) {
dataChannel = AzureStepQuotaThresholdChannel
} else if strings.EqualFold(topic, stepQuotaReset) {
} else if strings.EqualFold(consumerType, stepQuotaReset) {
dataChannel = AzureStepQuotaResetChannel
} else if strings.EqualFold(topic, organizationPurge) {
} else if strings.EqualFold(consumerType, organizationPurge) {
dataChannel = AzureOrganizationPurgeChannel
}
parentContext := context.Background()

for {
// initializing the receiver client
subClient, err := asb.NewClientFromConnectionString(connectionString, clientOptions)
subClient, err := asb.NewClientFromConnectionString(sub.ConnectionString, sub.ClientOptions)
if err != nil {
logger.LoggerMsg.Errorf("Failed to create ASB client for %s , topic: %s. error: %v.",
subName, topic, err)
Expand All @@ -70,7 +70,7 @@ func startBrokerConsumer(connectionString string, clientOptions *asb.ClientOptio
messages, err := receiver.ReceiveMessages(ctx, 10, nil)
if err != nil {
logger.LoggerMsg.Errorf("Failed to receive messages from ASB. Subscription: %s, topic: %s error: %v", subName, topic, err)
time.Sleep(reconnectInterval)
time.Sleep(sub.ReconnectInterval)
continue
}
for _, message := range messages {
Expand Down

0 comments on commit ddca93c

Please sign in to comment.