checkpoint: Add option to limit the resolving range
Currently, the checkpointing code always returns the latest checkpoint when
computing the resolving range. This maximizes the number of mutations that can
be concurrently applied, but it also means that if an apply transaction is
delayed, say by encountering a locked row in the target database, the
temporal skew in the target data is unbounded.

This change allows the operator to limit the number of checkpoints that will be
considered when computing the upper bound of the resolving range. Lowering this
value may have an adverse impact on total throughput.

(cherry picked from commit 40efc96)
bobvawter committed Aug 30, 2024
1 parent a4fc2e2 commit 1dc2a70
Showing 7 changed files with 133 additions and 9 deletions.
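A caller-side sketch of what the new option enables (the cfg, checkpoints, tableGroup, resolvingRange, and stop variables are assumed stand-ins for the conveyor fixture; this mirrors the conveyor.go change below rather than adding new API):

// Forward a configured lookahead limit, if any, to the checkpoint group.
// Zero (the default) keeps the existing behavior: consider every checkpoint.
var opts []checkpoint.Option
if cfg.LimitLookahead > 0 {
	opts = append(opts, checkpoint.LimitLookahead(cfg.LimitLookahead))
}
group, err := checkpoints.Start(stop, tableGroup, &resolvingRange, opts...)
if err != nil {
	return nil, err
}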
10 changes: 10 additions & 0 deletions internal/conveyor/config.go
@@ -39,6 +39,13 @@ type Config struct {
// Write directly to staging tables. May limit compatibility with
// schemas that contain foreign keys.
Immediate bool

// If non-zero, limit the number of checkpoint rows that will be
// used to compute the resolving range. This will limit the maximum
// amount of observable skew in the target due to blocked mutations
// (e.g. running into a lock), but will cause replication to stall
// if behind by this many checkpoints.
LimitLookahead int
}

// Bind adds configuration flags to the set.
@@ -51,6 +58,9 @@ func (c *Config) Bind(f *pflag.FlagSet) {
f.BoolVar(&c.Immediate, "immediate", false,
"bypass staging tables and write directly to target; "+
"recommended only for KV-style workloads with no FKs")
f.IntVar(&c.LimitLookahead, "limitLookahead", 0,
"limit number of checkpoints to be considered when computing the resolving range; "+
"may cause replication to stall completely if older mutations cannot be applied")
}

// Preflight ensures the Config is in a known-good state.
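A minimal, self-contained sketch of the flag binding (the demo package, flag-set name, and the value 16 are hypothetical; the pflag calls are the same ones used in Bind above):

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// Config carries only the field relevant to this sketch.
type Config struct {
	LimitLookahead int
}

// Bind registers the new flag, defaulting to 0 (no limit).
func (c *Config) Bind(f *pflag.FlagSet) {
	f.IntVar(&c.LimitLookahead, "limitLookahead", 0,
		"limit number of checkpoints considered when computing the resolving range")
}

func main() {
	cfg := &Config{}
	fs := pflag.NewFlagSet("demo", pflag.ContinueOnError)
	cfg.Bind(fs)
	_ = fs.Parse([]string{"--limitLookahead", "16"})
	fmt.Println(cfg.LimitLookahead) // prints 16
}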
6 changes: 5 additions & 1 deletion internal/conveyor/conveyor.go
@@ -94,7 +94,11 @@ func (c *Conveyors) Get(schema ident.Schema) (*Conveyor, error) {
watcher: w,
}

ret.checkpoint, err = c.checkpoints.Start(c.stopper, tableGroup, &ret.resolvingRange)
var opts []checkpoint.Option
if l := c.cfg.LimitLookahead; l > 0 {
opts = append(opts, checkpoint.LimitLookahead(l))
}
ret.checkpoint, err = c.checkpoints.Start(c.stopper, tableGroup, &ret.resolvingRange, opts...)
if err != nil {
return nil, err
}
25 changes: 21 additions & 4 deletions internal/staging/checkpoint/checkpoint.go
@@ -43,9 +43,19 @@ type Checkpoints struct {
// conjunction with updating the checkpoint timestamp staging table. The
// returned Group is not memoized.
func (r *Checkpoints) Start(
ctx *stopper.Context, group *types.TableGroup, bounds *notify.Var[hlc.Range],
ctx *stopper.Context, group *types.TableGroup, bounds *notify.Var[hlc.Range], options ...Option,
) (*Group, error) {
ret := r.newGroup(group, bounds)
var lookahead int
for _, opt := range options {
switch t := opt.(type) {
case limitLookahead:
lookahead = int(t)
if lookahead <= 0 {
return nil, errors.New("lookahead must be greater than zero")
}
}
}
ret := r.newGroup(group, bounds, lookahead)
// Populate data immediately.
if err := ret.refreshBounds(ctx); err != nil {
return nil, err
@@ -56,7 +66,9 @@ func (r *Checkpoints) Start(
return ret, nil
}

func (r *Checkpoints) newGroup(group *types.TableGroup, bounds *notify.Var[hlc.Range]) *Group {
func (r *Checkpoints) newGroup(
group *types.TableGroup, bounds *notify.Var[hlc.Range], lookahead int,
) *Group {
ret := &Group{
bounds: bounds,
pool: r.pool,
@@ -73,8 +85,13 @@ func (r *Checkpoints) newGroup(group *types.TableGroup, bounds *notify.Var[hlc.R
ret.metrics.proposedTime = proposedTime.With(labels)
ret.metrics.refreshDuration = refreshDuration.With(labels)

var limit string
if lookahead > 0 {
limit = fmt.Sprintf("ORDER BY source_hlc LIMIT %d", lookahead)
}
// This query may indeed require a full table scan.
ret.sql.refresh = fmt.Sprintf(refreshTemplate, r.metaTable)
ret.sql.refresh = fmt.Sprintf(refreshTemplate, r.metaTable, limit)

hinted := r.pool.HintNoFTS(r.metaTable)
ret.sql.advance = fmt.Sprintf(advanceTemplate, hinted)
ret.sql.ensure = fmt.Sprintf(ensureTemplate, hinted)
57 changes: 57 additions & 0 deletions internal/staging/checkpoint/checkpoint_test.go
@@ -142,6 +142,62 @@ func TestResolved(t *testing.T) {
})
}

func TestLimitLookahead(t *testing.T) {
const minNanos = int64(1)
const maxNanos = int64(10)
r := require.New(t)

fixture, err := base.NewFixture(t)
r.NoError(err)

ctx := fixture.Context

chk, err := ProvideCheckpoints(ctx, fixture.StagingPool, fixture.StagingDB)
r.NoError(err)

bounds1 := &notify.Var[hlc.Range]{}
g1, err := chk.Start(ctx,
&types.TableGroup{Name: ident.New("fake")},
bounds1,
LimitLookahead(int(maxNanos/2)),
)
r.NoError(err)

part := ident.New("partition")
for i := minNanos; i <= maxNanos; i++ {
r.NoError(g1.Advance(ctx, part, hlc.New(i, 0)))
}

// Read halfway through.
r.NoError(stopvar.WaitForValue(ctx,
hlc.RangeIncluding(hlc.Zero(), hlc.New(maxNanos/2, 0)),
bounds1,
))

// Update base range
r.NoError(g1.Commit(ctx, hlc.RangeIncluding(hlc.Zero(), hlc.New(1, 0))))

// Check boundary condition of marking first timestamp.
r.NoError(stopvar.WaitForValue(ctx,
hlc.RangeIncluding(hlc.New(1, 0), hlc.New(maxNanos/2, 0)),
bounds1,
))

// Verify that max does advance.
r.NoError(g1.Commit(ctx, hlc.RangeIncluding(hlc.Zero(), hlc.New(2, 0))))
r.NoError(stopvar.WaitForValue(ctx,
hlc.RangeIncluding(hlc.New(2, 0), hlc.New(maxNanos/2+1, 0)),
bounds1,
))

// Verify all resolved.
r.NoError(g1.Commit(ctx, hlc.RangeIncluding(hlc.Zero(), hlc.New(maxNanos, 0))))
r.NoError(stopvar.WaitForValue(ctx,
hlc.RangeIncluding(hlc.New(maxNanos, 0), hlc.New(maxNanos, 0)),
bounds1,
))
}

func TestTransitionsInSinglePartition(t *testing.T) {
testTransitions(t, 1)
}
@@ -172,6 +228,7 @@ func testTransitions(t *testing.T, partitionCount int) {
Enclosing: fixture.TargetSchema.Schema(),
},
notify.VarOf(hlc.RangeEmpty()),
1024,
)

expect := func(low, high int) {
3 changes: 2 additions & 1 deletion internal/staging/checkpoint/group.go
@@ -240,7 +240,7 @@ func (r *Group) TableGroup() *types.TableGroup {
// - partition_max_times: Determines the latest known checkpoint for
// each partition within the group.
// - visible_data: Restricts available_data by the minimum-maximum
// value from p_max_times.
// value from p_max_times and, potentially, the lookahead limit.
// - partition_max_unapplied: Finds the last checkpoint time that
// hasn't been processed.
// - last_applied: Finds the latest applied timestamp within
@@ -268,6 +268,7 @@ visible_data AS (
SELECT *
FROM available_data
WHERE source_hlc <= (SELECT min(hlc) FROM partition_max_times)
%[2]s
),
partition_max_unapplied AS (
SELECT partition, max(source_hlc) AS hlc
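To see how the optional clause lands in the refresh query, here is a much-simplified stand-in for refreshTemplate (the table name, CTE bodies, and the value 256 are placeholders, not the real query):

package main

import "fmt"

// %[1]s is the checkpoints metadata table; %[2]s is the optional
// ORDER BY ... LIMIT clause built in newGroup. When no lookahead is
// configured, %[2]s renders as an empty string and the query is unchanged.
const sketchTemplate = `
WITH available_data AS (
  SELECT partition, source_hlc FROM %[1]s
),
visible_data AS (
  SELECT * FROM available_data
  %[2]s
)
SELECT * FROM visible_data`

func main() {
	const lookahead = 256 // hypothetical operator setting; zero disables the limit
	var limit string
	if lookahead > 0 {
		limit = fmt.Sprintf("ORDER BY source_hlc LIMIT %d", lookahead)
	}
	fmt.Printf(sketchTemplate, "staging.checkpoints", limit)
}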
9 changes: 6 additions & 3 deletions internal/staging/checkpoint/migrate_test.go
@@ -79,9 +79,12 @@ INSERT INTO %s (target_schema, source_nanos, source_logical, target_applied_at)
"quux.public": hlc.RangeIncluding(hlc.New(900, 9), hlc.New(900, 9)),
}
for id, expect := range expected {
rng, err := chk.newGroup(&types.TableGroup{
Name: ident.New(id),
}, notify.VarOf(hlc.RangeEmpty()),
rng, err := chk.newGroup(
&types.TableGroup{
Name: ident.New(id),
},
notify.VarOf(hlc.RangeEmpty()),
1024,
).refreshQuery(ctx, hlc.Zero())
r.NoError(err)
r.Equal(expect, rng)
32 changes: 32 additions & 0 deletions internal/staging/checkpoint/options.go
@@ -0,0 +1,32 @@
// Copyright 2024 The Cockroach Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package checkpoint

// An Option to [Checkpoints.Start].
type Option interface {
isOption()
}

type limitLookahead int

// LimitLookahead limits the number of resolved timestamps that are used
// to calculate the extent of the resolving range.
func LimitLookahead(limit int) Option {
return limitLookahead(limit)
}

func (l limitLookahead) isOption() {}
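Because isOption is unexported, Option is effectively sealed: only constructors defined in this package, currently just LimitLookahead, produce values that Checkpoints.Start will recognize. A hedged usage sketch (the chk, ctx, group, and bounds variables are assumed, as in the tests above):

// Cap the resolving range at the 16 oldest unresolved checkpoints.
g, err := chk.Start(ctx, group, bounds, checkpoint.LimitLookahead(16))
if err != nil {
	// Start rejects a non-positive limit; callers that want "no limit"
	// simply omit the option, as conveyor.go does above.
	return err
}
_ = g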
