Skip to content

Commit

Permalink
imply-5135 Complete tests (#88)
Browse files Browse the repository at this point in the history
* imply-5135 Complete tests

* Extended result of getTables based on team feedback

* Refactored getRunning maps according to PR comments

* Made tables API more consistent

* Fix broken unit tests

* Added Derby test

* PR feedback
  • Loading branch information
Agustin Gonzalez authored Dec 10, 2020
1 parent d283d90 commit 303c1e6
Show file tree
Hide file tree
Showing 7 changed files with 751 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;

/**
* Persistence layer for ingest service, for storage of ingest jobs, ingest schemas
Expand All @@ -25,22 +26,40 @@ public interface IngestServiceMetadataStore

// tables
int insertTable(String name);

boolean druidTableExists(String name);
List<String> getAllTableNames();

List<Table> getTables();

// jobs
String stageJob(String tableName, JobRunner jobType);

int scheduleJob(String jobId, IngestSchema schema);

void setJobStatus(String jobId, JobStatus jobStatus);

void setJobState(String jobId, JobState jobState);

void setJobStateAndStatus(String jobId, @Nullable JobStatus status, @Nullable JobState jobState);

void setJobCancelled(String jobId, JobStatus status);

int jobRetry(String jobId);

List<IngestJob> getJobs(@Nullable JobState jobState);

@Nullable
IngestJob getJob(String jobId);

/**
* @param jobStatesToFilterOn If null then return all tables with associated job states even when the table does not
* have jobs (in this case job states will be an empty list).
* If the list is empty return tables with no jobs.
* If the list is non-empty return tables only having states specified.
* @return A list of tables with their associated job state count
*/
Set<TableJobStateStats> getJobCountPerTablePerState(@Nullable Set<JobState> jobStatesToFilterOn);

// schemas
int createSchema(IngestSchema schema);
IngestSchema getSchema(int schemaId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) Imply Data, Inc. All rights reserved.
*
* This software is the confidential and proprietary information
* of Imply Data, Inc. You shall not disclose such Confidential
* Information and shall use it only in accordance with the terms
* of the license agreement you entered into with Imply.
*/

package io.imply.druid.ingest.metadata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.security.Action;
import org.joda.time.DateTime;

import java.util.HashSet;
import java.util.Set;

public class Table
{
private String name;
private DateTime createdTime;
private Set<Action> permissions = new HashSet<>();

public Table(Table table)
{
this.name = table.getName();
this.createdTime = table.getCreatedTime();
this.permissions = new HashSet<>(table.getPermissions());
}

@JsonCreator
public Table(
@JsonProperty("name") String name,
@JsonProperty("createdTime") DateTime createdTime
)
{
this.name = name;
this.createdTime = createdTime;
}

@JsonProperty
public String getName()
{
return name;
}

@JsonProperty
public DateTime getCreatedTime()
{
return createdTime;
}


@JsonProperty
public Set<Action> getPermissions()
{
return permissions;
}

public Table addPermissions(Action permission)
{
this.permissions.add(permission);
return this;
}

@Override
public boolean equals(Object o)
{
if (o == this) {
return true;
}
if (!(o instanceof Table)) {
return false;
}
Table other = (Table) o;
return this.name.equals(other.getName())
&& (this.getCreatedTime().equals(other.getCreatedTime()))
&& (this.getPermissions().equals(other.getPermissions()));
}

@Override
public int hashCode()
{
return 31 * this.getName().hashCode() +
this.getCreatedTime().hashCode() +
this.getPermissions().hashCode() +
super.hashCode();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) Imply Data, Inc. All rights reserved.
*
* This software is the confidential and proprietary information
* of Imply Data, Inc. You shall not disclose such Confidential
* Information and shall use it only in accordance with the terms
* of the license agreement you entered into with Imply.
*/

package io.imply.druid.ingest.metadata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.imply.druid.ingest.jobs.JobState;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

public class TableJobStateStats extends Table
{
private List<JobStateCount> jobStateCounts = new ArrayList<>();

public TableJobStateStats(Table table)
{
super(table);
}

@JsonCreator
public TableJobStateStats(
@JsonProperty("name") String name,
@JsonProperty("createdTime") DateTime createdTime,
@JsonProperty("jobState") JobState jobState,
@JsonProperty("count") int count
)
{
super(name, createdTime);
this.addJobState(jobState, count);
}

public TableJobStateStats addJobState(@Nullable JobState js, int count)
{
if (js != null) {
jobStateCounts.add(new JobStateCount(js, count));
}
return this;
}

@JsonProperty
public List<JobStateCount> getJobStateCounts()
{
return jobStateCounts;
}

@Override
public boolean equals(Object o)
{
// this extends Table which has implemented equals...
if (!super.equals(o)) {
return false;
}
TableJobStateStats other = (TableJobStateStats) o;
return (this.jobStateCounts.equals(other.getJobStateCounts()));
}

@Override
public int hashCode()
{
return 31 * this.getPermissions().hashCode() +
this.jobStateCounts.hashCode() +
super.hashCode();
}

public static class JobStateCount
{
private JobState jobState;
private int count;

@JsonCreator
public JobStateCount(
@JsonProperty JobState js,
@JsonProperty int count
)
{
this.jobState = js;
this.count = count;
}

@JsonProperty
public JobState getJobState()
{
return jobState;
}

@JsonProperty
public int getCount()
{
return count;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
import io.imply.druid.ingest.metadata.IngestJob;
import io.imply.druid.ingest.metadata.IngestSchema;
import io.imply.druid.ingest.metadata.IngestServiceMetadataStore;
import io.imply.druid.ingest.metadata.Table;
import io.imply.druid.ingest.metadata.TableJobStateStats;
import org.apache.druid.common.utils.UUIDUtils;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.TransactionCallback;
Expand All @@ -36,8 +39,12 @@
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
* fill me out
Expand Down Expand Up @@ -246,7 +253,8 @@ public int jobRetry(String jobId)
}

@Override
public @Nullable IngestJob getJob(String jobId)
public @Nullable
IngestJob getJob(String jobId)
{
return metadataConnector.getDBI().inTransaction(
(handle, status) ->
Expand Down Expand Up @@ -282,6 +290,66 @@ public List<IngestJob> getJobs(@Nullable JobState jobState)
});
}

@Override
public Set<TableJobStateStats> getJobCountPerTablePerState(@Nullable Set<JobState> statesToFilterOn)
{
final String baseQueryStr = "SELECT t.name as tn, count(j.job_state) as cnt, j.job_state as js, "
+ "t.created_timestamp as tcs FROM ingest_tables t "
+ "LEFT JOIN ingest_jobs j ON j.table_name = t.name "
+ "GROUP BY t.name, t.created_timestamp, j.job_state "
+ "%s ";

List<TableJobStateStats> tableJobStateStats =
metadataConnector.getDBI().inTransaction(
(handle, status) -> {
String queryStr;
if (statesToFilterOn == null) {
// all tables
queryStr = String.format(baseQueryStr, "ORDER BY t.name");
} else if (statesToFilterOn.size() == 0) {
// only tables with no jobs
queryStr = String.format(baseQueryStr, "HAVING cnt = 0 ORDER BY t.name");
} else {
// Add states constraint HAVING clause....
// build IN list:
StringBuilder inList = new StringBuilder("(");
boolean first = true;
for (JobState st : statesToFilterOn) {
if (!first) {
inList.append(",");
} else {
first = false;
}
inList.append("'").append(st.name()).append("'");
}
inList.append(")");

String tail = String.format("HAVING j.job_state IN %s ORDER by t.name", inList);
queryStr = String.format(baseQueryStr, tail);
}
final Query<Map<String, Object>> query = handle.createQuery(queryStr);
// get tables with their corresponding states
Map<String, TableJobStateStats> tableJobsMap = new HashMap<>();
return query.map(
(index, r, ctx) -> {
// get sql results:
String tn = r.getString("tn");
int count = r.getInt("cnt");
JobState js = Optional.ofNullable(r.getString("js")).map(jss -> JobState.fromString(jss))
.orElse(null);
DateTime tct = DateTimes.of(r.getString("tcs"));
// update table map cache:
if (tableJobsMap.computeIfPresent(tn, (k, v) -> v.addJobState(js, count)) == null) {
tableJobsMap.put(tn, new TableJobStateStats(tn, tct, js, count));
}
return tableJobsMap.get(tn);
}).list();
}
);

return new HashSet<>(tableJobStateStats);
}

@Nonnull
private IngestJob resultRowAsIngestJob(ResultSet r) throws SQLException
{
Expand Down Expand Up @@ -430,9 +498,12 @@ public int deleteSchema(int schemaId)
@Override
public Integer inTransaction(Handle handle, TransactionStatus transactionStatus)
{
return handle.createStatement(StringUtils.format("DELETE from %1$s WHERE id = :id", ingestConfig.get().getSchemasTable()))
.bind("id", schemaId)
.execute();
return handle.createStatement(StringUtils.format(
"DELETE from %1$s WHERE id = :id",
ingestConfig.get().getSchemasTable()
))
.bind("id", schemaId)
.execute();
}
}
);
Expand Down Expand Up @@ -499,12 +570,23 @@ public boolean druidTableExists(String name)
}

@Override
public List<String> getAllTableNames()
public List<Table> getTables()
{
final String query = StringUtils.format("SELECT name FROM %s", ingestConfig.get().getTablesTable());
return metadataConnector.getDBI().withHandle(
handle -> handle.createQuery(query).map((index, r, ctx) -> r.getString("name")).list()
final String query = StringUtils.format(
"SELECT name, created_timestamp FROM %s",
ingestConfig.get().getTablesTable()
);
List<Table> tables = metadataConnector.getDBI().withHandle(
handle -> handle.createQuery(query)
.map((index, r, ctx) ->
new Table(
r.getString("name"),
DateTimes.of(r.getString("created_timestamp"))
)
).list()
);

return tables;
}

}
Loading

0 comments on commit 303c1e6

Please sign in to comment.