Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

THRIFT-2427 Add support for Multiplexed Async Processors #747

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ public TBaseAsyncProcessor(I iface, Map<String, AsyncProcessFunction<I, ? extend
}

public boolean process(final AsyncFrameBuffer fb) throws TException {
return process(fb, fb.getInputProtocol(), fb.getOutputProtocol());
}

final TProtocol in = fb.getInputProtocol();
final TProtocol out = fb.getOutputProtocol();

/** Allow to decorate input and output protocols */
boolean process(AsyncFrameBuffer fb, TProtocol in, TProtocol out) throws TException {
//Find processing function
final TMessage msg = in.readMessageBegin();
AsyncProcessFunction fn = processMap.get(msg.name);
Expand Down Expand Up @@ -85,7 +86,6 @@ public boolean process(final AsyncFrameBuffer fb) throws TException {
return true;
}

@Override
public boolean process(TProtocol in, TProtocol out) throws TException {
return false;
}
Expand Down
128 changes: 128 additions & 0 deletions lib/java/src/org/apache/thrift/TMultiplexedAsyncProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@

package org.apache.thrift;

import org.apache.thrift.TMultiplexedProcessor.StoredMessageProtocol;
import org.apache.thrift.protocol.TMessage;
import org.apache.thrift.protocol.TMessageType;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer;

import java.util.HashMap;
import java.util.Map;

/**
* <code>TMultiplexedAsyncProcessor</code> is an <code>TAsyncProcessor</code> allowing
* a single <code>TServer</code> to provide multiple services.
*
* <p>To do so, you instantiate the processor and then register additional
* processors with it, as shown in the following example:</p>
*
* <blockquote><code>
* final TMultiplexedAsyncProcessor processor = new TMultiplexedAsyncProcessor();
*
* processor.registerProcessor(
* "Calculator",
* new Calculator.AsyncProcessor(new CalculatorHandler()));
*
* processor.registerProcessor(
* "WeatherReport",
* new WeatherReport.AsyncProcessor(new WeatherReportHandler()));
*
* TServerTransport t = new TServerSocket(9090);
* TSimpleServer server = new TSimpleServer(processor, t);
*
* server.serve();
* </code></blockquote>
*/
public final class TMultiplexedAsyncProcessor implements TAsyncProcessor, TProcessor {
private final Map<String,TBaseAsyncProcessor> SERVICE_PROCESSOR_MAP = new HashMap<String, TBaseAsyncProcessor>();

/**
* 'Register' a service with this <code>TMultiplexedAsyncProcessor</code>. This
* allows us to broker requests to individual services by using the service
* name to select them at request time.
*
* @param serviceName Name of a service, has to be identical to the name
* declared in the Thrift IDL, e.g. "WeatherReport".
* @param processor Implementation of a service, usually referred to
* as "handlers", e.g. WeatherReportHandler implementing WeatherReport.AsyncIface.
*/
public void registerProcessor(String serviceName, TBaseAsyncProcessor processor) {
SERVICE_PROCESSOR_MAP.put(serviceName, processor);
}

/**
* This implementation of <code>process</code> performs the following steps:
*
* <ol>
* <li>Read the beginning of the message.</li>
* <li>Extract the service name from the message.</li>
* <li>Using the service name to locate the appropriate processor.</li>
* <li>Construct message without service name prefix.</li>
* <li>Locate process function.</li>
* <li>Read arguments.</li>
* <li>Start processing function.</li>
* <li>Set response ready for oneway calls.</li>
* </ol>
*
* @param fb comes from layer below
* @throws TException If the message type is not CALL or ONEWAY, if
* the service name was not found in the message, or if the service
* name was not found in the service map. You called {@link #registerProcessor(String, TBaseAsyncProcessor) registerProcessor}
* during initialization, right? :)
*/
public boolean process(AsyncFrameBuffer fb) throws TException {
final TMessage messageBegin = fb.getInputProtocol().readMessageBegin();

final String serviceName = extractServiceName(messageBegin);

final StoredMessageProtocol iprot = decorateProtocol(fb.getInputProtocol(), messageBegin, serviceName);

return findActualProcessor(serviceName).process(fb, iprot, fb.getOutputProtocol());
}

/** expect {@link org.apache.thrift.protocol.TMultiplexedProtocol}-formatted message name */
private static String extractServiceName(TMessage messageBegin) throws TException {
if (messageBegin.type != TMessageType.CALL && messageBegin.type != TMessageType.ONEWAY) {
// TODO Apache Guys - Can the server ever get an EXCEPTION or REPLY?
// TODO Should we check for this here?
throw new TException("Bad message type, this should not have happened!?");
}

// Extract the service name
final int index = messageBegin.name.indexOf(TMultiplexedProtocol.SEPARATOR);
if (index < 0) {
throw new TException("Service name not found in message name: " + messageBegin.name + ". Did you " +
"forget to use a TMultiplexProtocol in your client?");
}

return messageBegin.name.substring(0, index);
}

/** Create a new "message begin" TMessage, removing the service name */
private static StoredMessageProtocol decorateProtocol(TProtocol iprot, TMessage message, String serviceName) {
final TMessage standardMessage = new TMessage(
message.name.substring(serviceName.length() + TMultiplexedProtocol.SEPARATOR.length()),
message.type,
message.seqid
);

return new StoredMessageProtocol(iprot, standardMessage);
}

private TBaseAsyncProcessor findActualProcessor(String serviceName) throws TException {
final TBaseAsyncProcessor actualProcessor = SERVICE_PROCESSOR_MAP.get(serviceName);

if (actualProcessor == null) {
throw new TException("Service name not found: " + serviceName + ". Did you forget " +
"to call registerProcessor()?");
}

return actualProcessor;
}

public boolean process(TProtocol in, TProtocol out) {
return false;
}
}
4 changes: 2 additions & 2 deletions lib/java/src/org/apache/thrift/TMultiplexedProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void registerProcessor(String serviceName, TProcessor processor) {
* <li>Dispatch to the processor, with a decorated instance of TProtocol
* that allows readMessageBegin() to return the original TMessage.</li>
* </ol>
*
*
* @throws TException If the message type is not CALL or ONEWAY, if
* the service name was not found in the message, or if the service
* name was not found in the service map. You called {@link #registerProcessor(String, TProcessor) registerProcessor}
Expand Down Expand Up @@ -128,7 +128,7 @@ Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
* to allow them to call readMessageBegin() and get a TMessage in exactly
* the standard format, without the service name prepended to TMessage.name.
*/
private static class StoredMessageProtocol extends TProtocolDecorator {
static final class StoredMessageProtocol extends TProtocolDecorator {
TMessage messageBegin;
public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin) {
super(protocol);
Expand Down