diff --git a/.gitignore b/.gitignore index d8c947cd6..9f40d5d58 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .DS_Store config-dev-w-creds.yaml environments-dev-w-creds.yaml +.idea/ \ No newline at end of file diff --git a/api/go.mod b/api/go.mod index 01190f9da..7d1c7ea57 100644 --- a/api/go.mod +++ b/api/go.mod @@ -8,7 +8,7 @@ require ( github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20221025152940-c261df66a006 github.com/antihax/optional v1.0.0 github.com/caraml-dev/merlin v0.0.0 - github.com/caraml-dev/mlp v1.12.0 + github.com/caraml-dev/mlp v1.13.2 github.com/caraml-dev/turing/engines/experiment v0.0.0 github.com/caraml-dev/turing/engines/router v0.0.0 github.com/caraml-dev/universal-prediction-interface v0.3.6 @@ -66,6 +66,7 @@ require ( github.com/VividCortex/ewma v1.2.0 // indirect github.com/ajg/form v1.5.1 // indirect github.com/andybalholm/brotli v1.0.5 // indirect + github.com/avast/retry-go/v4 v4.6.0 // indirect github.com/aws/aws-sdk-go-v2 v1.30.6-0.20240906182417-827d25db0048 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect github.com/aws/aws-sdk-go-v2/config v1.17.8 // indirect @@ -267,7 +268,6 @@ replace ( github.com/caraml-dev/merlin => github.com/caraml-dev/merlin/api v0.0.0-20240313065547-6778bd14c119 github.com/caraml-dev/merlin-pyspark-app => github.com/caraml-dev/merlin/python/batch-predictor v0.0.0-20240313065547-6778bd14c119 - github.com/caraml-dev/mlp => github.com/deadlycoconuts/mlp v0.0.0-20240917090435-d94d92572eac github.com/caraml-dev/turing/engines/experiment => ../engines/experiment github.com/caraml-dev/turing/engines/router => ../engines/router diff --git a/api/go.sum b/api/go.sum index 8e8e0dc58..96902ecff 100644 --- a/api/go.sum +++ b/api/go.sum @@ -72,6 +72,8 @@ github.com/antihax/optional v1.0.0 h1:xK2lYat7ZLaVVcIuj82J8kIro4V6kDe0AUDFboUCwc github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= +github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA= +github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE= github.com/aws/aws-sdk-go v1.17.7/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go-v2 v1.16.16/go.mod h1:SwiyXi/1zTUZ6KIAmLK5V5ll8SiURNUYOqTerZPaF9k= github.com/aws/aws-sdk-go-v2 v1.30.6-0.20240906182417-827d25db0048 h1:wXvkIvYQ3EPVO5MhCoEv2u5LDwfWp+kLTQMIGyyvi/0= @@ -125,6 +127,8 @@ github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMU github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/caraml-dev/merlin/api v0.0.0-20240313065547-6778bd14c119 h1:8zt/6B7ySOokLa7WDpjaybBOL8x5C35nbMIrKFMsZcg= github.com/caraml-dev/merlin/api v0.0.0-20240313065547-6778bd14c119/go.mod h1:b2MbvZaVUSJQaoF0AuEgwTVVYK8lPHCIvnYpfvnjkQY= +github.com/caraml-dev/mlp v1.13.2 h1:N3lk+ToQ281duZImQLTQ28uJtmoc9Zkxx1CR94rS15U= +github.com/caraml-dev/mlp v1.13.2/go.mod h1:9kPooDSYsVu5q/z2K4T9uu08RGyiFNbCAFnQVBMJxOk= github.com/caraml-dev/universal-prediction-interface v0.3.6 h1:G/D4aukfjLECl8armJqFy/R2+0u/f4AiurSFqAo33uQ= github.com/caraml-dev/universal-prediction-interface v0.3.6/go.mod h1:e0qmFOXQxx8HFg5ObYyQO3WVnrqsr5v5JApFmeF7eJo= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -170,8 +174,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/deadlycoconuts/mlp v0.0.0-20240917090435-d94d92572eac h1:M6dR1d3O+xY4suQPumkbNO1Ie70M6WoYFaoW+cjKT+8= -github.com/deadlycoconuts/mlp v0.0.0-20240917090435-d94d92572eac/go.mod h1:9kPooDSYsVu5q/z2K4T9uu08RGyiFNbCAFnQVBMJxOk= github.com/denisenkom/go-mssqldb v0.0.0-20190515213511-eb9f6a1743f3/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM= github.com/dgraph-io/ristretto v0.0.1 h1:cJwdnj42uV8Jg4+KLrYovLiCgIfz9wtWm6E6KA+1tLs= github.com/dgraph-io/ristretto v0.0.1/go.mod h1:T40EBc7CJke8TkpiYfGGKAeFjSaxuFXhuXRyumBd6RE= diff --git a/api/turing/api/base_controller.go b/api/turing/api/base_controller.go index e6da96041..01cc7d95d 100644 --- a/api/turing/api/base_controller.go +++ b/api/turing/api/base_controller.go @@ -5,6 +5,8 @@ import ( val "github.com/go-playground/validator/v10" "github.com/gorilla/schema" + "github.com/caraml-dev/turing/api/turing/webhook" + "github.com/caraml-dev/turing/api/turing/models" ) @@ -16,19 +18,21 @@ type Controller interface { // BaseController implements common methods that may be shared by all API controllers type BaseController struct { *AppContext - decoder *schema.Decoder - validator *val.Validate + decoder *schema.Decoder + validator *val.Validate + webhookClient webhook.Client } // NewBaseController returns a new instance of BaseController -func NewBaseController(ctx *AppContext, validator *val.Validate) BaseController { +func NewBaseController(ctx *AppContext, validator *val.Validate, webhookClient webhook.Client) BaseController { decoder := schema.NewDecoder() decoder.IgnoreUnknownKeys(true) return BaseController{ - AppContext: ctx, - decoder: decoder, - validator: validator, + AppContext: ctx, + decoder: decoder, + validator: validator, + webhookClient: webhookClient, } } diff --git a/api/turing/api/deployment_controller.go b/api/turing/api/deployment_controller.go index 4e6cb89ca..b5022e44e 100644 --- a/api/turing/api/deployment_controller.go +++ b/api/turing/api/deployment_controller.go @@ -1,6 +1,7 @@ package api import ( + "context" "encoding/json" "errors" "fmt" @@ -25,6 +26,8 @@ func (c RouterDeploymentController) deployOrRollbackRouter( router *models.Router, routerVersion *models.RouterVersion, ) error { + ctx := context.Background() + // Get the router environment environment, err := c.MLPService.GetEnvironment(router.EnvironmentName) if err != nil { @@ -48,7 +51,7 @@ func (c RouterDeploymentController) deployOrRollbackRouter( "starting deployment for router %s version %d", router.Name, routerVersion.Version)) // Deploy the given router version - endpoint, err := c.deployRouterVersion(project, environment, routerVersion, eventsCh) + endpoint, err := c.deployRouterVersion(ctx, project, environment, routerVersion, eventsCh) // Start accumulating non-critical errors errorStrings := make([]string, 0) @@ -93,6 +96,7 @@ func (c RouterDeploymentController) deployOrRollbackRouter( } err = errors.New(strings.Join(errorStrings, ". ")) + return err } @@ -149,6 +153,7 @@ func (c RouterDeploymentController) writeDeploymentEvents( // (current version reference, status, endpoint, etc.) are not in the scope of this method. // This method returns the new router endpoint (if successful) and any error. func (c RouterDeploymentController) deployRouterVersion( + _ context.Context, project *mlp.Project, environment *merlin.Environment, routerVersion *models.RouterVersion, @@ -273,6 +278,7 @@ func (c RouterDeploymentController) deployRouterVersion( // Deploy succeeded - update version's status to deployed and return endpoint routerVersion.Status = models.RouterVersionStatusDeployed _, err = c.RouterVersionsService.Save(routerVersion) + return endpoint, err } diff --git a/api/turing/api/deployment_controller_test.go b/api/turing/api/deployment_controller_test.go index 9dd8419eb..07277274d 100644 --- a/api/turing/api/deployment_controller_test.go +++ b/api/turing/api/deployment_controller_test.go @@ -1,6 +1,7 @@ package api import ( + "context" "database/sql" "encoding/json" "errors" @@ -21,6 +22,8 @@ import ( ) func TestDeployVersionSuccess(t *testing.T) { + ctx := context.TODO() + testEnv := "test-env" environment := &merlin.Environment{Name: testEnv} project := &mlp.Project{ID: 1, Name: "test-project"} @@ -191,7 +194,7 @@ func TestDeployVersionSuccess(t *testing.T) { // Run deploy and test that the router version's status is deployed and the endpoint // returned by deploy version is expected. - endpoint, err := ctrl.deployRouterVersion(project, environment, data.routerVersion, eventsCh) + endpoint, err := ctrl.deployRouterVersion(ctx, project, environment, data.routerVersion, eventsCh) assert.NoError(t, err) assert.Equal(t, models.RouterVersionStatusDeployed, data.routerVersion.Status) assert.Equal(t, "test-url", endpoint) diff --git a/api/turing/api/ensembler_images_api_test.go b/api/turing/api/ensembler_images_api_test.go index 2428a70c5..ccab4f283 100644 --- a/api/turing/api/ensembler_images_api_test.go +++ b/api/turing/api/ensembler_images_api_test.go @@ -7,13 +7,15 @@ import ( "testing" "github.com/caraml-dev/mlp/api/client" + "github.com/stretchr/testify/mock" + "github.com/caraml-dev/turing/api/turing/api/request" "github.com/caraml-dev/turing/api/turing/imagebuilder" "github.com/caraml-dev/turing/api/turing/models" "github.com/caraml-dev/turing/api/turing/service" "github.com/caraml-dev/turing/api/turing/service/mocks" "github.com/caraml-dev/turing/api/turing/validation" - "github.com/stretchr/testify/mock" + webhookMock "github.com/caraml-dev/turing/api/turing/webhook/mocks" ) func TestEnsemblerImagesController_ListImages(t *testing.T) { @@ -477,6 +479,7 @@ func TestEnsemblerImagesController_ListImages(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { validator, _ := validation.NewValidator(nil) + mockWebhookClient := webhookMock.NewClient(t) c := EnsemblerImagesController{ BaseController: NewBaseController( @@ -486,6 +489,7 @@ func TestEnsemblerImagesController_ListImages(t *testing.T) { EnsemblerImagesService: tt.ensemblerImageService(), }, validator, + mockWebhookClient, ), } if got := c.ListImages(tt.args.in0, tt.args.vars, tt.args.in2); !reflect.DeepEqual(got, tt.want) { @@ -818,6 +822,7 @@ func TestEnsemblerImagesController_BuildImage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { validator, _ := validation.NewValidator(nil) + mockWebhookClient := webhookMock.NewClient(t) c := EnsemblerImagesController{ BaseController: NewBaseController( @@ -827,6 +832,7 @@ func TestEnsemblerImagesController_BuildImage(t *testing.T) { EnsemblerImagesService: tt.ensemblerImageService(), }, validator, + mockWebhookClient, ), } if got := c.BuildImage(tt.args.in0, tt.args.vars, tt.args.body); !reflect.DeepEqual(got, tt.want) { diff --git a/api/turing/api/ensemblers_api.go b/api/turing/api/ensemblers_api.go index d13fde43c..7c5af797f 100644 --- a/api/turing/api/ensemblers_api.go +++ b/api/turing/api/ensemblers_api.go @@ -9,8 +9,10 @@ import ( mlp "github.com/caraml-dev/mlp/api/client" "github.com/caraml-dev/turing/api/turing/api/request" + "github.com/caraml-dev/turing/api/turing/log" "github.com/caraml-dev/turing/api/turing/models" "github.com/caraml-dev/turing/api/turing/service" + "github.com/caraml-dev/turing/api/turing/webhook" ) type EnsemblersController struct { @@ -62,12 +64,16 @@ func (c EnsemblersController) GetEnsembler( } func (c EnsemblersController) CreateEnsembler( - _ *http.Request, + req *http.Request, vars RequestVars, body interface{}, ) *Response { - var errResp *Response - var project *mlp.Project + var ( + ctx = req.Context() + errResp *Response + project *mlp.Project + ) + if project, errResp = c.getProjectFromRequestVars(vars); errResp != nil { return errResp } @@ -80,14 +86,23 @@ func (c EnsemblersController) CreateEnsembler( return InternalServerError("unable to save the ensembler", err.Error()) } + // call webhook for ensembler creation event + if errWebhook := c.webhookClient.TriggerWebhooks(ctx, webhook.OnEnsemblerCreated, ensembler); errWebhook != nil { + log.Warnf( + "Error triggering webhook for event %s, ensembler id: %d, %v", + webhook.OnEnsemblerCreated, ensembler.GetID(), errWebhook, + ) + } + return Created(ensembler) } func (c EnsemblersController) UpdateEnsembler( - _ *http.Request, + req *http.Request, vars RequestVars, body interface{}, ) *Response { + ctx := req.Context() options := EnsemblersPathOptions{} if err := c.ParseVars(&options, vars); err != nil { @@ -149,14 +164,23 @@ func (c EnsemblersController) UpdateEnsembler( } } + // call webhook for ensembler update event + if errWebhook := c.webhookClient.TriggerWebhooks(ctx, webhook.OnEnsemblerUpdated, ensembler); errWebhook != nil { + log.Warnf( + "Error triggering webhook for event %s, ensembler id: %d, %v", + webhook.OnEnsemblerUpdated, ensembler.GetID(), errWebhook, + ) + } + return Ok(ensembler) } func (c EnsemblersController) DeleteEnsembler( - _ *http.Request, + req *http.Request, vars RequestVars, _ interface{}, ) *Response { + ctx := req.Context() options := EnsemblersPathOptions{} if err := c.ParseVars(&options, vars); err != nil { @@ -213,6 +237,14 @@ func (c EnsemblersController) DeleteEnsembler( return Error(httpStatus, "failed to delete the ensembler", err.Error()) } + // call webhook for ensembler deletion event + if errWebhook := c.webhookClient.TriggerWebhooks(ctx, webhook.OnEnsemblerDeleted, ensembler); errWebhook != nil { + log.Warnf( + "Error triggering webhook for event %s, ensembler id: %d, %v", + webhook.OnEnsemblerDeleted, ensembler.GetID(), errWebhook, + ) + } + return Ok(map[string]int{"id": int(ensembler.GetID())}) } diff --git a/api/turing/api/ensemblers_api_test.go b/api/turing/api/ensemblers_api_test.go index f045a564f..0de5a5031 100644 --- a/api/turing/api/ensemblers_api_test.go +++ b/api/turing/api/ensemblers_api_test.go @@ -2,8 +2,11 @@ package api import ( "errors" + "net/http" "testing" + "github.com/caraml-dev/turing/api/turing/webhook" + "github.com/caraml-dev/mlp/api/pkg/client/mlflow" mlflowMock "github.com/caraml-dev/mlp/api/pkg/client/mlflow/mocks" "github.com/stretchr/testify/mock" @@ -16,6 +19,7 @@ import ( "github.com/caraml-dev/turing/api/turing/service" "github.com/caraml-dev/turing/api/turing/service/mocks" "github.com/caraml-dev/turing/api/turing/validation" + webhookMock "github.com/caraml-dev/turing/api/turing/webhook/mocks" ) func TestEnsemblersController_ListEnsemblers(t *testing.T) { @@ -130,11 +134,13 @@ func TestEnsemblersController_ListEnsemblers(t *testing.T) { ensemblersSvc = tt.ensemblerSvc() } validator, _ := validation.NewValidator(nil) + mockWebhookClient := webhookMock.NewClient(t) + ctrl := &EnsemblersController{ NewBaseController( &AppContext{ EnsemblersService: ensemblersSvc, - }, validator, + }, validator, mockWebhookClient, ), } response := ctrl.ListEnsemblers(nil, tt.vars, nil) @@ -221,6 +227,8 @@ func TestEnsemblersController_GetEnsembler(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { validator, _ := validation.NewValidator(nil) + mockWebhookClient := webhookMock.NewClient(t) + var ensemblerSvc service.EnsemblersService if tt.ensemblerSvc != nil { ensemblerSvc = tt.ensemblerSvc() @@ -231,6 +239,7 @@ func TestEnsemblersController_GetEnsembler(t *testing.T) { EnsemblersService: ensemblerSvc, }, validator, + mockWebhookClient, ), } response := ctrl.GetEnsembler(nil, tt.vars, nil) @@ -267,13 +276,16 @@ func TestEnsemblersController_UpdateEnsembler(t *testing.T) { } tests := map[string]struct { + req *http.Request vars RequestVars ensemblerSvc func() service.EnsemblersService mlflowSvc func() mlflow.Service + webhookSvc func() webhook.Client body interface{} expected *Response }{ "success": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -308,9 +320,15 @@ func TestEnsemblersController_UpdateEnsembler(t *testing.T) { mlflowSvc.On("DeleteRun", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) return mlflowSvc }, + webhookSvc: func() webhook.Client { + webhookSvc := webhookMock.NewClient(t) + webhookSvc.On("TriggerWebhooks", mock.Anything, webhook.OnEnsemblerUpdated, mock.Anything).Return(nil) + return webhookSvc + }, expected: Ok(updated), }, "failure | bad request": { + req: &http.Request{}, vars: RequestVars{"project_id": {"unknown"}}, expected: BadRequest( "failed to fetch ensembler", @@ -318,6 +336,7 @@ func TestEnsemblersController_UpdateEnsembler(t *testing.T) { ), }, "failure | ensembler not found": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"1"}, "ensembler_id": {"2"}, @@ -337,6 +356,7 @@ func TestEnsemblersController_UpdateEnsembler(t *testing.T) { ), }, "failure | invalid payload": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -364,6 +384,7 @@ func TestEnsemblersController_UpdateEnsembler(t *testing.T) { ), }, "failure | incompatible types": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -391,6 +412,7 @@ func TestEnsemblersController_UpdateEnsembler(t *testing.T) { ), }, "failure | failed to save": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -433,6 +455,7 @@ func TestEnsemblersController_UpdateEnsembler(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { validator, _ := validation.NewValidator(nil) + var ensemblerSvc service.EnsemblersService if tt.ensemblerSvc != nil { ensemblerSvc = tt.ensemblerSvc() @@ -441,6 +464,10 @@ func TestEnsemblersController_UpdateEnsembler(t *testing.T) { if tt.mlflowSvc != nil { mlflowSvc = tt.mlflowSvc() } + var webhookSvc webhook.Client + if tt.webhookSvc != nil { + webhookSvc = tt.webhookSvc() + } ctrl := &EnsemblersController{ NewBaseController( @@ -449,9 +476,10 @@ func TestEnsemblersController_UpdateEnsembler(t *testing.T) { MlflowService: mlflowSvc, }, validator, + webhookSvc, ), } - response := ctrl.UpdateEnsembler(nil, tt.vars, tt.body) + response := ctrl.UpdateEnsembler(tt.req, tt.vars, tt.body) assert.Equal(t, tt.expected, response) }) } @@ -512,14 +540,17 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { dummyEnsemblingJob := GenerateEnsemblingJobFixture(1, models.ID(1), models.ID(1), "", true) tests := map[string]struct { + req *http.Request vars RequestVars ensemblerSvc func() service.EnsemblersService mlflowSvc func() mlflow.Service routerVersionsSvc func() service.RouterVersionsService ensemblingJobSvc func() service.EnsemblingJobService + webhookSvc func() webhook.Client expected *Response }{ "failure | bad request": { + req: &http.Request{}, vars: RequestVars{"project_id": {"unknown"}}, expected: BadRequest( "failed to fetch ensembler", @@ -527,6 +558,7 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { ), }, "failure | ensembler not found": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"1"}, "ensembler_id": {"2"}, @@ -546,6 +578,7 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { ), }, "failure | there is active router version": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -587,6 +620,7 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { expected: BadRequest("failed to delete the ensembler", "there are active router version using this ensembler"), }, "failure | there is active ensembling job": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -631,6 +665,7 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { "there is ensembling job in terminating process, please wait until the job is successfully terminated"), }, "failure | there is current router version": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -684,6 +719,7 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { ), }, "failure | failed to delete router version": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -741,6 +777,7 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { expected: InternalServerError("unable to delete router version", "failed to delete router version"), }, "failure | failed to delete ensembling job": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -800,6 +837,7 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { expected: InternalServerError("unable to delete ensembling job", "failed to delete ensembling job"), }, "failure | failed to delete mlflow experiment": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -845,6 +883,7 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { expected: InternalServerError("failed to delete the ensembler", "failed to delete mlflow experiment"), }, "failure | failed to delete": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -890,6 +929,7 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { expected: InternalServerError("failed to delete the ensembler", "failed to delete"), }, "success | batch ensembling is not enabled": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -917,9 +957,15 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { mlflowSvc.On("DeleteExperiment", mock.Anything, "1", true).Return(nil) return mlflowSvc }, + webhookSvc: func() webhook.Client { + webhookSvc := webhookMock.NewClient(t) + webhookSvc.On("TriggerWebhooks", mock.Anything, webhook.OnEnsemblerDeleted, mock.Anything).Return(nil) + return webhookSvc + }, expected: Ok(map[string]int{"id": 2}), }, "success": { + req: &http.Request{}, vars: RequestVars{ "project_id": {"2"}, "ensembler_id": {"2"}, @@ -962,12 +1008,18 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { mlflowSvc.On("DeleteExperiment", mock.Anything, "1", true).Return(nil) return mlflowSvc }, + webhookSvc: func() webhook.Client { + webhookSvc := webhookMock.NewClient(t) + webhookSvc.On("TriggerWebhooks", mock.Anything, webhook.OnEnsemblerDeleted, mock.Anything).Return(nil) + return webhookSvc + }, expected: Ok(map[string]int{"id": 2}), }, } for name, tt := range tests { t.Run(name, func(t *testing.T) { validator, _ := validation.NewValidator(nil) + var ensemblerSvc service.EnsemblersService if tt.ensemblerSvc != nil { ensemblerSvc = tt.ensemblerSvc() @@ -984,6 +1036,10 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { if tt.routerVersionsSvc != nil { routerVersionsSvc = tt.routerVersionsSvc() } + var webhookClient webhook.Client + if tt.webhookSvc != nil { + webhookClient = tt.webhookSvc() + } ctrl := &EnsemblersController{ NewBaseController( @@ -994,9 +1050,10 @@ func TestEnsemblerController_DeleteEnsembler(t *testing.T) { RouterVersionsService: routerVersionsSvc, }, validator, + webhookClient, ), } - response := ctrl.DeleteEnsembler(nil, tt.vars, nil) + response := ctrl.DeleteEnsembler(tt.req, tt.vars, nil) assert.Equal(t, tt.expected, response) }) } diff --git a/api/turing/api/ensembling_job_api_test.go b/api/turing/api/ensembling_job_api_test.go index 15c84a75e..77022ff67 100644 --- a/api/turing/api/ensembling_job_api_test.go +++ b/api/turing/api/ensembling_job_api_test.go @@ -5,19 +5,20 @@ import ( "fmt" "testing" - "github.com/caraml-dev/turing/api/turing/batch" - merlin "github.com/caraml-dev/merlin/client" mlp "github.com/caraml-dev/mlp/api/client" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/caraml-dev/turing/api/turing/batch" openapi "github.com/caraml-dev/turing/api/turing/generated" "github.com/caraml-dev/turing/api/turing/internal/ref" "github.com/caraml-dev/turing/api/turing/models" "github.com/caraml-dev/turing/api/turing/service" "github.com/caraml-dev/turing/api/turing/service/mocks" "github.com/caraml-dev/turing/api/turing/validation" + "github.com/caraml-dev/turing/api/turing/webhook" + webhookMock "github.com/caraml-dev/turing/api/turing/webhook/mocks" ) var ( @@ -195,6 +196,7 @@ func TestEnsemblingJobController_CreateEnsemblingJob(t *testing.T) { ensemblersService func() service.EnsemblersService ensemblingJobService func() service.EnsemblingJobService mlpService func() service.MLPService + webhookClient func() webhook.Client vars RequestVars body interface{} }{ @@ -233,6 +235,9 @@ func TestEnsemblingJobController_CreateEnsemblingJob(t *testing.T) { ).Return(&mlp.Project{ID: 1}, nil) return mlpService }, + webhookClient: func() webhook.Client { + return webhookMock.NewClient(t) + }, vars: RequestVars{ "project_id": {"1"}, }, @@ -271,6 +276,9 @@ func TestEnsemblingJobController_CreateEnsemblingJob(t *testing.T) { ).Return(&mlp.Project{ID: 1}, nil) return mlpService }, + webhookClient: func() webhook.Client { + return webhookMock.NewClient(t) + }, vars: RequestVars{ "project_id": {"1"}, }, @@ -303,6 +311,9 @@ func TestEnsemblingJobController_CreateEnsemblingJob(t *testing.T) { ).Return(&mlp.Project{ID: 1}, nil) return mlpService }, + webhookClient: func() webhook.Client { + return webhookMock.NewClient(t) + }, vars: RequestVars{ "project_id": {"1"}, }, @@ -335,6 +346,9 @@ func TestEnsemblingJobController_CreateEnsemblingJob(t *testing.T) { ).Return(&mlp.Project{ID: 1}, nil) return mlpService }, + webhookClient: func() webhook.Client { + return webhookMock.NewClient(t) + }, vars: RequestVars{ "project_id": {"1"}, }, @@ -367,6 +381,9 @@ func TestEnsemblingJobController_CreateEnsemblingJob(t *testing.T) { ).Return(nil, errors.New("hello")) return mlpService }, + webhookClient: func() webhook.Client { + return webhookMock.NewClient(t) + }, vars: RequestVars{ "project_id": {"1"}, }, @@ -378,6 +395,7 @@ func TestEnsemblingJobController_CreateEnsemblingJob(t *testing.T) { ensemblersService := tt.ensemblersService() ensemblingJobService := tt.ensemblingJobService() mlpService := tt.mlpService() + webhookManager := tt.webhookClient() validator, _ := validation.NewValidator(nil) ctrl := &EnsemblingJobController{ @@ -388,6 +406,7 @@ func TestEnsemblingJobController_CreateEnsemblingJob(t *testing.T) { MLPService: mlpService, }, validator, + webhookManager, ), } response := ctrl.Create(nil, tt.vars, tt.body) @@ -477,12 +496,15 @@ func TestEnsemblingJobController_GetEnsemblingJob(t *testing.T) { t.Run(name, func(t *testing.T) { svc := tt.ensemblingJobService() validator, _ := validation.NewValidator(nil) + webhookClient := webhookMock.NewClient(t) + ctrl := &EnsemblingJobController{ NewBaseController( &AppContext{ EnsemblingJobService: svc, }, validator, + webhookClient, ), } resp := ctrl.GetEnsemblingJob(nil, tt.params, nil) @@ -722,12 +744,15 @@ func TestEnsemblingJobController_ListEnsemblingJob(t *testing.T) { t.Run(name, func(t *testing.T) { svc := tt.ensemblingJobService() validator, _ := validation.NewValidator(nil) + webhookClient := webhookMock.NewClient(t) + ctrl := &EnsemblingJobController{ NewBaseController( &AppContext{ EnsemblingJobService: svc, }, validator, + webhookClient, ), } resp := ctrl.ListEnsemblingJobs(nil, tt.params, nil) @@ -807,12 +832,15 @@ func TestEnsemblingJobController_DeleteEnsemblingJob(t *testing.T) { t.Run(name, func(t *testing.T) { svc := tt.ensemblingJobService() validator, _ := validation.NewValidator(nil) + webhookClient := webhookMock.NewClient(t) + ctrl := &EnsemblingJobController{ NewBaseController( &AppContext{ EnsemblingJobService: svc, }, validator, + webhookClient, ), } resp := ctrl.DeleteEnsemblingJob(nil, tt.params, nil) diff --git a/api/turing/api/pod_log_api_test.go b/api/turing/api/pod_log_api_test.go index 3e4852290..05c53454f 100644 --- a/api/turing/api/pod_log_api_test.go +++ b/api/turing/api/pod_log_api_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/caraml-dev/mlp/api/client" "github.com/stretchr/testify/mock" "github.com/caraml-dev/turing/api/turing/batch" @@ -16,13 +17,11 @@ import ( "github.com/caraml-dev/turing/api/turing/cluster/servicebuilder" openapi "github.com/caraml-dev/turing/api/turing/generated" "github.com/caraml-dev/turing/api/turing/internal/ref" - "github.com/caraml-dev/turing/api/turing/service" - "github.com/caraml-dev/turing/api/turing/validation" - - "github.com/caraml-dev/mlp/api/client" - "github.com/caraml-dev/turing/api/turing/models" + "github.com/caraml-dev/turing/api/turing/service" "github.com/caraml-dev/turing/api/turing/service/mocks" + "github.com/caraml-dev/turing/api/turing/validation" + webhookMock "github.com/caraml-dev/turing/api/turing/webhook/mocks" ) func TestPodLogControllerListEnsemblingPodLogs(t *testing.T) { @@ -298,6 +297,8 @@ func TestPodLogControllerListEnsemblingPodLogs(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { validator, _ := validation.NewValidator(nil) + mockWebhookClient := webhookMock.NewClient(t) + c := PodLogController{ NewBaseController( &AppContext{ @@ -306,6 +307,7 @@ func TestPodLogControllerListEnsemblingPodLogs(t *testing.T) { EnsemblingJobService: tt.ensemblingJobService(), }, validator, + mockWebhookClient, ), } if got := c.ListEnsemblingJobPodLogs(nil, tt.vars, nil); !reflect.DeepEqual(got, tt.expected) { @@ -650,6 +652,8 @@ func TestPodLogControllerListRouterPodLogs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { validator, _ := validation.NewValidator(nil) + mockWebhookClient := webhookMock.NewClient(t) + c := PodLogController{ NewBaseController( &AppContext{ @@ -659,6 +663,7 @@ func TestPodLogControllerListRouterPodLogs(t *testing.T) { RouterVersionsService: tt.routerVersionsService(), }, validator, + mockWebhookClient, ), } if got := c.ListRouterPodLogs(tt.args.r, tt.args.vars, tt.args.body); !reflect.DeepEqual(got, tt.want) { diff --git a/api/turing/api/router_versions_api.go b/api/turing/api/router_versions_api.go index 8d4c37447..8e1b48932 100644 --- a/api/turing/api/router_versions_api.go +++ b/api/turing/api/router_versions_api.go @@ -4,13 +4,13 @@ import ( "fmt" "net/http" - "github.com/caraml-dev/turing/api/turing/service" - mlp "github.com/caraml-dev/mlp/api/client" "github.com/caraml-dev/turing/api/turing/api/request" "github.com/caraml-dev/turing/api/turing/log" "github.com/caraml-dev/turing/api/turing/models" + "github.com/caraml-dev/turing/api/turing/service" + "github.com/caraml-dev/turing/api/turing/webhook" ) type RouterVersionsController struct { @@ -41,11 +41,15 @@ func (c RouterVersionsController) ListRouterVersions( // CreateRouterVersion creates a router version from the provided configuration. If no router exists // within the provided project with the provided id, this method will throw an error. // If the update is valid, a new RouterVersion will be created but NOT deployed. -func (c RouterVersionsController) CreateRouterVersion(_ *http.Request, vars RequestVars, body interface{}) *Response { +func (c RouterVersionsController) CreateRouterVersion(req *http.Request, vars RequestVars, body interface{}) *Response { // Parse request vars - var errResp *Response - var router *models.Router - var project *mlp.Project + var ( + ctx = req.Context() + errResp *Response + router *models.Router + project *mlp.Project + ) + if project, errResp = c.getProjectFromRequestVars(vars); errResp != nil { return errResp } @@ -79,6 +83,16 @@ func (c RouterVersionsController) CreateRouterVersion(_ *http.Request, vars Requ return InternalServerError("unable to create router version", err.Error()) } + // call webhook for router version creation event + if errWebhook := c.webhookClient.TriggerWebhooks( + ctx, webhook.OnRouterVersionCreated, routerVersion, + ); errWebhook != nil { + log.Warnf( + "Error triggering webhook for event %s, router id: %d, router version id: %d, %v", + webhook.OnRouterVersionCreated, router.ID, routerVersion.ID, errWebhook, + ) + } + return Ok(routerVersion) } @@ -100,14 +114,18 @@ func (c RouterVersionsController) GetRouterVersion( // DeleteRouterVersion deletes the config for the given version number. func (c RouterVersionsController) DeleteRouterVersion( - _ *http.Request, + req *http.Request, vars RequestVars, _ interface{}, ) *Response { // Parse request vars - var errResp *Response - var router *models.Router - var routerVersion *models.RouterVersion + var ( + ctx = req.Context() + errResp *Response + router *models.Router + routerVersion *models.RouterVersion + ) + if router, errResp = c.getRouterFromRequestVars(vars); errResp != nil { return errResp } @@ -130,20 +148,34 @@ func (c RouterVersionsController) DeleteRouterVersion( if err != nil { return InternalServerError("unable to delete router version", err.Error()) } + + // call webhook for router version deletion event + if errWebhook := c.webhookClient.TriggerWebhooks( + ctx, webhook.OnRouterVersionDeleted, routerVersion, + ); errWebhook != nil { + log.Warnf( + "Error triggering webhook for event %s, router id: %d, router version id: %d, %v", + webhook.OnRouterVersionDeleted, router.ID, routerVersion.ID, errWebhook, + ) + } return Ok(map[string]int{"router_id": int(router.ID), "version": int(routerVersion.Version)}) } // DeployRouterVersion deploys the given router version into the associated kubernetes cluster func (c RouterVersionsController) DeployRouterVersion( - _ *http.Request, + req *http.Request, vars RequestVars, _ interface{}, ) *Response { // Parse request vars - var errResp *Response - var project *mlp.Project - var router *models.Router - var routerVersion *models.RouterVersion + var ( + ctx = req.Context() + errResp *Response + project *mlp.Project + router *models.Router + routerVersion *models.RouterVersion + ) + if project, errResp = c.getProjectFromRequestVars(vars); errResp != nil { return errResp } @@ -173,6 +205,16 @@ func (c RouterVersionsController) DeployRouterVersion( log.Errorf("Error deploying router version %s:%s:%d: %v", project.Name, router.Name, routerVersion.Version, err) } + + // call webhook for router version deployment event + if errWebhook := c.webhookClient.TriggerWebhooks( + ctx, webhook.OnRouterVersionDeployed, routerVersion, + ); errWebhook != nil { + log.Warnf( + "Error triggering webhook for event %s, router id: %d, router version id: %d, %v", + webhook.OnRouterVersionDeployed, router.ID, routerVersion.ID, errWebhook, + ) + } }() return Accepted(map[string]int{ diff --git a/api/turing/api/router_versions_api_test.go b/api/turing/api/router_versions_api_test.go index 50cb0ee3b..6222afa04 100644 --- a/api/turing/api/router_versions_api_test.go +++ b/api/turing/api/router_versions_api_test.go @@ -2,17 +2,21 @@ package api import ( "errors" + "net/http" "testing" - merlin "github.com/caraml-dev/merlin/client" - mlp "github.com/caraml-dev/mlp/api/client" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + merlin "github.com/caraml-dev/merlin/client" + mlp "github.com/caraml-dev/mlp/api/client" + "github.com/caraml-dev/turing/api/turing/api/request" "github.com/caraml-dev/turing/api/turing/config" "github.com/caraml-dev/turing/api/turing/models" "github.com/caraml-dev/turing/api/turing/service/mocks" + "github.com/caraml-dev/turing/api/turing/webhook" + webhookMock "github.com/caraml-dev/turing/api/turing/webhook/mocks" routerConfig "github.com/caraml-dev/turing/engines/router/missionctl/config" ) @@ -127,6 +131,10 @@ func TestCreateRouterVersion(t *testing.T) { routerVersionSvc := &mocks.RouterVersionsService{} routerVersionSvc.On("Save", routerVersion).Return(routerVersion, nil) + // Webhook service + webhookSvc := &webhookMock.Client{} + webhookSvc.On("TriggerWebhooks", mock.Anything, webhook.OnRouterVersionCreated, mock.Anything).Return(nil) + // Define tests tests := map[string]struct { vars RequestVars @@ -187,11 +195,12 @@ func TestCreateRouterVersion(t *testing.T) { RouterVersionsService: routerVersionSvc, RouterDefaults: &config.RouterDefaults{}, }, + webhookClient: webhookSvc, }, }, } // Run test method and validate - response := ctrl.CreateRouterVersion(nil, data.vars, data.body) + response := ctrl.CreateRouterVersion(&http.Request{}, data.vars, data.body) assert.Equal(t, data.expected, response) }) } @@ -331,6 +340,10 @@ func TestDeleteRouterVersion(t *testing.T) { On("FindByID", models.ID(2)). Return(router2, nil) + // Webhook service + webhookSvc := &webhookMock.Client{} + webhookSvc.On("TriggerWebhooks", mock.Anything, webhook.OnRouterVersionDeleted, mock.Anything).Return(nil) + // Define tests tests := map[string]struct { vars RequestVars @@ -404,11 +417,12 @@ func TestDeleteRouterVersion(t *testing.T) { RoutersService: routerSvc, RouterVersionsService: routerVersionSvc, }, + webhookClient: webhookSvc, }, }, } // Run test method and validate - response := ctrl.DeleteRouterVersion(nil, data.vars, nil) + response := ctrl.DeleteRouterVersion(&http.Request{}, data.vars, nil) assert.Equal(t, data.expected, response) }) } @@ -522,6 +536,10 @@ func TestDeployRouterVersion(t *testing.T) { On("FindByRouterIDAndVersion", models.ID(4), uint(4)). Return(routerVersion4, nil) + // Webhook service + webhookSvc := &webhookMock.Client{} + webhookSvc.On("TriggerWebhooks", mock.Anything, webhook.OnRouterVersionDeployed, mock.Anything).Return(nil) + // Define tests tests := map[string]struct { vars RequestVars @@ -586,11 +604,12 @@ func TestDeployRouterVersion(t *testing.T) { RouterVersionsService: routerVersionSvc, RouterDefaults: &config.RouterDefaults{}, }, + webhookClient: webhookSvc, }, }, } // Run test method and validate - response := ctrl.DeployRouterVersion(nil, data.vars, nil) + response := ctrl.DeployRouterVersion(&http.Request{}, data.vars, nil) assert.Equal(t, data.expected, response) }) } diff --git a/api/turing/api/routers_api.go b/api/turing/api/routers_api.go index e9306059f..8fb371f5f 100644 --- a/api/turing/api/routers_api.go +++ b/api/turing/api/routers_api.go @@ -8,9 +8,9 @@ import ( mlp "github.com/caraml-dev/mlp/api/client" "github.com/caraml-dev/turing/api/turing/api/request" - "github.com/caraml-dev/turing/api/turing/models" - "github.com/caraml-dev/turing/api/turing/log" + "github.com/caraml-dev/turing/api/turing/models" + "github.com/caraml-dev/turing/api/turing/webhook" ) type RoutersController struct { @@ -59,13 +59,17 @@ func (c RoutersController) GetRouter( // a router within the provided project with the same name, this method will throw an error. // If not, a new Router and associated RouterVersion will be created and deployed. func (c RoutersController) CreateRouter( - _ *http.Request, + req *http.Request, vars RequestVars, body interface{}, ) *Response { // Parse request vars - var errResp *Response - var project *mlp.Project + var ( + ctx = req.Context() + errResp *Response + project *mlp.Project + ) + if project, errResp = c.getProjectFromRequestVars(vars); errResp != nil { return errResp } @@ -126,6 +130,12 @@ func (c RoutersController) CreateRouter( log.Errorf("Error deploying router %s:%s:%d: %v", project.Name, router.Name, routerVersion.Version, err) } + + // call webhook for router creation event + if errWebhook := c.webhookClient.TriggerWebhooks(ctx, webhook.OnRouterCreated, router); errWebhook != nil { + log.Warnf("Error triggering webhook for event %s, router id: %d, %v", + webhook.OnRouterCreated, router.ID, errWebhook) + } }() return Ok(router) @@ -134,11 +144,15 @@ func (c RoutersController) CreateRouter( // UpdateRouter updates a router from the provided configuration. If no router exists // within the provided project with the provided id, this method will throw an error. // If the update is valid, a new RouterVersion will be created and deployed. -func (c RoutersController) UpdateRouter(_ *http.Request, vars RequestVars, body interface{}) *Response { +func (c RoutersController) UpdateRouter(req *http.Request, vars RequestVars, body interface{}) *Response { // Parse request vars - var errResp *Response - var project *mlp.Project - var router *models.Router + var ( + ctx = req.Context() + errResp *Response + project *mlp.Project + router *models.Router + ) + if project, errResp = c.getProjectFromRequestVars(vars); errResp != nil { return errResp } @@ -189,6 +203,12 @@ func (c RoutersController) UpdateRouter(_ *http.Request, vars RequestVars, body log.Errorf("Error deploying router %s:%s:%d: %v", project.Name, router.Name, routerVersion.Version, err) } + + // call webhook for router update event + if errWebhook := c.webhookClient.TriggerWebhooks(ctx, webhook.OnRouterUpdated, router); errWebhook != nil { + log.Warnf("Error triggering webhook for event %s, router id: %d, %v", + webhook.OnRouterUpdated, router.ID, errWebhook) + } }() return Ok(router) @@ -196,13 +216,17 @@ func (c RoutersController) UpdateRouter(_ *http.Request, vars RequestVars, body // DeleteRouter deletes a router and all its associated versions. func (c RoutersController) DeleteRouter( - _ *http.Request, + req *http.Request, vars RequestVars, _ interface{}, ) *Response { // Parse request vars - var errResp *Response - var router *models.Router + var ( + ctx = req.Context() + errResp *Response + router *models.Router + ) + if router, errResp = c.getRouterFromRequestVars(vars); errResp != nil { return errResp } @@ -225,20 +249,33 @@ func (c RoutersController) DeleteRouter( if err != nil { return InternalServerError("unable to delete router", err.Error()) } + + // call webhook for router deletion event + if errWebhook := c.webhookClient.TriggerWebhooks(ctx, webhook.OnRouterDeleted, router); errWebhook != nil { + log.Warnf( + "Error triggering webhook for event %s, router id: %d, %v", + webhook.OnRouterDeleted, router.ID, errWebhook, + ) + } + return Ok(map[string]int{"id": int(router.ID)}) } // DeployRouter deploys the current version of the given router into the associated // kubernetes cluster. If there is no current version, an error is returned. func (c RoutersController) DeployRouter( - _ *http.Request, + req *http.Request, vars RequestVars, _ interface{}, ) *Response { // Parse request vars - var errResp *Response - var project *mlp.Project - var router *models.Router + var ( + ctx = req.Context() + errResp *Response + project *mlp.Project + router *models.Router + ) + if project, errResp = c.getProjectFromRequestVars(vars); errResp != nil { return errResp } @@ -274,6 +311,14 @@ func (c RoutersController) DeployRouter( log.Errorf("Error deploying router version %s:%s:%d: %v", project.Name, router.Name, routerVersion.Version, err) } + + // call webhook for router deployment event + if errWebhook := c.webhookClient.TriggerWebhooks(ctx, webhook.OnRouterDeployed, router); errWebhook != nil { + log.Warnf( + "Error triggering webhook for event %s, router id: %d, %v", + webhook.OnRouterDeployed, router.ID, errWebhook, + ) + } }() return Accepted(map[string]int{ @@ -284,14 +329,18 @@ func (c RoutersController) DeployRouter( // UndeployRouter deletes the given router specs from the associated kubernetes cluster func (c RoutersController) UndeployRouter( - _ *http.Request, + req *http.Request, vars RequestVars, _ interface{}, ) *Response { // Parse request vars - var errResp *Response - var project *mlp.Project - var router *models.Router + var ( + ctx = req.Context() + errResp *Response + project *mlp.Project + router *models.Router + ) + if project, errResp = c.getProjectFromRequestVars(vars); errResp != nil { return errResp } @@ -305,6 +354,14 @@ func (c RoutersController) UndeployRouter( return InternalServerError("unable to undeploy router", err.Error()) } + // call webhook for router un-deployment event + if errWebhook := c.webhookClient.TriggerWebhooks(ctx, webhook.OnRouterUndeployed, router); errWebhook != nil { + log.Warnf( + "Error triggering webhook for event %s, router id: %d, %v", + webhook.OnRouterUndeployed, router.ID, errWebhook, + ) + } + return Ok(map[string]int{"router_id": int(router.ID)}) } diff --git a/api/turing/api/routers_api_test.go b/api/turing/api/routers_api_test.go index ee7e4b68e..20a27201b 100644 --- a/api/turing/api/routers_api_test.go +++ b/api/turing/api/routers_api_test.go @@ -2,17 +2,21 @@ package api import ( "errors" + "net/http" "testing" - merlin "github.com/caraml-dev/merlin/client" - mlp "github.com/caraml-dev/mlp/api/client" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + merlin "github.com/caraml-dev/merlin/client" + mlp "github.com/caraml-dev/mlp/api/client" + "github.com/caraml-dev/turing/api/turing/api/request" "github.com/caraml-dev/turing/api/turing/config" "github.com/caraml-dev/turing/api/turing/models" "github.com/caraml-dev/turing/api/turing/service/mocks" + "github.com/caraml-dev/turing/api/turing/webhook" + webhookMock "github.com/caraml-dev/turing/api/turing/webhook/mocks" routerConfig "github.com/caraml-dev/turing/engines/router/missionctl/config" ) @@ -235,21 +239,29 @@ func TestCreateRouter(t *testing.T) { routerVersionSvc := &mocks.RouterVersionsService{} routerVersionSvc.On("Save", routerVersion).Return(routerVersion, nil) + // Webhook service + webhookSvc := &webhookMock.Client{} + webhookSvc.On("TriggerWebhooks", mock.Anything, webhook.OnRouterCreated, mock.Anything).Return(nil) + // Define tests tests := map[string]struct { + req *http.Request body interface{} vars RequestVars expected *Response }{ "failure | bad request": { + req: &http.Request{}, vars: RequestVars{}, expected: BadRequest("invalid project id", "key project_id not found in vars"), }, "failure | project not found": { + req: &http.Request{}, vars: RequestVars{"project_id": {"1"}}, expected: NotFound("project not found", "test project error"), }, "failure | router exists": { + req: &http.Request{}, body: &request.CreateOrUpdateRouterRequest{ Name: "router1", }, @@ -257,6 +269,7 @@ func TestCreateRouter(t *testing.T) { expected: BadRequest("invalid router name", "router with name router1 already exists in project 2"), }, "failure | environment missing": { + req: &http.Request{}, body: &request.CreateOrUpdateRouterRequest{ Name: "router2", Environment: "dev-invalid", @@ -265,6 +278,7 @@ func TestCreateRouter(t *testing.T) { expected: BadRequest("invalid environment", "environment dev-invalid does not exist"), }, "failure | router save": { + req: &http.Request{}, body: &request.CreateOrUpdateRouterRequest{ Name: "router2", Environment: "dev", @@ -273,6 +287,7 @@ func TestCreateRouter(t *testing.T) { expected: InternalServerError("unable to create router", "test router save error"), }, "failure | build router version": { + req: &http.Request{}, body: &request.CreateOrUpdateRouterRequest{ Name: "router3", Environment: "dev", @@ -281,6 +296,7 @@ func TestCreateRouter(t *testing.T) { expected: InternalServerError("unable to create router", "router config is empty"), }, "success": { + req: &http.Request{}, body: &request.CreateOrUpdateRouterRequest{ Name: "router3", Environment: "dev", @@ -313,11 +329,12 @@ func TestCreateRouter(t *testing.T) { RouterVersionsService: routerVersionSvc, RouterDefaults: &config.RouterDefaults{}, }, + webhookClient: webhookSvc, }, }, } // Run test method and validate - response := ctrl.CreateRouter(nil, data.vars, data.body) + response := ctrl.CreateRouter(data.req, data.vars, data.body) assert.Equal(t, data.expected, response) }) } @@ -385,25 +402,34 @@ func TestUpdateRouter(t *testing.T) { routerVersionSvc := &mocks.RouterVersionsService{} routerVersionSvc.On("Save", routerVersion).Return(routerVersion, nil) + // Webhook service + webhookSvc := &webhookMock.Client{} + webhookSvc.On("TriggerWebhooks", mock.Anything, webhook.OnRouterUpdated, mock.Anything).Return(nil) + // Define tests tests := map[string]struct { + req *http.Request body interface{} vars RequestVars expected *Response }{ "failure | bad request (missing project_id)": { + req: &http.Request{}, vars: RequestVars{}, expected: BadRequest("invalid project id", "key project_id not found in vars"), }, "failure | project not found": { + req: &http.Request{}, vars: RequestVars{"project_id": {"1"}, "router_id": {"1"}}, expected: NotFound("project not found", "test project error"), }, "failure | bad request (missing router_id)": { + req: &http.Request{}, vars: RequestVars{"project_id": {"2"}}, expected: BadRequest("invalid router id", "key router_id not found in vars"), }, "failure | router not found": { + req: &http.Request{}, body: &request.CreateOrUpdateRouterRequest{ Name: "router1", }, @@ -411,6 +437,7 @@ func TestUpdateRouter(t *testing.T) { expected: NotFound("router not found", "test router error"), }, "failure | invalid router config": { + req: &http.Request{}, body: &request.CreateOrUpdateRouterRequest{ Name: "router1", }, @@ -421,6 +448,7 @@ func TestUpdateRouter(t *testing.T) { ), }, "failure | deployment in progress": { + req: &http.Request{}, body: &request.CreateOrUpdateRouterRequest{ Name: "router2", Environment: "dev", @@ -432,6 +460,7 @@ func TestUpdateRouter(t *testing.T) { ), }, "failure | build router version": { + req: &http.Request{}, body: &request.CreateOrUpdateRouterRequest{ Name: "router3", Environment: "dev", @@ -440,6 +469,7 @@ func TestUpdateRouter(t *testing.T) { expected: InternalServerError("unable to update router", "router config is empty"), }, "success": { + req: &http.Request{}, body: &request.CreateOrUpdateRouterRequest{ Name: "router4", Environment: "dev", @@ -472,11 +502,12 @@ func TestUpdateRouter(t *testing.T) { RouterVersionsService: routerVersionSvc, RouterDefaults: &config.RouterDefaults{}, }, + webhookClient: webhookSvc, }, }, } // Run test method and validate - response := ctrl.UpdateRouter(nil, data.vars, data.body) + response := ctrl.UpdateRouter(data.req, data.vars, data.body) assert.Equal(t, data.expected, response) }) } @@ -560,20 +591,28 @@ func TestDeleteRouter(t *testing.T) { On("ListRouterVersionsWithStatus", models.ID(6), models.RouterVersionStatusPending). Return([]*models.RouterVersion{}, nil) + // Webhook service + webhookSvc := &webhookMock.Client{} + webhookSvc.On("TriggerWebhooks", mock.Anything, webhook.OnRouterDeleted, mock.Anything).Return(nil) + // Define tests tests := map[string]struct { + req *http.Request vars RequestVars expected *Response }{ "failure | bad request (missing router_id)": { + req: &http.Request{}, vars: RequestVars{}, expected: BadRequest("invalid router id", "key router_id not found in vars"), }, "failure | router not found": { + req: &http.Request{}, vars: RequestVars{"router_id": {"1"}}, expected: NotFound("router not found", "test router error"), }, "failure | router deployed": { + req: &http.Request{}, vars: RequestVars{"router_id": {"2"}}, expected: BadRequest( "invalid delete request", @@ -581,6 +620,7 @@ func TestDeleteRouter(t *testing.T) { ), }, "failure | list router versions": { + req: &http.Request{}, vars: RequestVars{"router_id": {"3"}}, expected: InternalServerError( "unable to retrieve router versions", @@ -588,6 +628,7 @@ func TestDeleteRouter(t *testing.T) { ), }, "failure | pending router versions": { + req: &http.Request{}, vars: RequestVars{"router_id": {"4"}}, expected: BadRequest( "invalid delete request", @@ -595,10 +636,12 @@ func TestDeleteRouter(t *testing.T) { ), }, "failure | delete failed": { + req: &http.Request{}, vars: RequestVars{"router_id": {"5"}}, expected: InternalServerError("unable to delete router", "test delete router error"), }, "success": { + req: &http.Request{}, vars: RequestVars{"router_id": {"6"}}, expected: &Response{ code: 200, @@ -618,11 +661,12 @@ func TestDeleteRouter(t *testing.T) { RouterVersionsService: routerVersionSvc, RouterDefaults: &config.RouterDefaults{}, }, + webhookClient: webhookSvc, }, }, } // Run test method and validate - response := ctrl.DeleteRouter(nil, data.vars, nil) + response := ctrl.DeleteRouter(data.req, data.vars, nil) assert.Equal(t, data.expected, response) }) } @@ -717,28 +761,38 @@ func TestDeployRouter(t *testing.T) { Return(nil, errors.New("test router version error")) routerVersionSvc.On("FindByID", models.ID(2)).Return(routerVersion, nil) + // Webhook service + webhookSvc := &webhookMock.Client{} + webhookSvc.On("TriggerWebhooks", mock.Anything, webhook.OnRouterDeployed, mock.Anything).Return(nil) + // Define tests tests := map[string]struct { + req *http.Request vars RequestVars expected *Response }{ "failure | bad request (missing project_id)": { + req: &http.Request{}, vars: RequestVars{}, expected: BadRequest("invalid project id", "key project_id not found in vars"), }, "failure | project not found": { + req: &http.Request{}, vars: RequestVars{"project_id": {"1"}, "router_id": {"1"}}, expected: NotFound("project not found", "test project error"), }, "failure | bad request (missing router_id)": { + req: &http.Request{}, vars: RequestVars{"project_id": {"2"}}, expected: BadRequest("invalid router id", "key router_id not found in vars"), }, "failure | router not found": { + req: &http.Request{}, vars: RequestVars{"project_id": {"2"}, "router_id": {"1"}}, expected: NotFound("router not found", "test router error"), }, "failure | router status pending": { + req: &http.Request{}, vars: RequestVars{"project_id": {"2"}, "router_id": {"2"}}, expected: BadRequest( "invalid deploy request", @@ -746,18 +800,22 @@ func TestDeployRouter(t *testing.T) { ), }, "failure | router status deployed": { + req: &http.Request{}, vars: RequestVars{"project_id": {"2"}, "router_id": {"3"}}, expected: BadRequest("invalid deploy request", "router is already deployed"), }, "failure | no current version": { + req: &http.Request{}, vars: RequestVars{"project_id": {"2"}, "router_id": {"4"}}, expected: BadRequest("invalid deploy request", "router has no current configuration"), }, "failure | router version not found": { + req: &http.Request{}, vars: RequestVars{"project_id": {"2"}, "router_id": {"5"}}, expected: NotFound("router version not found", "test router version error"), }, "success": { + req: &http.Request{}, vars: RequestVars{"project_id": {"2"}, "router_id": {"6"}}, expected: &Response{ code: 202, @@ -781,11 +839,12 @@ func TestDeployRouter(t *testing.T) { RouterVersionsService: routerVersionSvc, RouterDefaults: &config.RouterDefaults{}, }, + webhookClient: webhookSvc, }, }, } // Run test method and validate - response := ctrl.DeployRouter(nil, data.vars, nil) + response := ctrl.DeployRouter(data.req, data.vars, nil) assert.Equal(t, data.expected, response) }) } @@ -854,28 +913,38 @@ func TestUndeployRouter(t *testing.T) { On("DeleteRouterEndpoint", project, environment, &models.RouterVersion{Router: router3}). Return(nil) + // Webhook service + webhookSvc := &webhookMock.Client{} + webhookSvc.On("TriggerWebhooks", mock.Anything, webhook.OnRouterUndeployed, mock.Anything).Return(nil) + // Define tests tests := map[string]struct { + req *http.Request vars RequestVars expected *Response }{ "failure | bad request (missing project_id)": { + req: &http.Request{}, vars: RequestVars{}, expected: BadRequest("invalid project id", "key project_id not found in vars"), }, "failure | project not found": { + req: &http.Request{}, vars: RequestVars{"project_id": {"1"}, "router_id": {"1"}}, expected: NotFound("project not found", "test project error"), }, "failure | bad request (missing router_id)": { + req: &http.Request{}, vars: RequestVars{"project_id": {"2"}}, expected: BadRequest("invalid router id", "key router_id not found in vars"), }, "failure | router not found": { + req: &http.Request{}, vars: RequestVars{"project_id": {"2"}, "router_id": {"1"}}, expected: NotFound("router not found", "test router error"), }, "failure | undeploy error": { + req: &http.Request{}, vars: RequestVars{"project_id": {"2"}, "router_id": {"2"}}, expected: InternalServerError( "unable to undeploy router", @@ -883,6 +952,7 @@ func TestUndeployRouter(t *testing.T) { ), }, "success": { + req: &http.Request{}, vars: RequestVars{"project_id": {"2"}, "router_id": {"3"}}, expected: &Response{ code: 200, @@ -905,11 +975,12 @@ func TestUndeployRouter(t *testing.T) { EventService: eventSvc, DeploymentService: deploymentSvc, }, + webhookClient: webhookSvc, }, }, } // Run test method and validate - response := ctrl.UndeployRouter(nil, data.vars, nil) + response := ctrl.UndeployRouter(data.req, data.vars, nil) assert.Equal(t, data.expected, response) }) } diff --git a/api/turing/config/config.go b/api/turing/config/config.go index 4d98f3872..8dc3596ff 100644 --- a/api/turing/config/config.go +++ b/api/turing/config/config.go @@ -12,6 +12,7 @@ import ( mlpcluster "github.com/caraml-dev/mlp/api/pkg/cluster" "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" + "github.com/caraml-dev/mlp/api/pkg/webhooks" "github.com/go-playground/validator/v10" "github.com/mitchellh/mapstructure" "gopkg.in/yaml.v2" @@ -89,6 +90,8 @@ type Config struct { TuringUIConfig *SinglePageApplicationConfig OpenapiConfig *OpenapiConfig MlflowConfig *MlflowConfig + WebhooksConfig webhooks.Config + // Experiment specifies the JSON configuration to set up experiment managers and runners. // // The configuration follows the following format to support different experiment engines diff --git a/api/turing/server/api.go b/api/turing/server/api.go index e8ca1b865..fac1ef398 100644 --- a/api/turing/server/api.go +++ b/api/turing/server/api.go @@ -13,6 +13,7 @@ import ( "github.com/caraml-dev/turing/api/turing/config" "github.com/caraml-dev/turing/api/turing/middleware" "github.com/caraml-dev/turing/api/turing/validation" + "github.com/caraml-dev/turing/api/turing/webhook" ) func AddAPIRoutesHandler(r *mux.Router, path string, appCtx *api.AppContext, cfg *config.Config) error { @@ -31,8 +32,14 @@ func AddAPIRoutesHandler(r *mux.Router, path string, appCtx *api.AppContext, cfg apiRouter.Use(openapiMiddleware, sentry.Recoverer) + // Initialize webhook client + webhookClient, err := webhook.NewWebhook(&cfg.WebhooksConfig) + if err != nil { + return err + } + validator, _ := validation.NewValidator(appCtx.ExperimentsService) - baseController := api.NewBaseController(appCtx, validator) + baseController := api.NewBaseController(appCtx, validator, webhookClient) deploymentController := api.RouterDeploymentController{BaseController: baseController} controllers := []api.Controller{ api.AlertsController{BaseController: baseController}, diff --git a/api/turing/webhook/mocks/webhook.go b/api/turing/webhook/mocks/webhook.go new file mode 100644 index 000000000..31d3f6977 --- /dev/null +++ b/api/turing/webhook/mocks/webhook.go @@ -0,0 +1,48 @@ +// Code generated by mockery v2.45.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + webhooks "github.com/caraml-dev/mlp/api/pkg/webhooks" +) + +// Client is an autogenerated mock type for the Client type +type Client struct { + mock.Mock +} + +// TriggerWebhooks provides a mock function with given fields: ctx, eventType, body +func (_m *Client) TriggerWebhooks(ctx context.Context, eventType webhooks.EventType, body interface{}) error { + ret := _m.Called(ctx, eventType, body) + + if len(ret) == 0 { + panic("no return value specified for TriggerWebhooks") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, webhooks.EventType, interface{}) error); ok { + r0 = rf(ctx, eventType, body) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewClient(t interface { + mock.TestingT + Cleanup(func()) +}) *Client { + mock := &Client{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/api/turing/webhook/request.go b/api/turing/webhook/request.go new file mode 100644 index 000000000..7de0af858 --- /dev/null +++ b/api/turing/webhook/request.go @@ -0,0 +1,10 @@ +package webhook + +import ( + "github.com/caraml-dev/mlp/api/pkg/webhooks" +) + +type Request struct { + EventType webhooks.EventType `json:"event_type"` + Data interface{} `json:"data"` +} diff --git a/api/turing/webhook/webhook.go b/api/turing/webhook/webhook.go new file mode 100644 index 000000000..36e4c3f75 --- /dev/null +++ b/api/turing/webhook/webhook.go @@ -0,0 +1,82 @@ +package webhook + +import ( + "context" + + "github.com/caraml-dev/mlp/api/pkg/webhooks" +) + +var ( + OnRouterCreated = webhooks.EventType("on-router-created") + OnRouterUpdated = webhooks.EventType("on-router-updated") + OnRouterDeleted = webhooks.EventType("on-router-deleted") + OnRouterDeployed = webhooks.EventType("on-router-deployed") + OnRouterUndeployed = webhooks.EventType("on-router-undeployed") + + OnRouterVersionCreated = webhooks.EventType("on-router-version-created") + OnRouterVersionDeleted = webhooks.EventType("on-router-version-deleted") + OnRouterVersionDeployed = webhooks.EventType("on-router-version-deployed") + + OnEnsemblerCreated = webhooks.EventType("on-ensembler-created") + OnEnsemblerUpdated = webhooks.EventType("on-ensembler-updated") + OnEnsemblerDeleted = webhooks.EventType("on-ensembler-deleted") +) + +var events = []webhooks.EventType{ + OnRouterCreated, + OnRouterUpdated, + OnRouterDeleted, + OnRouterDeployed, + OnRouterVersionCreated, + OnRouterVersionDeleted, + OnRouterVersionDeployed, + OnRouterUndeployed, + OnEnsemblerCreated, + OnEnsemblerUpdated, + OnEnsemblerDeleted, +} + +type webhook struct { + webhookManager webhooks.WebhookManager +} + +type Client interface { + TriggerWebhooks(ctx context.Context, eventType webhooks.EventType, body interface{}) error +} + +func NewWebhook(cfg *webhooks.Config) (Client, error) { + webhookManager, err := webhooks.InitializeWebhooks(cfg, events) + if err != nil { + return nil, err + } + + return webhook{ + webhookManager: webhookManager, + }, nil +} + +func (w webhook) TriggerWebhooks(ctx context.Context, eventType webhooks.EventType, body interface{}) error { + if !w.isEventConfigured(eventType) { + return nil + } + + // Adds the eventType to the body of the webhook request so that a single webhook endpoint is able to respond + // differently to different event types, especially if the same webhook endpoint is configured for multiple events, + // This is because the event type does not normally get sent to the webhook endpoint. + newBody := &Request{ + EventType: eventType, + Data: body, + } + + return w.webhookManager.InvokeWebhooks( + ctx, + eventType, + newBody, + webhooks.NoOpCallback, + webhooks.NoOpErrorHandler, + ) +} + +func (w webhook) isEventConfigured(eventType webhooks.EventType) bool { + return w.webhookManager != nil && w.webhookManager.IsEventConfigured(eventType) +} diff --git a/api/turing/webhook/webhook_test.go b/api/turing/webhook/webhook_test.go new file mode 100644 index 000000000..9ceedef6e --- /dev/null +++ b/api/turing/webhook/webhook_test.go @@ -0,0 +1,171 @@ +package webhook + +import ( + "context" + "errors" + "reflect" + "testing" + + "github.com/caraml-dev/mlp/api/pkg/webhooks" + "github.com/stretchr/testify/mock" + + "github.com/caraml-dev/turing/api/turing/models" +) + +type routerRequest struct { + EventType webhooks.EventType `json:"event_type"` + Router *models.Router `json:"router"` +} + +func TestNewWebhook(t *testing.T) { + type args struct { + cfg *webhooks.Config + } + tests := []struct { + name string + args args + want Client + wantErr bool + }{ + { + name: "positive", + args: args{ + cfg: &webhooks.Config{}, + }, + want: webhook{}, + }, + { + name: "negative - num retries is negative", + args: args{ + cfg: &webhooks.Config{ + Enabled: true, + Config: map[webhooks.EventType][]webhooks.WebhookConfig{ + OnRouterCreated: {{NumRetries: -1}}, + }, + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewWebhook(tt.args.cfg) + if (err != nil) != tt.wantErr { + t.Errorf("NewWebhook() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewWebhook() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_webhook_triggerEvent(t *testing.T) { + mockWebhookManager := webhooks.NewMockWebhookManager(t) + + type fields struct { + manager webhooks.WebhookManager + } + type args struct { + ctx context.Context + eventType webhooks.EventType + body interface{} + } + tests := []struct { + name string + fields fields + args args + wantErr bool + mockFunc func(args) + }{ + { + name: "positive - event not configured", + fields: fields{ + manager: mockWebhookManager, + }, + args: args{ + ctx: context.TODO(), + eventType: OnRouterCreated, + body: routerRequest{ + EventType: OnRouterCreated, + Router: &models.Router{}, + }, + }, + mockFunc: func(args args) { + mockWebhookManager.On("IsEventConfigured", args.eventType).Once(). + Return(false) + }, + }, + { + name: "positive - invoke webhook", + fields: fields{ + manager: mockWebhookManager, + }, + args: args{ + ctx: context.TODO(), + eventType: OnRouterCreated, + body: routerRequest{ + EventType: OnRouterCreated, + Router: &models.Router{}, + }, + }, + mockFunc: func(args args) { + mockWebhookManager.On("IsEventConfigured", args.eventType). + Once().Return(true) + mockWebhookManager.On( + "InvokeWebhooks", + args.ctx, + args.eventType, + &Request{ + EventType: args.eventType, + Data: args.body, + }, + mock.Anything, + mock.Anything, + ).Once().Return(nil) + }, + }, + { + name: "negative - invoke webhook", + fields: fields{ + manager: mockWebhookManager, + }, + args: args{ + ctx: context.TODO(), + eventType: OnRouterCreated, + body: routerRequest{ + EventType: OnRouterCreated, + Router: &models.Router{}, + }, + }, + mockFunc: func(args args) { + mockWebhookManager.On("IsEventConfigured", args.eventType). + Once().Return(true) + mockWebhookManager.On( + "InvokeWebhooks", + args.ctx, + args.eventType, + &Request{ + EventType: args.eventType, + Data: args.body, + }, + mock.Anything, + mock.Anything, + ).Once().Return(errors.New("mock error")) + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := webhook{ + webhookManager: tt.fields.manager, + } + tt.mockFunc(tt.args) + if err := w.TriggerWebhooks(tt.args.ctx, tt.args.eventType, tt.args.body); (err != nil) != tt.wantErr { + t.Errorf("triggerEvent() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +}