Skip to content

Commit

Permalink
clean up result closing
Browse files Browse the repository at this point in the history
  • Loading branch information
TobiasHafner committed Mar 25, 2024
1 parent ab21ccc commit 1e052a4
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 69 deletions.
29 changes: 15 additions & 14 deletions src/main/java/org/polypheny/jdbc/PolyConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,23 @@
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import org.polypheny.jdbc.meta.PolyphenyDatabaseMetadata;
import org.polypheny.jdbc.multimodel.PolyStatement;
import org.polypheny.jdbc.properties.PolyphenyConnectionProperties;
import org.polypheny.jdbc.properties.PolyphenyStatementProperties;
import org.polypheny.jdbc.properties.PropertyUtils;
import org.polypheny.db.protointerface.proto.PreparedStatementSignature;
import org.polypheny.jdbc.jdbctypes.PolyphenyArray;
import org.polypheny.jdbc.jdbctypes.PolyphenyBlob;
import org.polypheny.jdbc.jdbctypes.PolyphenyClob;
import org.polypheny.jdbc.jdbctypes.PolyphenyStruct;
import org.polypheny.jdbc.meta.PolyphenyDatabaseMetadata;
import org.polypheny.jdbc.multimodel.PolyStatement;
import org.polypheny.jdbc.properties.PolyphenyConnectionProperties;
import org.polypheny.jdbc.properties.PolyphenyStatementProperties;
import org.polypheny.jdbc.properties.PropertyUtils;

public class PolyConnection implements Connection {

Expand Down Expand Up @@ -115,7 +113,12 @@ public void run() {
}


public void removeStatementFromOpen( Statement statement ) {
public void startTracking( Statement statement ) {
openStatements.add( statement );
}


public void endTracking( Statement statement ) {
if ( !openStatements.contains( statement ) ) {
return;
}
Expand All @@ -137,7 +140,7 @@ public ProtoInterfaceClient getProtoInterfaceClient() {
public Statement createStatement() throws SQLException {
throwIfClosed();
PolyphenyStatement statement = new PolyphenyStatement( this, properties.toStatementProperties() );
openStatements.add( statement );
startTracking( statement );
return statement;
}

Expand All @@ -156,7 +159,7 @@ public PreparedStatement prepareStatement( String sql ) throws SQLException {
getTimeout()
);
PolyphenyPreparedStatement statement = new PolyphenyPreparedStatement( this, properties.toStatementProperties(), signature );
openStatements.add( statement );
startTracking( statement );
return statement;
}

Expand Down Expand Up @@ -214,10 +217,8 @@ public void close() throws SQLException {
if ( isClosed() ) {
return;
}
List<Statement> statements = new ArrayList<>( openStatements );
for ( Statement statement : statements ) {
statement.close();
openStatements.remove( statement );
for ( Statement openStatement : new HashSet<>( openStatements ) ) {
openStatement.close();
}
getProtoInterfaceClient().unregister( properties.getNetworkTimeout() );
isClosed = true;
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/org/polypheny/jdbc/PolyhenyResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,7 @@ public void close() throws SQLException {
if ( isClosed ) {
return;
}
getStatement().setFetchSize( properties.getStatementFetchSize() );
if ( getStatement().isCloseOnCompletion() ) {
getStatement().unwrap( PolyphenyStatement.class ).closeStatementOnly();
}
statement.notifyResultClosure();
isClosed = true;
}

Expand Down
14 changes: 10 additions & 4 deletions src/main/java/org/polypheny/jdbc/PolyphenyPreparedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public PolyphenyPreparedStatement( PolyConnection connection, PolyphenyStatement
this.parameterBatch = new LinkedList<>();
}

private void prepareForReexecution() throws SQLException {
if (currentResult != null) {
currentResult.close();
}
currentUpdateCount = NO_UPDATE_COUNT;
}


private TypedValue[] createParameterList( int parameterCount ) {
return new TypedValue[parameterCount];
Expand Down Expand Up @@ -131,14 +138,13 @@ public boolean execute( String s, String[] strings ) throws SQLException {
@Override
public ResultSet executeQuery() throws SQLException {
throwIfClosed();

prepareForReexecution();
StatementResult result = getClient().executeIndexedStatement(
statementId,
Arrays.asList( parameters ),
properties.getFetchSize(),
getTimeout()
);
closeCurrentResult();
if ( !result.hasFrame() ) {
throw new ProtoInterfaceServiceException( ProtoInterfaceErrors.RESULT_TYPE_INVALID, "Statement must produce a single ResultSet" );
}
Expand All @@ -152,13 +158,13 @@ public ResultSet executeQuery() throws SQLException {
@Override
public long executeLargeUpdate() throws SQLException {
throwIfClosed();
prepareForReexecution();
StatementResult result = getClient().executeIndexedStatement(
statementId,
Arrays.asList( parameters ),
properties.getFetchSize(),
getTimeout()
);
closeCurrentResult();
if ( result.hasFrame() ) {
throw new ProtoInterfaceServiceException( ProtoInterfaceErrors.RESULT_TYPE_INVALID, "Statement must not produce a ResultSet" );
}
Expand Down Expand Up @@ -362,13 +368,13 @@ public void setObject( int parameterIndex, Object x ) throws SQLException {
@Override
public boolean execute() throws SQLException {
throwIfClosed();
prepareForReexecution();
StatementResult result = getClient().executeIndexedStatement(
statementId,
Arrays.asList( parameters ),
properties.getFetchSize(),
getTimeout()
);
closeCurrentResult();
if ( !result.hasFrame() ) {
currentUpdateCount = result.getScalar();
return false;
Expand Down
100 changes: 57 additions & 43 deletions src/main/java/org/polypheny/jdbc/PolyphenyStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
import java.util.List;
import java.util.stream.Collectors;
import lombok.Getter;
import org.polypheny.db.protointerface.proto.Response;
import org.polypheny.jdbc.properties.PolyphenyStatementProperties;
import org.polypheny.jdbc.properties.PropertyUtils;
import org.polypheny.db.protointerface.proto.ExecuteUnparameterizedStatementRequest;
import org.polypheny.db.protointerface.proto.Frame;
import org.polypheny.db.protointerface.proto.Frame.ResultCase;
import org.polypheny.db.protointerface.proto.Response;
import org.polypheny.db.protointerface.proto.StatementBatchResponse;
import org.polypheny.db.protointerface.proto.StatementResponse;
import org.polypheny.jdbc.properties.PolyphenyStatementProperties;
import org.polypheny.jdbc.properties.PropertyUtils;
import org.polypheny.jdbc.utils.CallbackQueue;

public class PolyphenyStatement implements Statement {
Expand Down Expand Up @@ -66,23 +66,62 @@ protected int longToInt( long longNumber ) {
return Math.toIntExact( longNumber );
}


protected void closeCurrentResult() throws SQLException {
if ( currentResult != null ) {
/**
* protected void closeCurrentResult() throws SQLException {
* if ( currentResult != null ) {
* currentResult.close();
* }
* currentResult = null;
* currentUpdateCount = NO_UPDATE_COUNT;
* }
**/

/**
* void discardStatement() throws SQLException {
* if (statementId == NO_STATEMENT_ID) {
* return;
* }
*
* statementId = NO_STATEMENT_ID;
* }
**/

/**
* public void closeStatementOnly() throws SQLException {
* this.statementId = NO_STATEMENT_ID;
* this.currentResult = null;
* close();
* }
**/
private void prepareForReexecution() throws SQLException {
if (currentResult != null) {
currentResult.close();
}
currentResult = null;
currentUpdateCount = NO_UPDATE_COUNT;
if (statementId != NO_STATEMENT_ID) {
getClient().closeStatement( statementId, getTimeout() );
statementId = NO_STATEMENT_ID;
}
}

public void notifyResultClosure() throws SQLException {
this.currentResult = null;
getClient().closeResult( statementId, getTimeout() );
}

protected int getTimeout() throws SQLException {
return Math.min( getConnection().getNetworkTimeout(), properties.getQueryTimeoutSeconds() * 1000 );
@Override
public void close() throws SQLException {
if ( isClosed ) {
return;
}
polyConnection.endTracking( this );
prepareForReexecution();
isClosed = true;
}


void discardStatementId() {
statementId = NO_STATEMENT_ID;
protected int getTimeout() throws SQLException {
return Math.min( getConnection().getNetworkTimeout(), properties.getQueryTimeoutSeconds() * 1000 );
}


Expand All @@ -104,8 +143,7 @@ protected void throwIfNotRelational( Frame frame ) throws SQLException {
@Override
public ResultSet executeQuery( String statement ) throws SQLException {
throwIfClosed();
closeCurrentResult();
discardStatementId();
prepareForReexecution();
CallbackQueue<StatementResponse> callback = new CallbackQueue<>( Response::getStatementResponse );
String namespaceName = getConnection().getSchema();
getClient().executeUnparameterizedStatement( namespaceName, PropertyUtils.getSQL_LANGUAGE_NAME(), statement, callback, getTimeout() );
Expand Down Expand Up @@ -136,8 +174,7 @@ public ResultSet executeQuery( String statement ) throws SQLException {
@Override
public int executeUpdate( String statement ) throws SQLException {
throwIfClosed();
closeCurrentResult();
discardStatementId();
prepareForReexecution();
CallbackQueue<StatementResponse> callback = new CallbackQueue<>( Response::getStatementResponse );
String namespaceName = getConnection().getSchema();
getClient().executeUnparameterizedStatement( namespaceName, PropertyUtils.getSQL_LANGUAGE_NAME(), statement, callback, getTimeout() );
Expand All @@ -163,27 +200,6 @@ public int executeUpdate( String statement ) throws SQLException {
}


public void closeStatementOnly() throws SQLException {
this.statementId = NO_STATEMENT_ID;
this.currentResult = null;
close();
}


@Override
public void close() throws SQLException {
if ( isClosed ) {
return;
}
if ( currentResult != null ) {
currentResult.close();
}
polyConnection.removeStatementFromOpen( this );
getClient().closeStatement( statementId, getTimeout() );
isClosed = true;
}


@Override
public int getMaxFieldSize() throws SQLException {
throwIfClosed();
Expand Down Expand Up @@ -252,7 +268,7 @@ public void setQueryTimeout( int seconds ) throws SQLException {
@Override
public void cancel() throws SQLException {
throwIfClosed();
// TODO TH: implment cancelling
// TODO TH: implement cancelling
}


Expand All @@ -278,8 +294,7 @@ public void setCursorName( String s ) throws SQLException {
@Override
public boolean execute( String statement ) throws SQLException {
throwIfClosed();
closeCurrentResult();
discardStatementId();
prepareForReexecution();
CallbackQueue<StatementResponse> callback = new CallbackQueue<>( Response::getStatementResponse );
String namespaceName = getConnection().getSchema();
getClient().executeUnparameterizedStatement( namespaceName, PropertyUtils.getSQL_LANGUAGE_NAME(), statement, callback, getTimeout() );
Expand Down Expand Up @@ -331,7 +346,7 @@ public int getUpdateCount() throws SQLException {
@Override
public boolean getMoreResults() throws SQLException {
throwIfClosed();
closeCurrentResult();
prepareForReexecution();
// statements can not return multiple result sets
return false;
}
Expand Down Expand Up @@ -416,8 +431,7 @@ public int[] executeBatch() throws SQLException {

private List<Long> executeUnparameterizedBatch() throws SQLException {
throwIfClosed();
closeCurrentResult();
discardStatementId();
prepareForReexecution();
CallbackQueue<StatementBatchResponse> callback = new CallbackQueue<>( Response::getStatementBatchResponse );
List<ExecuteUnparameterizedStatementRequest> requests = buildBatchRequest();
clearBatch();
Expand Down Expand Up @@ -476,7 +490,7 @@ public boolean getMoreResults( int i ) throws SQLException {
throw new ProtoInterfaceServiceException( ProtoInterfaceErrors.VALUE_ILLEGAL, "Illegal value for closing behaviour: " + i );
}
throwIfClosed();
closeCurrentResult();
prepareForReexecution();
// statements can not return multiple result sets
return false;
}
Expand Down
14 changes: 12 additions & 2 deletions src/main/java/org/polypheny/jdbc/ProtoInterfaceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.polypheny.jdbc.properties.PolyphenyConnectionProperties;
import org.polypheny.db.protointerface.proto.ClientInfoProperties;
import org.polypheny.db.protointerface.proto.ClientInfoPropertiesRequest;
import org.polypheny.db.protointerface.proto.ClientInfoPropertyMeta;
import org.polypheny.db.protointerface.proto.ClientInfoPropertyMetaRequest;
import org.polypheny.db.protointerface.proto.CloseResultRequest;
import org.polypheny.db.protointerface.proto.CloseStatementRequest;
import org.polypheny.db.protointerface.proto.CommitRequest;
import org.polypheny.db.protointerface.proto.ConnectionCheckRequest;
Expand Down Expand Up @@ -57,8 +57,9 @@
import org.polypheny.db.protointerface.proto.TypesRequest;
import org.polypheny.db.protointerface.proto.UserDefinedType;
import org.polypheny.db.protointerface.proto.UserDefinedTypesRequest;
import org.polypheny.jdbc.serialisation.ProtoValueSerializer;
import org.polypheny.jdbc.jdbctypes.TypedValue;
import org.polypheny.jdbc.properties.PolyphenyConnectionProperties;
import org.polypheny.jdbc.serialisation.ProtoValueSerializer;
import org.polypheny.jdbc.transport.PlainTransport;
import org.polypheny.jdbc.transport.Transport;
import org.polypheny.jdbc.utils.CallbackQueue;
Expand Down Expand Up @@ -229,6 +230,15 @@ public void closeStatement( int statementId, int timeout ) throws ProtoInterface
}


public void closeResult( int statementId, int timeout ) throws ProtoInterfaceServiceException {
CloseResultRequest resultCloseRequest = CloseResultRequest.newBuilder()
.setStatementId( statementId )
.build();

rpc.closeResult( resultCloseRequest, timeout );
}


public Frame fetchResult( int statementId, int fetchSize, int timeout ) throws ProtoInterfaceServiceException {
FetchRequest fetchRequest = FetchRequest.newBuilder()
.setFetchSize( fetchSize )
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/polypheny/jdbc/RpcService.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.polypheny.db.protointerface.proto.ClientInfoPropertiesResponse;
import org.polypheny.db.protointerface.proto.ClientInfoPropertyMetaRequest;
import org.polypheny.db.protointerface.proto.ClientInfoPropertyMetaResponse;
import org.polypheny.db.protointerface.proto.CloseResultRequest;
import org.polypheny.db.protointerface.proto.CloseResultResponse;
import org.polypheny.db.protointerface.proto.CloseStatementRequest;
import org.polypheny.db.protointerface.proto.CloseStatementResponse;
import org.polypheny.db.protointerface.proto.CommitRequest;
Expand Down Expand Up @@ -437,4 +439,11 @@ CloseStatementResponse closeStatement( CloseStatementRequest msg, int timeout )
return completeSynchronously( req, timeout ).getCloseStatementResponse();
}


CloseResultResponse closeResult( CloseResultRequest msg, int timeout ) throws ProtoInterfaceServiceException {
Request.Builder req = newMessage();
req.setCloseResultRequest( msg );
return completeSynchronously( req, timeout ).getCloseResultResponse();
}

}
Loading

0 comments on commit 1e052a4

Please sign in to comment.