Skip to content

Commit

Permalink
Replace event_tag FK to get rid of insert and return #710 (#731)
Browse files Browse the repository at this point in the history
* Replace event_tag FK to get rid of insert and return #710

* support rolling updates #710

* remove CRLF #710

* optimized migrator #710

* fixes oracle test #710

* unitTest,SQL for migration #710

* fix MigratorSpec #710

* chore: typo fix #710

* fix: IntegrationTest and clean code #710

* fix: compatible legacy tag read #673

* chore: mi-ma filter for PR

* fix: optimized migrate step

* fix: dialect for column fill

* fix: update migration sql

* fix: mysql dialect

* fix: dialect syntax

* fix: dialect syntax

* fix: avoid use system table of mysql

* fix: batch insert caused flaky test

* fix: insert less event of large batch

* fix: script fix and strongly express two-step update
  • Loading branch information
Roiocam authored Oct 27, 2023
1 parent 408b560 commit d4d942e
Show file tree
Hide file tree
Showing 30 changed files with 670 additions and 71 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
* Copyright (C) 2019 - 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.jdbc.integration

import akka.persistence.jdbc.query.{
EventsByTagMigrationTest,
MysqlCleaner,
OracleCleaner,
PostgresCleaner,
SqlServerCleaner
}

class PostgresScalaEventsByTagMigrationTest
extends EventsByTagMigrationTest("postgres-application.conf")
with PostgresCleaner {}

class MySQLScalaEventByTagMigrationTest extends EventsByTagMigrationTest("mysql-application.conf") with MysqlCleaner {

override def dropLegacyFKConstraint(): Unit =
dropConstraint(constraintType = "FOREIGN KEY", constraintDialect = "FOREIGN KEY")

override def dropLegacyPKConstraint(): Unit =
dropConstraint(constraintType = "PRIMARY KEY", constraintDialect = "", constraintNameDialect = "KEY")

override def addNewPKConstraint(): Unit =
addPKConstraint(constraintNameDialect = "")

override def addNewFKConstraint(): Unit =
addFKConstraint()

override def migrateLegacyRows(): Unit =
fillNewColumn(
joinDialect = joinSQL,
pidSetDialect =
s"${tagTableCfg.tableName}.${tagTableCfg.columnNames.persistenceId} = ${journalTableName}.${journalTableCfg.columnNames.persistenceId}",
seqNrSetDialect =
s"${tagTableCfg.tableName}.${tagTableCfg.columnNames.sequenceNumber} = ${journalTableName}.${journalTableCfg.columnNames.sequenceNumber}")
}

class OracleScalaEventByTagMigrationTest
extends EventsByTagMigrationTest("oracle-application.conf")
with OracleCleaner {

override def addNewColumn(): Unit = {
// mock event_id not null, in order to change it to null later
alterColumn(alterDialect = "MODIFY", changeToDialect = "NOT NULL")
}

override def dropLegacyFKConstraint(): Unit =
dropConstraint(constraintTableName = "USER_CONSTRAINTS", constraintType = "R")

override def dropLegacyPKConstraint(): Unit =
dropConstraint(constraintTableName = "USER_CONSTRAINTS", constraintType = "P")

override def migrateLegacyRows(): Unit =
withStatement { stmt =>
stmt.execute(s"""UPDATE ${tagTableCfg.tableName}
|SET (${tagTableCfg.columnNames.persistenceId}, ${tagTableCfg.columnNames.sequenceNumber}) = (
| SELECT ${journalTableCfg.columnNames.persistenceId}, ${journalTableCfg.columnNames.sequenceNumber}
| ${fromSQL}
|)
|WHERE EXISTS (
| SELECT 1
| ${fromSQL}
|)""".stripMargin)
}
}

class SqlServerScalaEventByTagMigrationTest
extends EventsByTagMigrationTest("sqlserver-application.conf")
with SqlServerCleaner {

override def addNewPKConstraint(): Unit = {
// Change new column not null
alterColumn(columnName = tagTableCfg.columnNames.persistenceId, changeToDialect = "NVARCHAR(255) NOT NULL")
alterColumn(columnName = tagTableCfg.columnNames.sequenceNumber, changeToDialect = "NUMERIC(10,0) NOT NULL")
super.addNewPKConstraint()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import akka.persistence.jdbc.testkit.internal.Oracle
import akka.persistence.jdbc.testkit.internal.Postgres
import akka.persistence.jdbc.testkit.internal.SqlServer
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigValueFactory

class PostgresJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("postgres-application.conf"), Postgres) {
override def eventsCount: Int = 100
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.journal.dao.JournalTables#EventTags.eventId")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.eventId")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.copy$default$1")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.copy$default$2")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.this")
ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.journal.dao.JournalTables$TagRow$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.unapply")
25 changes: 16 additions & 9 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ akka-persistence-jdbc {
# Slick will use an async executor with a fixed size queue of 10.000 objects
# The async executor is a connection pool for asynchronous execution of blocking I/O actions.
# This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
queueSize = 10000 // number of objects that can be queued by the async exector
queueSize = 10000 // number of objects that can be queued by the async executor

# This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
# from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
Expand Down Expand Up @@ -138,9 +138,16 @@ jdbc-journal {
schemaName = ""

columnNames {
# use for older foreign key.
eventId = "event_id"
persistenceId = "persistence_id"
sequenceNumber = "sequence_number"
tag = "tag"
}

# For rolling updates the event_tag table migration.
# switch those to enable new region key write and read.
legacy-tag-key = true
}

# Otherwise it would be a pinned dispatcher, see https://github.com/akka/akka/issues/31058
Expand All @@ -159,7 +166,7 @@ jdbc-journal {

# The size of the buffer used when queueing up events for batch writing. This number must be bigger then the number
# of events that may be written concurrently. In other words this number must be bigger than the number of persistent
# actors that are actively peristing at the same time.
# actors that are actively persisting at the same time.
bufferSize = 1000
# The maximum size of the batches in which journal rows will be inserted
batchSize = 400
Expand Down Expand Up @@ -213,7 +220,7 @@ jdbc-journal {
# Slick will use an async executor with a fixed size queue of 10.000 objects
# The async executor is a connection pool for asynchronous execution of blocking I/O actions.
# This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
queueSize = 10000 // number of objects that can be queued by the async exector
queueSize = 10000 // number of objects that can be queued by the async executor

# This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
# from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
Expand Down Expand Up @@ -339,7 +346,7 @@ jdbc-snapshot-store {
# Slick will use an async executor with a fixed size queue of 10.000 objects
# The async executor is a connection pool for asynchronous execution of blocking I/O actions.
# This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
queueSize = 10000 // number of objects that can be queued by the async exector
queueSize = 10000 // number of objects that can be queued by the async executor

# This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
# from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
Expand Down Expand Up @@ -433,7 +440,7 @@ jdbc-read-journal {
journal-sequence-retrieval {
# The maximum number of ids that will be retrieved in each batch
batch-size = 10000
# In case a number in the sequence is missing, this is the ammount of retries that will be done to see
# In case a number in the sequence is missing, this is the amount of retries that will be done to see
# if the number is still found. Note that the time after which a number in the sequence is assumed missing is
# equal to maxTries * queryDelay
# (maxTries may not be zero)
Expand Down Expand Up @@ -493,7 +500,7 @@ jdbc-read-journal {
# Slick will use an async executor with a fixed size queue of 10.000 objects
# The async executor is a connection pool for asynchronous execution of blocking I/O actions.
# This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
queueSize = 10000 // number of objects that can be queued by the async exector
queueSize = 10000 // number of objects that can be queued by the async executor

# This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
# from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
Expand Down Expand Up @@ -563,11 +570,11 @@ jdbc-durable-state-store {
}
}

# Settings for determining if gloabal_offset column in the durable-state are out of sequence.
# Settings for determining if global_offset column in the durable-state are out of sequence.
durable-state-sequence-retrieval {
# The maximum number of ids that will be retrieved in each batch
batch-size = 10000
# In case a number in the sequence is missing, this is the ammount of retries that will be done to see
# In case a number in the sequence is missing, this is the amount of retries that will be done to see
# if the number is still found. Note that the time after which a number in the sequence is assumed missing is
# equal to maxTries * queryDelay
# (maxTries may not be zero)
Expand Down Expand Up @@ -618,7 +625,7 @@ jdbc-durable-state-store {
# Slick will use an async executor with a fixed size queue of 10.000 objects
# The async executor is a connection pool for asynchronous execution of blocking I/O actions.
# This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
queueSize = 10000 // number of objects that can be queued by the async exector
queueSize = 10000 // number of objects that can be queued by the async executor

# This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
# from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/resources/schema/h2/h2-create-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ CREATE TABLE IF NOT EXISTS "event_journal" (
CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal" ("ordering");

CREATE TABLE IF NOT EXISTS "event_tag" (
"event_id" BIGINT NOT NULL,
"event_id" BIGINT,
"persistence_id" VARCHAR(255),
"sequence_number" BIGINT,
"tag" VARCHAR NOT NULL,
PRIMARY KEY("event_id", "tag"),
PRIMARY KEY("persistence_id", "sequence_number", "tag"),
CONSTRAINT fk_event_journal
FOREIGN KEY("event_id")
REFERENCES "event_journal"("ordering")
FOREIGN KEY("persistence_id", "sequence_number")
REFERENCES "event_journal"("persistence_id", "sequence_number")
ON DELETE CASCADE
);

Expand Down
12 changes: 7 additions & 5 deletions core/src/main/resources/schema/mysql/mysql-create-schema.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE IF NOT EXISTS event_journal(
CREATE TABLE IF NOT EXISTS event_journal (
ordering SERIAL,
deleted BOOLEAN DEFAULT false NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
Expand All @@ -17,11 +17,13 @@ CREATE TABLE IF NOT EXISTS event_journal(
CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering);

CREATE TABLE IF NOT EXISTS event_tag (
event_id BIGINT UNSIGNED NOT NULL,
event_id BIGINT UNSIGNED,
persistence_id VARCHAR(255),
sequence_number BIGINT,
tag VARCHAR(255) NOT NULL,
PRIMARY KEY(event_id, tag),
FOREIGN KEY (event_id)
REFERENCES event_journal(ordering)
PRIMARY KEY(persistence_id, sequence_number, tag),
FOREIGN KEY (persistence_id, sequence_number)
REFERENCES event_journal(persistence_id, sequence_number)
ON DELETE CASCADE
);

Expand Down
36 changes: 36 additions & 0 deletions core/src/main/resources/schema/mysql/mysql-event-tag-migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- **************** first step ****************
-- add new column
ALTER TABLE event_tag
ADD persistence_id VARCHAR(255),
ADD sequence_number BIGINT;
-- **************** second step ****************
-- migrate rows
UPDATE event_tag
INNER JOIN event_journal ON event_tag.event_id = event_journal.ordering
SET event_tag.persistence_id = event_journal.persistence_id,
event_tag.sequence_number = event_journal.sequence_number;
-- drop old FK constraint
SELECT CONSTRAINT_NAME
INTO @fk_constraint_name
FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS
WHERE TABLE_NAME = 'event_tag';
SET @alter_query = CONCAT('ALTER TABLE event_tag DROP FOREIGN KEY ', @fk_constraint_name);
PREPARE stmt FROM @alter_query;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
-- drop old PK constraint
ALTER TABLE event_tag
DROP PRIMARY KEY;
-- create new PK constraint for PK column.
ALTER TABLE event_tag
ADD CONSTRAINT
PRIMARY KEY (persistence_id, sequence_number, tag);
-- create new FK constraint for PK column.
ALTER TABLE event_tag
ADD CONSTRAINT fk_event_journal_on_pk
FOREIGN KEY (persistence_id, sequence_number)
REFERENCES event_journal (persistence_id, sequence_number)
ON DELETE CASCADE;
-- alter the event_id to nullable, so we can skip the InsertAndReturn.
ALTER TABLE event_tag
MODIFY COLUMN event_id BIGINT UNSIGNED NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ CREATE OR REPLACE TRIGGER EVENT_JOURNAL__ORDERING_TRG before insert on EVENT_JOU
/

CREATE TABLE EVENT_TAG (
EVENT_ID NUMERIC NOT NULL,
EVENT_ID NUMERIC,
PERSISTENCE_ID VARCHAR(255),
SEQUENCE_NUMBER NUMERIC,
TAG VARCHAR(255) NOT NULL,
PRIMARY KEY(EVENT_ID, TAG),
FOREIGN KEY(EVENT_ID) REFERENCES EVENT_JOURNAL(ORDERING)
PRIMARY KEY(PERSISTENCE_ID, SEQUENCE_NUMBER, TAG),
FOREIGN KEY(PERSISTENCE_ID, SEQUENCE_NUMBER) REFERENCES EVENT_JOURNAL(PERSISTENCE_ID, SEQUENCE_NUMBER)
ON DELETE CASCADE
)
/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-- **************** first step ****************
-- add new column
ALTER TABLE EVENT_TAG
ADD (PERSISTENCE_ID VARCHAR2(255),
SEQUENCE_NUMBER NUMERIC);
-- **************** second step ****************
-- migrate rows
UPDATE EVENT_TAG
SET PERSISTENCE_ID = (SELECT PERSISTENCE_ID
FROM EVENT_JOURNAL
WHERE EVENT_TAG.EVENT_ID = EVENT_JOURNAL.ORDERING),
SEQUENCE_NUMBER = (SELECT SEQUENCE_NUMBER
FROM EVENT_JOURNAL
WHERE EVENT_TAG.EVENT_ID = EVENT_JOURNAL.ORDERING)
-- drop old FK constraint
DECLARE
v_constraint_name VARCHAR2(255);
BEGIN
SELECT CONSTRAINT_NAME
INTO v_constraint_name
FROM USER_CONSTRAINTS
WHERE TABLE_NAME = 'EVENT_TAG'
AND CONSTRAINT_TYPE = 'R';

IF v_constraint_name IS NOT NULL THEN
EXECUTE IMMEDIATE 'ALTER TABLE EVENT_TAG DROP CONSTRAINT ' || v_constraint_name;
END IF;

COMMIT;
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
RAISE;
END;
/

-- drop old PK constraint
ALTER TABLE EVENT_TAG
DROP PRIMARY KEY;
-- create new PK constraint for PK column.
ALTER TABLE EVENT_TAG
ADD CONSTRAINT "pk_event_tag"
PRIMARY KEY (PERSISTENCE_ID, SEQUENCE_NUMBER, TAG);
-- create new FK constraint for PK column.
ALTER TABLE EVENT_TAG
ADD CONSTRAINT fk_EVENT_JOURNAL_on_pk
FOREIGN KEY (PERSISTENCE_ID, SEQUENCE_NUMBER)
REFERENCES EVENT_JOURNAL (PERSISTENCE_ID, SEQUENCE_NUMBER)
ON DELETE CASCADE;
-- alter the EVENT_ID to nullable, so we can skip the InsertAndReturn.
ALTER TABLE EVENT_TAG
MODIFY EVENT_ID NULL;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE IF NOT EXISTS public.event_journal(
CREATE TABLE IF NOT EXISTS public.event_journal (
ordering BIGSERIAL,
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
Expand All @@ -23,11 +23,13 @@ CREATE UNIQUE INDEX event_journal_ordering_idx ON public.event_journal(ordering)

CREATE TABLE IF NOT EXISTS public.event_tag(
event_id BIGINT,
persistence_id VARCHAR(255),
sequence_number BIGINT,
tag VARCHAR(256),
PRIMARY KEY(event_id, tag),
PRIMARY KEY(persistence_id, sequence_number, tag),
CONSTRAINT fk_event_journal
FOREIGN KEY(event_id)
REFERENCES event_journal(ordering)
FOREIGN KEY(persistence_id, sequence_number)
REFERENCES event_journal(persistence_id, sequence_number)
ON DELETE CASCADE
);

Expand Down
Loading

0 comments on commit d4d942e

Please sign in to comment.