Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support duplicated context duplication #5215

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class ContextImpl extends ContextBase implements ContextInternal {

static final boolean DISABLE_TIMINGS = SysProps.DISABLE_CONTEXT_TIMINGS.getBoolean();

private final VertxInternal owner;
private final VertxImpl owner;
private final JsonObject config;
private final DeploymentContext deployment;
private final CloseFuture closeFuture;
Expand All @@ -51,7 +51,7 @@ public final class ContextImpl extends ContextBase implements ContextInternal {
final WorkerPool workerPool;
final WorkerTaskQueue executeBlockingTasks;

public ContextImpl(VertxInternal vertx,
public ContextImpl(VertxImpl vertx,
Object[] locals,
EventLoopExecutor eventLoop,
ThreadingModel threadingModel,
Expand Down Expand Up @@ -113,7 +113,7 @@ public EventLoop nettyEventLoop() {
return eventLoop.eventLoop;
}

public VertxInternal owner() {
public VertxImpl owner() {
return owner;
}

Expand Down
19 changes: 14 additions & 5 deletions vertx-core/src/main/java/io/vertx/core/impl/ContextLocalImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,27 @@

import io.vertx.core.spi.context.storage.ContextLocal;

import java.util.function.Function;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class ContextLocalImpl<T> implements ContextLocal<T> {

public static <T> ContextLocal<T> create(Class<T> type, Function<T, T> duplicator) {
synchronized (LocalSeq.class) {
int idx = LocalSeq.locals.size();
ContextLocal<T> local = new ContextLocalImpl<>(idx, duplicator);
LocalSeq.locals.add(local);
return local;
}
}

final int index;
final Function<T, T> duplicator;

public ContextLocalImpl(int index) {
public ContextLocalImpl(int index, Function<T, T> duplicator) {
this.index = index;
}

public ContextLocalImpl() {
this.index = LocalSeq.next();
this.duplicator = duplicator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ public boolean isWorkerContext() {

@Override
public ContextInternal duplicate() {
return new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
DuplicatedContext duplicate = new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
delegate.owner().duplicate(this, duplicate);
return duplicate;
}

@Override
Expand Down
25 changes: 16 additions & 9 deletions vertx-core/src/main/java/io/vertx/core/impl/LocalSeq.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
*/
package io.vertx.core.impl;

import io.vertx.core.internal.ContextInternal;
import io.vertx.core.spi.context.storage.ContextLocal;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -18,20 +23,22 @@
public class LocalSeq {

// 0 : reserved slot for local context map
private static final AtomicInteger seq = new AtomicInteger(1);
static final List<ContextLocal<?>> locals = new ArrayList<>();

static {
reset();
}

/**
* Hook for testing purposes
*/
public static void reset() {
seq.set((1));
}

static int get() {
return seq.get();
public synchronized static void reset() {
// 0 : reserved slot for local context map
locals.clear();
locals.add(ContextInternal.LOCAL_MAP);
}

static int next() {
return seq.getAndIncrement();
synchronized static ContextLocal<?>[] get() {
return locals.toArray(new ContextLocal[0]);
}
}
26 changes: 23 additions & 3 deletions vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import io.vertx.core.net.impl.*;
import io.vertx.core.impl.transports.NioTransport;
import io.vertx.core.spi.context.executor.EventExecutorProvider;
import io.vertx.core.spi.context.storage.AccessMode;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.file.FileResolver;
import io.vertx.core.file.impl.FileSystemImpl;
import io.vertx.core.file.impl.WindowsFileSystem;
Expand Down Expand Up @@ -143,7 +145,8 @@ private static ThreadFactory virtualThreadFactory() {
private final FileResolver fileResolver;
private final EventExecutorProvider eventExecutorProvider;
private final Map<ServerID, NetServerInternal> sharedNetServers = new HashMap<>();
private final int contextLocals;
private final ContextLocal<?>[] contextLocals;
private final List<ContextLocal<?>> contextLocalsList;
final WorkerPool workerPool;
final WorkerPool internalWorkerPool;
final WorkerPool virtualThreaWorkerPool;
Expand Down Expand Up @@ -202,6 +205,7 @@ private static ThreadFactory virtualThreadFactory() {
ThreadFactory virtualThreadFactory = virtualThreadFactory();

contextLocals = LocalSeq.get();
contextLocalsList = Collections.unmodifiableList(Arrays.asList(contextLocals));
closeFuture = new CloseFuture(log);
maxEventLoopExecTime = maxEventLoopExecuteTime;
maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit;
Expand Down Expand Up @@ -563,10 +567,10 @@ public boolean cancelTimer(long id) {
}

private Object[] createContextLocals() {
if (contextLocals == 0) {
if (contextLocals.length == 0) {
return EMPTY_CONTEXT_LOCALS;
} else {
return new Object[contextLocals];
return new Object[contextLocals.length];
}
}

Expand Down Expand Up @@ -936,6 +940,11 @@ public AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup() {
return hostnameResolver.nettyAddressResolverGroup();
}

@Override
public List<ContextLocal<?>> contextLocals() {
return contextLocalsList;
}

@Override
public FileResolver fileResolver() {
return fileResolver;
Expand Down Expand Up @@ -1317,6 +1326,17 @@ public <C> C createSharedResource(String resourceKey, String resourceName, Close
return SharedResourceHolder.createSharedResource(this, resourceKey, resourceName, closeFuture, supplier);
}

void duplicate(ContextBase src, ContextBase dst) {
for (int i = 0;i < contextLocals.length;i++) {
ContextLocalImpl<?> contextLocal = (ContextLocalImpl<?>) contextLocals[i];
Object local = AccessMode.CONCURRENT.get(src.locals, i);
if (local != null) {
local = ((Function)contextLocal.duplicator).apply(local);
}
AccessMode.CONCURRENT.put(dst.locals, i, local);
}
}

/**
* Reads the version from the {@code vertx-version.txt} file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*/
public interface ContextInternal extends Context {

ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = new ContextLocalImpl<>(0);
ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = new ContextLocalImpl<>(0, ConcurrentHashMap::new);

/**
* @return the current context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.impl.NetServerInternal;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.transport.Transport;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.file.FileResolver;
Expand All @@ -33,6 +34,7 @@
import java.lang.ref.Cleaner;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -305,6 +307,11 @@ default <T> Future<T> executeBlockingInternal(Callable<T> blockingCodeHandler) {
*/
AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup();

/**
* @return an immutable list of this vertx instance context locals
*/
List<ContextLocal<?>> contextLocals();

BlockedThreadChecker blockedThreadChecker();

CloseFuture closeFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.impl.NetServerInternal;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.transport.Transport;
import io.vertx.core.shareddata.SharedData;
import io.vertx.core.spi.VerticleFactory;
Expand All @@ -42,9 +43,9 @@
import java.lang.ref.Cleaner;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -385,6 +386,11 @@ public AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup() {
return delegate.nettyAddressResolverGroup();
}

@Override
public List<ContextLocal<?>> contextLocals() {
return delegate.contextLocals();
}

@Override
public BlockedThreadChecker blockedThreadChecker() {
return delegate.blockedThreadChecker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.impl.ContextLocalImpl;

import java.util.function.Function;
import java.util.function.Supplier;

/**
Expand All @@ -35,7 +36,16 @@ public interface ContextLocal<T> {
* @return the context local storage
*/
static <T> ContextLocal<T> registerLocal(Class<T> type) {
return new ContextLocalImpl<>();
return ContextLocalImpl.create(type, Function.identity());
}

/**
* Registers a context local storage.
*
* @return the context local storage
*/
static <T> ContextLocal<T> registerLocal(Class<T> type, Function<T, T> duplicator) {
return ContextLocalImpl.create(type, duplicator);
}

/**
Expand Down
23 changes: 23 additions & 0 deletions vertx-core/src/test/java/io/vertx/tests/context/ContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1204,4 +1204,27 @@ public void testInterruptTask(ContextInternal context, Consumer<Runnable> actor)
assertTrue((System.currentTimeMillis() - now) < 2000);
assertTrue(interrupted.get());
}

@Test
public void testNestedDuplicate() {
ContextInternal ctx = ((ContextInternal) vertx.getOrCreateContext()).duplicate();
ctx.putLocal("foo", "bar");
Object expected = new Object();
ctx.putLocal(contextLocal, AccessMode.CONCURRENT, expected);
ContextInternal duplicate = ctx.duplicate();
assertEquals("bar", duplicate.getLocal("foo"));
assertEquals(expected, duplicate.getLocal(contextLocal));
ctx.removeLocal("foo");
ctx.removeLocal(contextLocal, AccessMode.CONCURRENT);
assertEquals("bar", duplicate.getLocal("foo"));
assertEquals(expected, duplicate.getLocal(contextLocal));
}

@Test
public void testContextLocals() {
List<ContextLocal<?>> locals = ((VertxInternal) vertx).contextLocals();
assertSame(ContextInternal.LOCAL_MAP, locals.get(0));
assertSame(contextLocal, locals.get(1));
assertSame(locals, ((VertxInternal) vertx).contextLocals());
}
}
Loading