Skip to content

Commit

Permalink
Merge pull request #5048 from eclipse-vertx/context-local-data-rework
Browse files Browse the repository at this point in the history
Context local storage SPI
  • Loading branch information
vietj authored Mar 1, 2024
2 parents 582d864 + af9fa82 commit 8de9656
Show file tree
Hide file tree
Showing 21 changed files with 613 additions and 113 deletions.
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,21 @@
</additionalClasspathElements>
</configuration>
</execution>
<execution>
<id>custom-context-local</id>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<includes>
<include>io/vertx/it/CustomContextLocalTest.java</include>
</includes>
<additionalClasspathElements>
<additionalClasspathElement>${project.basedir}/src/test/classpath/customcontextlocal</additionalClasspathElement>
</additionalClasspathElements>
</configuration>
</execution>
</executions>
</plugin>

Expand Down
59 changes: 59 additions & 0 deletions src/main/java/io/vertx/core/impl/ContextBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import io.vertx.core.spi.context.storage.AccessMode;
import io.vertx.core.spi.context.storage.ContextLocal;

import java.util.function.Supplier;

/**
* Base class for context.
*
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
class ContextBase {

final Object[] locals;

ContextBase(Object[] locals) {
this.locals = locals;
}

public final <T> T getLocal(ContextLocal<T> key, AccessMode accessMode) {
ContextLocalImpl<T> internalKey = (ContextLocalImpl<T>) key;
int index = internalKey.index;
if (index >= locals.length) {
throw new IllegalArgumentException();
}
Object res = accessMode.get(locals, index);
return (T) res;
}

public final <T> T getLocal(ContextLocal<T> key, AccessMode accessMode, Supplier<? extends T> initialValueSupplier) {
ContextLocalImpl<T> internalKey = (ContextLocalImpl<T>) key;
int index = internalKey.index;
if (index >= locals.length) {
throw new IllegalArgumentException("Invalid key index: " + index);
}
Object res = accessMode.getOrCreate(locals, index, (Supplier<Object>) initialValueSupplier);
return (T) res;
}

public final <T> void putLocal(ContextLocal<T> key, AccessMode accessMode, T value) {
ContextLocalImpl<T> internalKey = (ContextLocalImpl<T>) key;
int index = internalKey.index;
if (index >= locals.length) {
throw new IllegalArgumentException();
}
accessMode.put(locals, index, value);
}
}
15 changes: 4 additions & 11 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* @author <a href="http://tfox.org">Tim Fox</a>
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public final class ContextImpl implements ContextInternal {
public final class ContextImpl extends ContextBase implements ContextInternal {

private static final Logger log = LoggerFactory.getLogger(ContextImpl.class);

Expand All @@ -44,14 +44,14 @@ public final class ContextImpl implements ContextInternal {
private final EventLoop eventLoop;
private final EventExecutor executor;
private ConcurrentMap<Object, Object> data;
private ConcurrentMap<Object, Object> localData;
private volatile Handler<Throwable> exceptionHandler;
final TaskQueue internalOrderedTasks;
final WorkerPool internalWorkerPool;
final WorkerPool workerPool;
final TaskQueue orderedTasks;

protected ContextImpl(VertxInternal vertx,
Object[] locals,
ThreadingModel threadingModel,
EventLoop eventLoop,
EventExecutor executor,
Expand All @@ -61,6 +61,7 @@ protected ContextImpl(VertxInternal vertx,
Deployment deployment,
CloseFuture closeFuture,
ClassLoader tccl) {
super(locals);
this.threadingModel = threadingModel;
this.deployment = deployment;
this.config = deployment != null ? deployment.config() : new JsonObject();
Expand Down Expand Up @@ -211,14 +212,6 @@ public synchronized ConcurrentMap<Object, Object> contextData() {
return data;
}

@Override
public synchronized ConcurrentMap<Object, Object> localContextData() {
if (localData == null) {
localData = new ConcurrentHashMap<>();
}
return localData;
}

public void reportException(Throwable t) {
Handler<Throwable> handler = exceptionHandler;
if (handler == null) {
Expand Down Expand Up @@ -306,6 +299,6 @@ protected <T> void emit(ContextInternal ctx, T argument, Handler<T> task) {

@Override
public ContextInternal duplicate() {
return new DuplicatedContext(this);
return new DuplicatedContext(this, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
}
}
71 changes: 66 additions & 5 deletions src/main/java/io/vertx/core/impl/ContextInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@

import io.netty.channel.EventLoop;
import io.vertx.core.*;
import io.vertx.core.Future;
import io.vertx.core.impl.future.FailedFuture;
import io.vertx.core.impl.future.PromiseImpl;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.future.SucceededFuture;
import io.vertx.core.spi.context.storage.AccessMode;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.tracing.VertxTracer;

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.function.Supplier;

/**
* This interface provides an api for vert.x core internal use only
Expand All @@ -33,6 +34,8 @@
*/
public interface ContextInternal extends Context {

ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = new ContextLocalImpl<>(0);

/**
* @return the current context
*/
Expand Down Expand Up @@ -304,19 +307,76 @@ default boolean remove(Object key) {
/**
* @return the {@link ConcurrentMap} used to store local context data
*/
ConcurrentMap<Object, Object> localContextData();
default ConcurrentMap<Object, Object> localContextData() {
return LOCAL_MAP.get(this, ConcurrentHashMap::new);
}

/**
* Get some local data from the context.
*
* @param key the key of the data
* @param <T> the type of the data
* @return the local data
*/
default <T> T getLocal(ContextLocal<T> key) {
return getLocal(key, AccessMode.CONCURRENT);
}

/**
* Get some local data from the context.
*
* @param key the key of the data
* @param <T> the type of the data
* @return the local data
*/
<T> T getLocal(ContextLocal<T> key, AccessMode accessMode);

/**
* Get some local data from the context, when it does not exist the {@code initialValueSupplier} is called to obtain
* the initial value.
*
* <p> The {@code initialValueSupplier} might be called multiple times when multiple threads call this method concurrently.
*
* @param key the key of the data
* @param initialValueSupplier the supplier of the initial value optionally called
* @param <T> the type of the data
* @return the local data
*/
<T> T getLocal(ContextLocal<T> key, AccessMode accessMode, Supplier<? extends T> initialValueSupplier);

/**
* Put some local data in the context.
* <p>
* This can be used to share data between different handlers that share a context
*
* @param key the key of the data
* @param value the data
*/
<T> void putLocal(ContextLocal<T> key, AccessMode accessMode, T value);

/**
* Remove some local data from the context.
*
* @param key the key to remove
*/
default <T> void removeLocal(ContextLocal<T> key, AccessMode accessMode) {
putLocal(key, accessMode, null);
}

@Deprecated
@SuppressWarnings("unchecked")
@Override
default <T> T getLocal(Object key) {
return (T) localContextData().get(key);
}

@Deprecated
@Override
default void putLocal(Object key, Object value) {
localContextData().put(key, value);
}

@Deprecated
@Override
default boolean removeLocal(Object key) {
return localContextData().remove(key) != null;
Expand Down Expand Up @@ -445,4 +505,5 @@ default ContextInternal unwrap() {
default boolean isDuplicate() {
return false;
}

}
29 changes: 29 additions & 0 deletions src/main/java/io/vertx/core/impl/ContextLocalImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import io.vertx.core.spi.context.storage.ContextLocal;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class ContextLocalImpl<T> implements ContextLocal<T> {

final int index;

public ContextLocalImpl(int index) {
this.index = index;
}

public ContextLocalImpl() {
this.index = LocalSeq.next();
}
}
20 changes: 5 additions & 15 deletions src/main/java/io/vertx/core/impl/DuplicatedContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
*
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
class DuplicatedContext implements ContextInternal {
final class DuplicatedContext extends ContextBase implements ContextInternal {

protected final ContextImpl delegate;
private ConcurrentMap<Object, Object> localData;
final ContextImpl delegate;

DuplicatedContext(ContextImpl delegate) {
DuplicatedContext(ContextImpl delegate, Object[] locals) {
super(locals);
this.delegate = delegate;
}

Expand Down Expand Up @@ -119,16 +119,6 @@ public final ConcurrentMap<Object, Object> contextData() {
return delegate.contextData();
}

@Override
public final ConcurrentMap<Object, Object> localContextData() {
synchronized (this) {
if (localData == null) {
localData = new ConcurrentHashMap<>();
}
return localData;
}
}

@Override
public <T> Future<T> executeBlockingInternal(Callable<T> action) {
return ContextImpl.executeBlocking(this, action, delegate.internalWorkerPool, delegate.internalOrderedTasks);
Expand Down Expand Up @@ -176,7 +166,7 @@ public boolean isWorkerContext() {

@Override
public ContextInternal duplicate() {
return new DuplicatedContext(delegate);
return new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
}

@Override
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/io/vertx/core/impl/LocalSeq.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
class LocalSeq {

// 0 : reserved slot for local context map
private static final AtomicInteger seq = new AtomicInteger(1);

/**
* Hook for testing purposes
*/
static void reset() {
seq.set((1));
}

static int get() {
return seq.get();
}

static int next() {
return seq.getAndIncrement();
}
}
Loading

0 comments on commit 8de9656

Please sign in to comment.