Skip to content

Commit

Permalink
cdc/server/integration_test.go: added in a new test for the CDC serve…
Browse files Browse the repository at this point in the history
…r using the workload.Checker

Integrated the logic from the existing CDC server integration test and the
latest developments for the pglogical integration tests to create a test based
on the workload.Checker that validates the CDC server. This implementation also
aims to refactor the Changefeed creation string logic so that it's reusable for
other types of tests or the workload command.

Resolves: #906
Release Note: None
  • Loading branch information
ryanluu12345 committed Oct 11, 2024
1 parent 2bce2c1 commit 202c581
Show file tree
Hide file tree
Showing 3 changed files with 359 additions and 5 deletions.
350 changes: 350 additions & 0 deletions internal/source/cdc/server/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package server

import (
"context"
"crypto/tls"
"database/sql"
"fmt"
"io"
"net/http"
Expand All @@ -31,14 +33,18 @@ import (
"github.com/cockroachdb/replicator/internal/sequencer"
stagingProd "github.com/cockroachdb/replicator/internal/sinkprod"
"github.com/cockroachdb/replicator/internal/sinktest"
"github.com/cockroachdb/replicator/internal/sinktest/all"
"github.com/cockroachdb/replicator/internal/sinktest/base"
"github.com/cockroachdb/replicator/internal/source/cdc"
"github.com/cockroachdb/replicator/internal/types"
jwtAuth "github.com/cockroachdb/replicator/internal/util/auth/jwt"
"github.com/cockroachdb/replicator/internal/util/diag"
"github.com/cockroachdb/replicator/internal/util/hlc"
"github.com/cockroachdb/replicator/internal/util/ident"
"github.com/cockroachdb/replicator/internal/util/stdlogical"
"github.com/cockroachdb/replicator/internal/util/stdpool"
"github.com/cockroachdb/replicator/internal/util/stdserver"
"github.com/cockroachdb/replicator/internal/util/workload"
joonix "github.com/joonix/log"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -395,3 +401,347 @@ func supportsQueries(version string) (bool, error) {
func supportsWebhook(version string) (bool, error) {
	// Webhook sinks are gated on the source cluster reporting at
	// least this CockroachDB version.
	const minWebhookVersion = "v21.2"
	return stdpool.CockroachMinVersion(version, minWebhookVersion)
}

// getConfig assembles the server Config for a test run. The staging
// connection and schema are taken from fixture, the target connection
// from targetPool, and the immediate/webhook switches from cfg. The
// returned error is always nil.
func getConfig(
	cfg *testConfig, fixture *all.Fixture, targetPool *types.TargetPool,
) (*Config, error) {
	// Both the staging and target pools are capped at the same size.
	const poolSize = 16

	cdcSection := cdc.Config{
		ConveyorConfig: conveyor.Config{
			Immediate: cfg.immediate,
		},
		SequencerConfig: sequencer.Config{
			Parallelism: 1,
			// Allow post-hoc inspection of staged data.
			RetireOffset: time.Hour,
		},
		NDJsonBuffer: 1,
	}

	httpSection := stdserver.Config{
		BindAddr: "127.0.0.1:0",
		// Webhook implies self-signed TLS is ok.
		GenerateSelfSigned: cfg.webhook,
	}

	stagingSection := stagingProd.StagingConfig{
		CommonConfig: stagingProd.CommonConfig{
			Conn:        fixture.StagingPool.ConnectionString,
			MaxPoolSize: poolSize,
		},
		Schema: fixture.StagingDB.Schema(),
	}

	targetSection := stagingProd.TargetConfig{
		CommonConfig: stagingProd.CommonConfig{
			Conn:        targetPool.ConnectionString,
			MaxPoolSize: poolSize,
		},
	}

	return &Config{
		CDC:     cdcSection,
		HTTP:    httpSection,
		Staging: stagingSection,
		Target:  targetSection,
	}, nil
}

// ChangefeedConfig is meant to influence the behavior
// of the created changefeed statement depending
// on the options defined. This can be extended later on
// to handle various configurations and webhook parameters.
type ChangefeedConfig struct {
// diff appends the "diff" option to the changefeed's WITH clause.
diff bool
// queries switches CreateChangefeedStatement to the CDC-query form
// ("... AS SELECT pk, val FROM <first table>") and makes the sink
// URL path name the target table rather than the target schema.
queries bool
// webhook selects the "webhook-https" sink with the token passed in
// webhook_auth_header; when false the "experimental-http" sink is
// used and the token is passed as the access_token query parameter.
webhook bool
}

// CreateChangefeedStatement returns the text of a CREATE CHANGEFEED
// statement, to be executed on the source CockroachDB, whose sink is
// the replicator endpoint listening on host.
//
// The parameters are deliberately narrow so that callers need not
// build unrelated structs just to create a changefeed; this keeps the
// function reusable, e.g. by the e2e workload checker.
//
//   - cfg.webhook selects the webhook-https sink and sends token in
//     webhook_auth_header; otherwise the experimental-http sink is used
//     and token travels as the access_token query parameter.
//   - cfg.queries produces a CDC query over tables[0] and addresses
//     the target table in the sink URL path; otherwise every entry of
//     tables is listed in a FOR TABLE clause and the path addresses the
//     target's schema. tables must be non-empty when cfg.queries is set.
//   - cfg.diff appends the diff option.
//   - min_checkpoint_frequency is added when supportsMinCheckpoint
//     accepts sourceVersion.
func CreateChangefeedStatement(
	cfg *ChangefeedConfig,
	host string,
	target ident.Table,
	token string,
	tables []ident.Table,
	sourceVersion string,
) string {
	stmt := "CREATE CHANGEFEED"

	// A CDC query addresses the target table directly. A regular
	// changefeed addresses the target schema and enumerates its
	// source tables as a comma-separated list.
	var sinkPath ident.Identifier = target
	if !cfg.queries {
		sinkPath = target.Schema()
		tableList := ""
		for idx := range tables {
			if idx != 0 {
				tableList += ", "
			}
			tableList += tables[idx].String()
		}
		stmt += fmt.Sprintf(" FOR TABLE %s", tableList)
	}

	// Choose the sink scheme and where the credentials travel.
	query := url.Values{}
	scheme := "experimental-http"
	if cfg.webhook {
		scheme = "webhook-https"
		query.Set("insecure_tls_skip_verify", "true")
	} else {
		// No webhook_auth_header, so bake it into the query string.
		// See comments in cdc.Handler.ServeHTTP checkAccess.
		query.Set("access_token", token)
	}
	sink := url.URL{
		Scheme:   scheme,
		Host:     host,
		Path:     ident.Join(sinkPath, ident.Raw, '/'),
		RawQuery: query.Encode(),
	}

	stmt += " INTO '" + sink.String() + "' "
	if cfg.webhook {
		stmt += " WITH updated, resolved='1s', webhook_auth_header='Bearer " + token + "'"
	} else {
		stmt += "WITH updated, resolved='1s'"
	}

	if cfg.diff {
		stmt += ", diff"
	}

	// Don't wait the entire 30s. This option was introduced in the
	// same versions as webhooks.
	minCheckpointOK, err := supportsMinCheckpoint(sourceVersion)
	if err == nil && minCheckpointOK {
		stmt += ", min_checkpoint_frequency='1s'"
	}

	if cfg.queries {
		// CDC queries only support a single table at a time, so only
		// the first table passed in is used.
		stmt += ",envelope='wrapped',format='json'" +
			" AS SELECT pk, val" +
			fmt.Sprintf(" FROM %s", tables[0].String())
	}
	log.Debugf("changefeed URL is %s", sink.String())

	return stmt
}

func TestCreateChangefeedStatement(t *testing.T) {
type args struct {
cfg *ChangefeedConfig
host string
target ident.Table
token string
tables []ident.Table
sourceVersion string
}
tests := []struct {
name string
args args
want string
}{
{
name: "basic no changefeed configs",
args: args{
cfg: &ChangefeedConfig{},
host: "localhost:8080",
target: ident.NewTable(ident.MustSchema(ident.New("target"), ident.New("public")), ident.New("tbl1")),
token: "my_token",
tables: []ident.Table{ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")), ident.New("tbl1"))},
sourceVersion: "CockroachDB CCL v24.2.1 ",
},
want: `CREATE CHANGEFEED FOR TABLE "source"."public"."tbl1" INTO 'experimental-http://localhost:8080/target/public?access_token=my_token' WITH updated, resolved='1s', min_checkpoint_frequency='1s'`,
},
{
name: "basic webhook",
args: args{
cfg: &ChangefeedConfig{
webhook: true,
},
host: "localhost:8080",
target: ident.NewTable(ident.MustSchema(ident.New("target"), ident.New("public")), ident.New("tbl1")),
token: "my_token",
tables: []ident.Table{
ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")), ident.New("tbl1")),
ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")), ident.New("tbl2")),
},
sourceVersion: "CockroachDB CCL v24.2.1 ",
},
want: `CREATE CHANGEFEED FOR TABLE "source"."public"."tbl1", "source"."public"."tbl2" INTO 'webhook-https://localhost:8080/target/public?insecure_tls_skip_verify=true' WITH updated, resolved='1s', webhook_auth_header='Bearer my_token', min_checkpoint_frequency='1s'`,
},
{
name: "webhook and diff enabled",
args: args{
cfg: &ChangefeedConfig{
webhook: true,
diff: true,
},
host: "localhost:8080",
target: ident.NewTable(ident.MustSchema(ident.New("target"), ident.New("public")), ident.New("tbl1")),
token: "my_token",
tables: []ident.Table{
ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")), ident.New("tbl1")),
ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")), ident.New("tbl2")),
},
sourceVersion: "CockroachDB CCL v24.2.1 ",
},
want: `CREATE CHANGEFEED FOR TABLE "source"."public"."tbl1", "source"."public"."tbl2" INTO 'webhook-https://localhost:8080/target/public?insecure_tls_skip_verify=true' WITH updated, resolved='1s', webhook_auth_header='Bearer my_token', diff, min_checkpoint_frequency='1s'`,
},
{
name: "basic webhook CDC queries",
args: args{
cfg: &ChangefeedConfig{
webhook: true,
queries: true,
},
host: "localhost:8080",
target: ident.NewTable(ident.MustSchema(ident.New("target"), ident.New("public")), ident.New("tbl1")),
token: "my_token",
tables: []ident.Table{
ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")), ident.New("tbl1")),
},
sourceVersion: "CockroachDB CCL v24.2.1 ",
},
want: `CREATE CHANGEFEED INTO 'webhook-https://localhost:8080/target/public/tbl1?insecure_tls_skip_verify=true' WITH updated, resolved='1s', webhook_auth_header='Bearer my_token', min_checkpoint_frequency='1s',envelope='wrapped',format='json' AS SELECT pk, val FROM "source"."public"."tbl1"`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := CreateChangefeedStatement(tt.args.cfg, tt.args.host, tt.args.target, tt.args.token, tt.args.tables, tt.args.sourceVersion)
require.Equal(t, tt.want, got)
})
}
}

// maxIterations is the number of batches that testWorkload generates
// and applies to the source database.
const maxIterations = 25

// TestWorkload is the go-test entry point for the workload-based CDC
// server check; all of the work happens in testWorkload.
func TestWorkload(t *testing.T) {
testWorkload(t)
}

// testWorkload runs an end-to-end check of the CDC server: it creates
// the workload tables on the source database, starts a test server,
// creates a webhook changefeed from the source pointing at that
// server, writes maxIterations generated batches into the source, and
// then uses the workload checker to verify the target.
func testWorkload(t *testing.T) {
	log.SetLevel(log.DebugLevel)
	r := require.New(t)

	// Create the target and source fixtures, which will be used
	// later on to generate data into the source and check that
	// target rows are created properly.
	targetFixture, err := all.NewFixture(t)
	r.NoError(err)

	sourceFixture, err := all.NewFixtureFromBase(targetFixture.Swapped())
	r.NoError(err)

	ctx := targetFixture.Context
	targetChecker, _, err := targetFixture.NewWorkload(ctx, &all.WorkloadConfig{})
	r.NoError(err)

	sourceSchema := targetFixture.SourceSchema.Schema()
	targetSchema := targetFixture.TargetSchema.Schema()

	parent := ident.NewTable(sourceSchema, targetChecker.Parent.Name().Table())
	child := ident.NewTable(sourceSchema, targetChecker.Child.Name().Table())
	sourceGeneratorWorkload := workload.NewGeneratorBase(parent, child)

	// Create the tables on the source side, so that the same table
	// names exist in both source and target, a requirement for
	// replication here.
	sourcePool := targetFixture.SourcePool
	parent = sourceGeneratorWorkload.Parent
	child = sourceGeneratorWorkload.Child
	parentSQL, childSQL := all.WorkloadSchema(
		&all.WorkloadConfig{}, types.ProductPostgreSQL,
		parent, child)
	_, err = sourcePool.ExecContext(ctx, parentSQL)
	r.NoError(err)
	_, err = sourcePool.ExecContext(ctx, childSQL)
	r.NoError(err)

	// Setup test configurations.
	cfg := &testConfig{webhook: true}
	serverCfg, err := getConfig(cfg, sourceFixture, targetFixture.TargetPool)
	r.NoError(err)

	// Create the test server fixture.
	// Preflight sets default values that are not set in the testConfig.
	r.NoError(serverCfg.Preflight())

	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second)
	defer cancelTimeout()
	connCtx := stopper.WithContext(timeoutCtx)
	testFixture, cancelFixture, err := newTestFixture(connCtx, serverCfg)
	// Check the error before registering the cleanup so that a failed
	// construction never defers a possibly-nil function.
	r.NoError(err)
	defer cancelFixture()

	// Insert a testing key so we can properly talk to the webhook
	// in an authenticated manner.
	method, priv, err := jwtAuth.InsertTestingKey(ctx, targetFixture.StagingPool, testFixture.Authenticator, targetFixture.StagingDB)
	r.NoError(err)

	sourceVersion := sourceFixture.SourcePool.Version
	targetDB := targetSchema.Schema()
	target := ident.NewTable(targetDB, targetChecker.Parent.Name().Table())
	_, token, err := jwtAuth.Sign(method, priv, []ident.Schema{target.Schema(), diag.Schema})
	r.NoError(err)

	// Create the changefeed on the source CRDB.
	tables := []ident.Table{sourceGeneratorWorkload.Parent, sourceGeneratorWorkload.Child}
	host := testFixture.Listener.Addr().String()
	createStmt := CreateChangefeedStatement(&ChangefeedConfig{
		diff:    cfg.diff,
		queries: cfg.queries,
		webhook: cfg.webhook,
	},
		host,
		target,
		token,
		tables,
		sourceVersion)
	log.Debugf("create changefeed statement is %s", createStmt)
	_, err = sourcePool.ExecContext(ctx, createStmt)
	r.NoError(err)

	// Make this the target fixture for the accumulator. This is
	// required for the data to write properly later on when
	// we accumulate the batch.
	acc := types.OrderedAcceptorFrom(targetFixture.ApplyAcceptor, targetFixture.Watchers)

	for i := range maxIterations {
		batch := &types.MultiBatch{}
		sourceGeneratorWorkload.GenerateInto(batch, hlc.New(int64(i), i+1))

		// Insert data on the source since it will flow from changefeeds
		// to the staging DB and then to the target.
		tx, err := sourceFixture.TargetPool.BeginTx(ctx, &sql.TxOptions{})
		r.NoError(err)
		r.NoError(acc.AcceptMultiBatch(ctx, batch, &types.AcceptOptions{TargetQuerier: tx}))
		r.NoError(tx.Commit())
	}

	// Merge the generator values into the target checker.
	// This makes it so that the target checker has all the expected
	// data from the source generator workload.
	targetChecker.CopyFrom(sourceGeneratorWorkload)

	// Poll until rows show up on the target. The wait is bounded by
	// timeoutCtx, the context the test server was started with, so a
	// changefeed that never delivers fails the test here instead of
	// spinning until the global test timeout.
	//
	// NOTE(review): this only waits for the first row of the parent
	// table; confirm that CheckConsistent below cannot race with rows
	// that are still in flight.
	for {
		ct, err := base.GetRowCount(ctx, targetFixture.TargetPool, target)
		r.NoError(err)
		if ct >= 1 {
			break
		}
		log.Debug("waiting for target rows to be written")
		select {
		case <-timeoutCtx.Done():
			r.NoError(timeoutCtx.Err(), "timed out waiting for target rows to be written")
		case <-time.After(time.Second):
		}
	}

	r.True(targetChecker.CheckConsistent(ctx, t))

	// We need to wait for the server connection to shut down, otherwise
	// the database cleanup callbacks registered by the fixtures above
	// can't succeed.
	connCtx.Stop(time.Minute)
	<-connCtx.Done()
}
2 changes: 2 additions & 0 deletions internal/source/cdc/server/test_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net"

"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/replicator/internal/source/cdc"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/diag"
"github.com/cockroachdb/replicator/internal/util/ident"
Expand All @@ -35,6 +36,7 @@ type testFixture struct {
Authenticator types.Authenticator
Config *Config
Diagnostics *diag.Diagnostics
Handler *cdc.Handler
Listener net.Listener
Memo types.Memo
StagingPool *types.StagingPool
Expand Down
Loading

0 comments on commit 202c581

Please sign in to comment.