Skip to content

Commit

Permalink
re-enabled support of legacy distributed push
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Jul 17, 2017
1 parent 76a500d commit 9fad56c
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.metadata.security.OToken;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.serialization.serializer.record.ORecordSerializer;
import com.orientechnologies.orient.core.serialization.serializer.record.ORecordSerializerFactory;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinary;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol;
import com.orientechnologies.orient.enterprise.channel.binary.OTokenSecurityException;
import com.orientechnologies.orient.server.network.protocol.ONetworkProtocol;
Expand Down Expand Up @@ -360,59 +363,59 @@ public int getTotal() {
public void pushDistribCfg2Clients(final ODocument iConfig) {
if (iConfig == null)
return;
//
// final Set<String> pushed = new HashSet<String>();
// for (OClientConnection c : connections.values()) {
// if (!c.getData().supportsPushMessages)
// continue;
//
// try {
// final String remoteAddress = c.getRemoteAddress();
// if (pushed.contains(remoteAddress))
// // ALREADY SENT: JUMP IT
// continue;
//
// } catch (Exception e) {
// // SOCKET EXCEPTION SKIP IT
// continue;
// }
//
// if (!(c.getProtocol() instanceof ONetworkProtocolBinary) || c.getData().getSerializationImpl() == null)
// // INVOLVE ONLY BINARY PROTOCOLS
// continue;
//
// final ONetworkProtocolBinary p = (ONetworkProtocolBinary) c.getProtocol();
// final OChannelBinary channel = p.getChannel();
// final ORecordSerializer ser = ORecordSerializerFactory.instance().getFormat(c.getData().getSerializationImpl());
// if (ser == null)
// return;
//
// final byte[] content = ser.toStream(iConfig, false);
//
// try {
// // TRY ACQUIRING THE LOCK FOR MAXIMUM 3 SECS TO AVOID TO FREEZE CURRENT THREAD
// if (channel.tryAcquireWriteLock(TIMEOUT_PUSH)) {
// try {
// channel.writeByte(OChannelBinaryProtocol.PUSH_DATA);
// channel.writeInt(Integer.MIN_VALUE);
// channel.writeByte(OChannelBinaryProtocol.REQUEST_PUSH_DISTRIB_CONFIG);
// channel.writeBytes(content);
// channel.flush();
//
// pushed.add(c.getRemoteAddress());
// OLogManager.instance().debug(this, "Sent updated cluster configuration to the remote client %s", c.getRemoteAddress());
//
// } finally {
// channel.releaseWriteLock();
// }
// } else {
// OLogManager.instance()
// .info(this, "Timeout on sending updated cluster configuration to the remote client %s", c.getRemoteAddress());
// }
// } catch (Exception e) {
// OLogManager.instance().warn(this, "Cannot push cluster configuration to the client %s", e, c.getRemoteAddress());
// }
// }

final Set<String> pushed = new HashSet<String>();
for (OClientConnection c : connections.values()) {
if (!c.getData().supportsLegacyPushMessages)
continue;

try {
final String remoteAddress = c.getRemoteAddress();
if (pushed.contains(remoteAddress))
// ALREADY SENT: JUMP IT
continue;

} catch (Exception e) {
// SOCKET EXCEPTION SKIP IT
continue;
}

if (!(c.getProtocol() instanceof ONetworkProtocolBinary) || c.getData().getSerializationImpl() == null)
// INVOLVE ONLY BINARY PROTOCOLS
continue;

final ONetworkProtocolBinary p = (ONetworkProtocolBinary) c.getProtocol();
final OChannelBinary channel = p.getChannel();
final ORecordSerializer ser = ORecordSerializerFactory.instance().getFormat(c.getData().getSerializationImpl());
if (ser == null)
return;

final byte[] content = ser.toStream(iConfig, false);

try {
// TRY ACQUIRING THE LOCK FOR MAXIMUM 3 SECS TO AVOID TO FREEZE CURRENT THREAD
if (channel.tryAcquireWriteLock(TIMEOUT_PUSH)) {
try {
channel.writeByte(OChannelBinaryProtocol.PUSH_DATA);
channel.writeInt(Integer.MIN_VALUE);
channel.writeByte(OChannelBinaryProtocol.REQUEST_PUSH_DISTRIB_CONFIG);
channel.writeBytes(content);
channel.flush();

pushed.add(c.getRemoteAddress());
OLogManager.instance().debug(this, "Sent updated cluster configuration to the remote client %s", c.getRemoteAddress());

} finally {
channel.releaseWriteLock();
}
} else {
OLogManager.instance()
.info(this, "Timeout on sending updated cluster configuration to the remote client %s", c.getRemoteAddress());
}
} catch (Exception e) {
OLogManager.instance().warn(this, "Cannot push cluster configuration to the client %s", e, c.getRemoteAddress());
}
}
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ public OBinaryResponse executeConnect(OConnectRequest request) {
connection.getData().setSerializationImpl(request.getRecordFormat());

connection.setTokenBased(request.isTokenBased());
connection.getData().supportsPushMessages = request.isSupportPush();
connection.getData().supportsLegacyPushMessages = request.isSupportPush();
connection.getData().collectStats = request.isCollectStats();

connection.setServerUser(server.serverLogin(request.getUsername(), request.getPassword(), "server.connect"));
Expand Down Expand Up @@ -923,7 +923,7 @@ public OBinaryResponse executeConnect37(OConnect37Request request) {
connection.getData().setSerializer(handshakeInfo.getSerializer());

connection.setTokenBased(true);
connection.getData().supportsPushMessages = true;
connection.getData().supportsLegacyPushMessages = false;
connection.getData().collectStats = true;

connection.setServerUser(server.serverLogin(request.getUsername(), request.getPassword(), "server.connect"));
Expand Down Expand Up @@ -953,7 +953,7 @@ public OBinaryResponse executeDatabaseOpen(OOpenRequest request) {
connection.getData().clientId = request.getClientId();
connection.getData().setSerializationImpl(request.getRecordFormat());
connection.setTokenBased(request.isUseToken());
connection.getData().supportsPushMessages = request.isSupportsPush();
connection.getData().supportsLegacyPushMessages = request.isSupportsPush();
connection.getData().collectStats = request.isCollectStats();

try {
Expand Down Expand Up @@ -1006,7 +1006,7 @@ public OBinaryResponse executeDatabaseOpen(OOpenRequest request) {
@Override
public OBinaryResponse executeDatabaseOpen37(OOpen37Request request) {
connection.setTokenBased(true);
connection.getData().supportsPushMessages = true;
connection.getData().supportsLegacyPushMessages = false;
connection.getData().collectStats = true;
connection.getData().driverName = handshakeInfo.getDriverName();
connection.getData().driverVersion = handshakeInfo.getDriverVersion();
Expand Down Expand Up @@ -1329,7 +1329,7 @@ public OBinaryResponse executeDistributedConnect(ODistributedConnectRequest requ
connection.getData().clientId = "OrientDB Distributed";
connection.getData().setSerializer(ORecordSerializerNetworkV37.INSTANCE);
connection.setTokenBased(true);
connection.getData().supportsPushMessages = false;
connection.getData().supportsLegacyPushMessages = false;
connection.getData().collectStats = false;
int chosenProtocolVersion = Math.min(request.getDistributedProtocolVersion(), ORemoteServerController.CURRENT_PROTOCOL_VERSION);
connection.setServerUser(serverUser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ public class ONetworkProtocolData {
public String serverInfo = null;
public String caller = null;
public String driverName = null;
public String driverVersion = null;
public short protocolVersion = -1;
public int sessionId = -1;
public String clientId = null;
public String currentUserId = null;
private String serializationImpl = null;
public boolean serverUser = false;
public String serverUsername = null;
public OCommandRequestText command = null;
public boolean supportsPushMessages = true;
public boolean collectStats = true;
public String driverVersion = null;
public short protocolVersion = -1;
public int sessionId = -1;
public String clientId = null;
public String currentUserId = null;
private String serializationImpl = null;
public boolean serverUser = false;
public String serverUsername = null;
public OCommandRequestText command = null;
public boolean supportsLegacyPushMessages = true;
public boolean collectStats = true;
private ORecordSerializer serializer;

public String getSerializationImpl() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public ONetworkProtocolData getProtocolDataFromToken(OClientConnection connectio
data.serverUser = binary.isServerUser();
data.serverUsername = binary.getUserName();
data.serverUsername = binary.getUserName();
data.supportsPushMessages = connection.getData().supportsPushMessages;
data.supportsLegacyPushMessages = connection.getData().supportsLegacyPushMessages;
data.collectStats = connection.getData().collectStats;
return data;
}
Expand Down

0 comments on commit 9fad56c

Please sign in to comment.