Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Context local storage SPI #5048

Merged
merged 19 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,21 @@
</additionalClasspathElements>
</configuration>
</execution>
<execution>
<id>custom-context-local</id>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<includes>
<include>io/vertx/it/CustomContextLocalTest.java</include>
</includes>
<additionalClasspathElements>
<additionalClasspathElement>${project.basedir}/src/test/classpath/customcontextlocal</additionalClasspathElement>
</additionalClasspathElements>
</configuration>
</execution>
</executions>
</plugin>

Expand Down
59 changes: 59 additions & 0 deletions src/main/java/io/vertx/core/impl/ContextBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import io.vertx.core.spi.context.storage.AccessMode;
import io.vertx.core.spi.context.storage.ContextLocal;

import java.util.function.Supplier;

/**
* Base class for context.
*
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
class ContextBase {

final Object[] locals;

ContextBase(Object[] locals) {
this.locals = locals;
}

public final <T> T getLocal(ContextLocal<T> key, AccessMode accessMode) {
ContextLocalImpl<T> internalKey = (ContextLocalImpl<T>) key;
int index = internalKey.index;
if (index >= locals.length) {
throw new IllegalArgumentException();
}
Object res = accessMode.get(locals, index);
return (T) res;
}

public final <T> T getLocal(ContextLocal<T> key, AccessMode accessMode, Supplier<? extends T> initialValueSupplier) {
ContextLocalImpl<T> internalKey = (ContextLocalImpl<T>) key;
int index = internalKey.index;
if (index >= locals.length) {
throw new IllegalArgumentException("Invalid key index: " + index);
}
Object res = accessMode.getOrCreate(locals, index, (Supplier<Object>) initialValueSupplier);
return (T) res;
}

public final <T> void putLocal(ContextLocal<T> key, AccessMode accessMode, T value) {
ContextLocalImpl<T> internalKey = (ContextLocalImpl<T>) key;
int index = internalKey.index;
if (index >= locals.length) {
throw new IllegalArgumentException();
}
accessMode.put(locals, index, value);
}
}
15 changes: 4 additions & 11 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* @author <a href="http://tfox.org">Tim Fox</a>
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public final class ContextImpl implements ContextInternal {
public final class ContextImpl extends ContextBase implements ContextInternal {

private static final Logger log = LoggerFactory.getLogger(ContextImpl.class);

Expand All @@ -44,14 +44,14 @@ public final class ContextImpl implements ContextInternal {
private final EventLoop eventLoop;
private final EventExecutor executor;
private ConcurrentMap<Object, Object> data;
private ConcurrentMap<Object, Object> localData;
private volatile Handler<Throwable> exceptionHandler;
final TaskQueue internalOrderedTasks;
final WorkerPool internalWorkerPool;
final WorkerPool workerPool;
final TaskQueue orderedTasks;

protected ContextImpl(VertxInternal vertx,
Object[] locals,
ThreadingModel threadingModel,
EventLoop eventLoop,
EventExecutor executor,
Expand All @@ -61,6 +61,7 @@ protected ContextImpl(VertxInternal vertx,
Deployment deployment,
CloseFuture closeFuture,
ClassLoader tccl) {
super(locals);
this.threadingModel = threadingModel;
this.deployment = deployment;
this.config = deployment != null ? deployment.config() : new JsonObject();
Expand Down Expand Up @@ -211,14 +212,6 @@ public synchronized ConcurrentMap<Object, Object> contextData() {
return data;
}

@Override
public synchronized ConcurrentMap<Object, Object> localContextData() {
if (localData == null) {
localData = new ConcurrentHashMap<>();
}
return localData;
}

public void reportException(Throwable t) {
Handler<Throwable> handler = exceptionHandler;
if (handler == null) {
Expand Down Expand Up @@ -306,6 +299,6 @@ protected <T> void emit(ContextInternal ctx, T argument, Handler<T> task) {

@Override
public ContextInternal duplicate() {
return new DuplicatedContext(this);
return new DuplicatedContext(this, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
}
}
71 changes: 66 additions & 5 deletions src/main/java/io/vertx/core/impl/ContextInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@

import io.netty.channel.EventLoop;
import io.vertx.core.*;
import io.vertx.core.Future;
import io.vertx.core.impl.future.FailedFuture;
import io.vertx.core.impl.future.PromiseImpl;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.future.SucceededFuture;
import io.vertx.core.spi.context.storage.AccessMode;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.tracing.VertxTracer;

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.function.Supplier;

/**
* This interface provides an api for vert.x core internal use only
Expand All @@ -33,6 +34,8 @@
*/
public interface ContextInternal extends Context {

ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = new ContextLocalImpl<>(0);

/**
* @return the current context
*/
Expand Down Expand Up @@ -304,19 +307,76 @@ default boolean remove(Object key) {
/**
* @return the {@link ConcurrentMap} used to store local context data
*/
ConcurrentMap<Object, Object> localContextData();
default ConcurrentMap<Object, Object> localContextData() {
return LOCAL_MAP.get(this, ConcurrentHashMap::new);
}

/**
* Get some local data from the context.
*
* @param key the key of the data
* @param <T> the type of the data
* @return the local data
*/
default <T> T getLocal(ContextLocal<T> key) {
return getLocal(key, AccessMode.CONCURRENT);
}

/**
* Get some local data from the context.
*
* @param key the key of the data
* @param <T> the type of the data
* @return the local data
*/
<T> T getLocal(ContextLocal<T> key, AccessMode accessMode);

/**
* Get some local data from the context, when it does not exist the {@code initialValueSupplier} is called to obtain
* the initial value.
*
* <p> The {@code initialValueSupplier} might be called multiple times when multiple threads call this method concurrently.
*
* @param key the key of the data
* @param initialValueSupplier the supplier of the initial value optionally called
* @param <T> the type of the data
* @return the local data
*/
<T> T getLocal(ContextLocal<T> key, AccessMode accessMode, Supplier<? extends T> initialValueSupplier);

/**
* Put some local data in the context.
* <p>
* This can be used to share data between different handlers that share a context
*
* @param key the key of the data
* @param value the data
*/
<T> void putLocal(ContextLocal<T> key, AccessMode accessMode, T value);

/**
* Remove some local data from the context.
*
* @param key the key to remove
*/
default <T> void removeLocal(ContextLocal<T> key, AccessMode accessMode) {
putLocal(key, accessMode, null);
}

@Deprecated
@SuppressWarnings("unchecked")
@Override
default <T> T getLocal(Object key) {
return (T) localContextData().get(key);
}

@Deprecated
@Override
default void putLocal(Object key, Object value) {
localContextData().put(key, value);
}

@Deprecated
@Override
default boolean removeLocal(Object key) {
return localContextData().remove(key) != null;
Expand Down Expand Up @@ -445,4 +505,5 @@ default ContextInternal unwrap() {
default boolean isDuplicate() {
return false;
}

}
29 changes: 29 additions & 0 deletions src/main/java/io/vertx/core/impl/ContextLocalImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import io.vertx.core.spi.context.storage.ContextLocal;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class ContextLocalImpl<T> implements ContextLocal<T> {

final int index;

public ContextLocalImpl(int index) {
this.index = index;
}

public ContextLocalImpl() {
this.index = LocalSeq.next();
}
}
20 changes: 5 additions & 15 deletions src/main/java/io/vertx/core/impl/DuplicatedContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
*
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
class DuplicatedContext implements ContextInternal {
final class DuplicatedContext extends ContextBase implements ContextInternal {

protected final ContextImpl delegate;
private ConcurrentMap<Object, Object> localData;
final ContextImpl delegate;

DuplicatedContext(ContextImpl delegate) {
DuplicatedContext(ContextImpl delegate, Object[] locals) {
super(locals);
this.delegate = delegate;
}

Expand Down Expand Up @@ -119,16 +119,6 @@ public final ConcurrentMap<Object, Object> contextData() {
return delegate.contextData();
}

@Override
public final ConcurrentMap<Object, Object> localContextData() {
synchronized (this) {
if (localData == null) {
localData = new ConcurrentHashMap<>();
}
return localData;
}
}

@Override
public <T> Future<T> executeBlockingInternal(Callable<T> action) {
return ContextImpl.executeBlocking(this, action, delegate.internalWorkerPool, delegate.internalOrderedTasks);
Expand Down Expand Up @@ -176,7 +166,7 @@ public boolean isWorkerContext() {

@Override
public ContextInternal duplicate() {
return new DuplicatedContext(delegate);
return new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
}

@Override
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/io/vertx/core/impl/LocalSeq.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
class LocalSeq {

// 0 : reserved slot for local context map
private static final AtomicInteger seq = new AtomicInteger(1);

/**
* Hook for testing purposes
*/
static void reset() {
seq.set((1));
}

static int get() {
return seq.get();
}

static int next() {
return seq.getAndIncrement();
}
}
Loading
Loading