From 221fb107ec155fb664a2422f755f35e0fbabc0d0 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Fri, 10 Jan 2025 14:10:17 +0100 Subject: [PATCH] Support duplicate context duplication. Motivation: Duplicating a duplicated context is supported but the duplication semantic is not defined. Changes: This update the duplicated context duplication by doing a copy of each local in the duplicated duplicate. This introduce a duplicator for each local that is responsible for copying the object when it is not null. --- .../java/io/vertx/core/impl/ContextImpl.java | 6 ++--- .../io/vertx/core/impl/ContextLocalImpl.java | 19 ++++++++++---- .../io/vertx/core/impl/DuplicatedContext.java | 4 ++- .../java/io/vertx/core/impl/LocalSeq.java | 25 +++++++++++------- .../java/io/vertx/core/impl/VertxImpl.java | 26 ++++++++++++++++--- .../vertx/core/internal/ContextInternal.java | 2 +- .../io/vertx/core/internal/VertxInternal.java | 7 +++++ .../io/vertx/core/internal/VertxWrapper.java | 8 +++++- .../spi/context/storage/ContextLocal.java | 12 ++++++++- .../io/vertx/tests/context/ContextTest.java | 23 ++++++++++++++++ 10 files changed, 108 insertions(+), 24 deletions(-) diff --git a/vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java index 91e76096ba6..8856a1466e1 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -38,7 +38,7 @@ public final class ContextImpl extends ContextBase implements ContextInternal { static final boolean DISABLE_TIMINGS = SysProps.DISABLE_CONTEXT_TIMINGS.getBoolean(); - private final VertxInternal owner; + private final VertxImpl owner; private final JsonObject config; private final DeploymentContext deployment; private final CloseFuture closeFuture; @@ -51,7 +51,7 @@ public final class ContextImpl extends ContextBase implements ContextInternal { final WorkerPool workerPool; final WorkerTaskQueue executeBlockingTasks; - public ContextImpl(VertxInternal vertx, + public ContextImpl(VertxImpl vertx, Object[] locals, EventLoopExecutor eventLoop, ThreadingModel threadingModel, @@ -113,7 +113,7 @@ public EventLoop nettyEventLoop() { return eventLoop.eventLoop; } - public VertxInternal owner() { + public VertxImpl owner() { return owner; } diff --git a/vertx-core/src/main/java/io/vertx/core/impl/ContextLocalImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/ContextLocalImpl.java index 02671c9a58c..2eb30e3273d 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/ContextLocalImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/ContextLocalImpl.java @@ -12,18 +12,27 @@ import io.vertx.core.spi.context.storage.ContextLocal; +import java.util.function.Function; + /** * @author Julien Viet */ public class ContextLocalImpl implements ContextLocal { + public static ContextLocal create(Class type, Function duplicator) { + synchronized (LocalSeq.class) { + int idx = LocalSeq.locals.size(); + ContextLocal local = new ContextLocalImpl<>(idx, duplicator); + LocalSeq.locals.add(local); + return local; + } + } + final int index; + final Function duplicator; - public ContextLocalImpl(int index) { + public ContextLocalImpl(int index, Function duplicator) { this.index = index; - } - - public ContextLocalImpl() { - this.index = LocalSeq.next(); + this.duplicator = duplicator; } } diff --git a/vertx-core/src/main/java/io/vertx/core/impl/DuplicatedContext.java b/vertx-core/src/main/java/io/vertx/core/impl/DuplicatedContext.java index dc2a4097d17..9c4ac2d6d9f 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/DuplicatedContext.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/DuplicatedContext.java @@ -142,7 +142,9 @@ public boolean isWorkerContext() { @Override public ContextInternal duplicate() { - return new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]); + DuplicatedContext duplicate = new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]); + delegate.owner().duplicate(this, duplicate); + return duplicate; } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/impl/LocalSeq.java b/vertx-core/src/main/java/io/vertx/core/impl/LocalSeq.java index 2089a3fd37b..17eff10fa25 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/LocalSeq.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/LocalSeq.java @@ -10,6 +10,11 @@ */ package io.vertx.core.impl; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.spi.context.storage.ContextLocal; + +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** @@ -18,20 +23,22 @@ public class LocalSeq { // 0 : reserved slot for local context map - private static final AtomicInteger seq = new AtomicInteger(1); + static final List> locals = new ArrayList<>(); + + static { + reset(); + } /** * Hook for testing purposes */ - public static void reset() { - seq.set((1)); - } - - static int get() { - return seq.get(); + public synchronized static void reset() { + // 0 : reserved slot for local context map + locals.clear(); + locals.add(ContextInternal.LOCAL_MAP); } - static int next() { - return seq.getAndIncrement(); + synchronized static ContextLocal[] get() { + return locals.toArray(new ContextLocal[0]); } } diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java index 5535cdaa67c..b25c70f7865 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -45,6 +45,8 @@ import io.vertx.core.net.impl.*; import io.vertx.core.impl.transports.NioTransport; import io.vertx.core.spi.context.executor.EventExecutorProvider; +import io.vertx.core.spi.context.storage.AccessMode; +import io.vertx.core.spi.context.storage.ContextLocal; import io.vertx.core.spi.file.FileResolver; import io.vertx.core.file.impl.FileSystemImpl; import io.vertx.core.file.impl.WindowsFileSystem; @@ -143,7 +145,8 @@ private static ThreadFactory virtualThreadFactory() { private final FileResolver fileResolver; private final EventExecutorProvider eventExecutorProvider; private final Map sharedNetServers = new HashMap<>(); - private final int contextLocals; + private final ContextLocal[] contextLocals; + private final List> contextLocalsList; final WorkerPool workerPool; final WorkerPool internalWorkerPool; final WorkerPool virtualThreaWorkerPool; @@ -202,6 +205,7 @@ private static ThreadFactory virtualThreadFactory() { ThreadFactory virtualThreadFactory = virtualThreadFactory(); contextLocals = LocalSeq.get(); + contextLocalsList = Collections.unmodifiableList(Arrays.asList(contextLocals)); closeFuture = new CloseFuture(log); maxEventLoopExecTime = maxEventLoopExecuteTime; maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit; @@ -563,10 +567,10 @@ public boolean cancelTimer(long id) { } private Object[] createContextLocals() { - if (contextLocals == 0) { + if (contextLocals.length == 0) { return EMPTY_CONTEXT_LOCALS; } else { - return new Object[contextLocals]; + return new Object[contextLocals.length]; } } @@ -936,6 +940,11 @@ public AddressResolverGroup nettyAddressResolverGroup() { return hostnameResolver.nettyAddressResolverGroup(); } + @Override + public List> contextLocals() { + return contextLocalsList; + } + @Override public FileResolver fileResolver() { return fileResolver; @@ -1317,6 +1326,17 @@ public C createSharedResource(String resourceKey, String resourceName, Close return SharedResourceHolder.createSharedResource(this, resourceKey, resourceName, closeFuture, supplier); } + void duplicate(ContextBase src, ContextBase dst) { + for (int i = 0;i < contextLocals.length;i++) { + ContextLocalImpl contextLocal = (ContextLocalImpl) contextLocals[i]; + Object local = AccessMode.CONCURRENT.get(src.locals, i); + if (local != null) { + local = ((Function)contextLocal.duplicator).apply(local); + } + AccessMode.CONCURRENT.put(dst.locals, i, local); + } + } + /** * Reads the version from the {@code vertx-version.txt} file. * diff --git a/vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java b/vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java index 7d85f97b88c..0769169878d 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java @@ -36,7 +36,7 @@ */ public interface ContextInternal extends Context { - ContextLocal> LOCAL_MAP = new ContextLocalImpl<>(0); + ContextLocal> LOCAL_MAP = new ContextLocalImpl<>(0, ConcurrentHashMap::new); /** * @return the current context diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java index 9ec4f47c3fe..52418b913da 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java @@ -23,6 +23,7 @@ import io.vertx.core.net.NetServerOptions; import io.vertx.core.net.impl.NetServerInternal; import io.vertx.core.net.impl.ServerID; +import io.vertx.core.spi.context.storage.ContextLocal; import io.vertx.core.spi.transport.Transport; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.file.FileResolver; @@ -33,6 +34,7 @@ import java.lang.ref.Cleaner; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -305,6 +307,11 @@ default Future executeBlockingInternal(Callable blockingCodeHandler) { */ AddressResolverGroup nettyAddressResolverGroup(); + /** + * @return an immutable list of this vertx instance context locals + */ + List> contextLocals(); + BlockedThreadChecker blockedThreadChecker(); CloseFuture closeFuture(); diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java index 83070c6a652..7ff9628959c 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java @@ -30,6 +30,7 @@ import io.vertx.core.net.NetServerOptions; import io.vertx.core.net.impl.NetServerInternal; import io.vertx.core.net.impl.ServerID; +import io.vertx.core.spi.context.storage.ContextLocal; import io.vertx.core.spi.transport.Transport; import io.vertx.core.shareddata.SharedData; import io.vertx.core.spi.VerticleFactory; @@ -42,9 +43,9 @@ import java.lang.ref.Cleaner; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -385,6 +386,11 @@ public AddressResolverGroup nettyAddressResolverGroup() { return delegate.nettyAddressResolverGroup(); } + @Override + public List> contextLocals() { + return delegate.contextLocals(); + } + @Override public BlockedThreadChecker blockedThreadChecker() { return delegate.blockedThreadChecker(); diff --git a/vertx-core/src/main/java/io/vertx/core/spi/context/storage/ContextLocal.java b/vertx-core/src/main/java/io/vertx/core/spi/context/storage/ContextLocal.java index 6bd5fc64755..fcac862f993 100644 --- a/vertx-core/src/main/java/io/vertx/core/spi/context/storage/ContextLocal.java +++ b/vertx-core/src/main/java/io/vertx/core/spi/context/storage/ContextLocal.java @@ -14,6 +14,7 @@ import io.vertx.core.internal.ContextInternal; import io.vertx.core.impl.ContextLocalImpl; +import java.util.function.Function; import java.util.function.Supplier; /** @@ -35,7 +36,16 @@ public interface ContextLocal { * @return the context local storage */ static ContextLocal registerLocal(Class type) { - return new ContextLocalImpl<>(); + return ContextLocalImpl.create(type, Function.identity()); + } + + /** + * Registers a context local storage. + * + * @return the context local storage + */ + static ContextLocal registerLocal(Class type, Function duplicator) { + return ContextLocalImpl.create(type, duplicator); } /** diff --git a/vertx-core/src/test/java/io/vertx/tests/context/ContextTest.java b/vertx-core/src/test/java/io/vertx/tests/context/ContextTest.java index 63ee4551704..874e60eac1f 100644 --- a/vertx-core/src/test/java/io/vertx/tests/context/ContextTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/context/ContextTest.java @@ -1204,4 +1204,27 @@ public void testInterruptTask(ContextInternal context, Consumer actor) assertTrue((System.currentTimeMillis() - now) < 2000); assertTrue(interrupted.get()); } + + @Test + public void testNestedDuplicate() { + ContextInternal ctx = ((ContextInternal) vertx.getOrCreateContext()).duplicate(); + ctx.putLocal("foo", "bar"); + Object expected = new Object(); + ctx.putLocal(contextLocal, AccessMode.CONCURRENT, expected); + ContextInternal duplicate = ctx.duplicate(); + assertEquals("bar", duplicate.getLocal("foo")); + assertEquals(expected, duplicate.getLocal(contextLocal)); + ctx.removeLocal("foo"); + ctx.removeLocal(contextLocal, AccessMode.CONCURRENT); + assertEquals("bar", duplicate.getLocal("foo")); + assertEquals(expected, duplicate.getLocal(contextLocal)); + } + + @Test + public void testContextLocals() { + List> locals = ((VertxInternal) vertx).contextLocals(); + assertSame(ContextInternal.LOCAL_MAP, locals.get(0)); + assertSame(contextLocal, locals.get(1)); + assertSame(locals, ((VertxInternal) vertx).contextLocals()); + } }