Skip to content

Commit

Permalink
Add druid table insert/ingestion skeleton code
Browse files Browse the repository at this point in the history
  • Loading branch information
beinan authored and zhenxiao committed Aug 14, 2020
1 parent 27e922f commit d5928f1
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

import com.facebook.airlift.bootstrap.LifeCycleManager;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.druid.ingestion.DruidPageSinkProvider;
import com.facebook.presto.spi.ConnectorPlanOptimizer;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
Expand All @@ -42,6 +44,7 @@ public class DruidConnector
private final DruidMetadata metadata;
private final DruidSplitManager splitManager;
private final DruidPageSourceProvider pageSourceProvider;
private final DruidPageSinkProvider pageSinkProvider;
private final List<PropertyMetadata<?>> sessionProperties;
private final ConnectorPlanOptimizer planOptimizer;

Expand All @@ -51,13 +54,15 @@ public DruidConnector(
DruidMetadata metadata,
DruidSplitManager splitManager,
DruidPageSourceProvider pageSourceProvider,
DruidPageSinkProvider pageSinkProvider,
DruidSessionProperties druidSessionProperties,
DruidPlanOptimizer planOptimizer)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvide is null");
this.sessionProperties = ImmutableList.copyOf(requireNonNull(druidSessionProperties, "sessionProperties is null").getSessionProperties());
this.planOptimizer = requireNonNull(planOptimizer, "plan optimizer is null");
}
Expand Down Expand Up @@ -86,6 +91,12 @@ public ConnectorPageSourceProvider getPageSourceProvider()
return pageSourceProvider;
}

@Override
public ConnectorPageSinkProvider getPageSinkProvider()
{
return pageSinkProvider;
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
*/
package com.facebook.presto.druid;

import com.facebook.presto.druid.ingestion.DruidIngestionTableHandle;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
Expand Down Expand Up @@ -52,4 +54,10 @@ public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
{
return DruidTransactionHandle.class;
}

@Override
public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
{
return DruidIngestionTableHandle.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package com.facebook.presto.druid;

import com.facebook.presto.common.type.Type;
import com.facebook.presto.druid.ingestion.DruidIngestionTableHandle;
import com.facebook.presto.druid.metadata.DruidColumnInfo;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayout;
Expand All @@ -27,11 +29,15 @@
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;

import javax.inject.Inject;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -135,6 +141,19 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
return ((DruidColumnHandle) columnHandle).getColumnMetadata();
}

@Override
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
{
DruidTableHandle druidTableHandle = (DruidTableHandle) tableHandle;
return new DruidIngestionTableHandle(druidTableHandle.getSchemaName(), druidTableHandle.getTableName());
}

@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
return Optional.empty();
}

private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix)
{
if (prefix.getTableName() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.druid;

import com.facebook.presto.druid.ingestion.DruidPageSinkProvider;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
Expand All @@ -37,6 +38,7 @@ public void configure(Binder binder)
binder.bind(DruidPlanOptimizer.class).in(Scopes.SINGLETON);
binder.bind(DruidSplitManager.class).in(Scopes.SINGLETON);
binder.bind(DruidPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(DruidPageSinkProvider.class).in(Scopes.SINGLETON);
binder.bind(DruidQueryGenerator.class).in(Scopes.SINGLETON);
binder.bind(DruidSessionProperties.class).in(Scopes.SINGLETON);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.druid.ingestion;

import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class DruidIngestionTableHandle
implements ConnectorInsertTableHandle
{
private final String schemaName;
private final String tableName;

@JsonCreator
public DruidIngestionTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
}

@JsonProperty
public String getSchemaName()
{
return schemaName;
}

@JsonProperty
public String getTableName()
{
return tableName;
}

@Override
public int hashCode()
{
return Objects.hash(schemaName, tableName);
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if ((obj == null) || (getClass() != obj.getClass())) {
return false;
}

DruidIngestionTableHandle other = (DruidIngestionTableHandle) obj;
return Objects.equals(this.schemaName, other.schemaName) &&
Objects.equals(this.tableName, other.tableName);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("schemaName", schemaName)
.add("tableName", tableName)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.druid.ingestion;

import com.facebook.presto.common.Page;
import com.facebook.presto.druid.DruidClient;
import com.facebook.presto.spi.ConnectorPageSink;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

import static java.util.concurrent.CompletableFuture.completedFuture;

public class DruidPageSink
implements ConnectorPageSink
{
private final DruidClient druidClient;
private final DruidIngestionTableHandle tableHandle;

public DruidPageSink(DruidClient druidClient, DruidIngestionTableHandle tableHandle)
{
this.druidClient = druidClient;
this.tableHandle = tableHandle;
}

@Override
public CompletableFuture<?> appendPage(Page page)
{
return NOT_BLOCKED;
}

@Override
public CompletableFuture<Collection<Slice>> finish()
{
return completedFuture(ImmutableList.of());
}

@Override
public void abort()
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.druid.ingestion;

import com.facebook.presto.druid.DruidClient;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PageSinkProperties;
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;

import javax.inject.Inject;

import static java.util.Objects.requireNonNull;

public class DruidPageSinkProvider
implements ConnectorPageSinkProvider
{
private final DruidClient druidClient;

@Inject
public DruidPageSinkProvider(DruidClient druidClient)
{
this.druidClient = requireNonNull(druidClient, "druid client is null");
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, PageSinkProperties pageSinkProperties)
{
throw new UnsupportedOperationException("Table creation is not supported by the druid connector");
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, PageSinkProperties pageSinkProperties)
{
DruidIngestionTableHandle tableHandle = (DruidIngestionTableHandle) insertTableHandle;
return new DruidPageSink(druidClient, tableHandle);
}
}

0 comments on commit d5928f1

Please sign in to comment.