diff --git a/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/IngestServiceMetadataStore.java b/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/IngestServiceMetadataStore.java index b19540edf9c1..8ed13c889c1f 100644 --- a/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/IngestServiceMetadataStore.java +++ b/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/IngestServiceMetadataStore.java @@ -16,6 +16,7 @@ import javax.annotation.Nullable; import java.util.List; +import java.util.Set; /** * Persistence layer for ingest service, for storage of ingest jobs, ingest schemas @@ -25,22 +26,40 @@ public interface IngestServiceMetadataStore // tables int insertTable(String name); + boolean druidTableExists(String name); - List getAllTableNames(); + + List getTables(); // jobs String stageJob(String tableName, JobRunner jobType); + int scheduleJob(String jobId, IngestSchema schema); + void setJobStatus(String jobId, JobStatus jobStatus); + void setJobState(String jobId, JobState jobState); + void setJobStateAndStatus(String jobId, @Nullable JobStatus status, @Nullable JobState jobState); + void setJobCancelled(String jobId, JobStatus status); + int jobRetry(String jobId); List getJobs(@Nullable JobState jobState); + @Nullable IngestJob getJob(String jobId); + /** + * @param jobStatesToFilterOn If null then return all tables with associated job states even when the table does not + * have jobs (in this case job states will be an empty list). + * If the list is empty return tables with no jobs. + * If the list is non-empty return tables only having states specified. + * @return A list of tables with their associated job state count + */ + Set getJobCountPerTablePerState(@Nullable Set jobStatesToFilterOn); + // schemas int createSchema(IngestSchema schema); IngestSchema getSchema(int schemaId); diff --git a/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/Table.java b/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/Table.java new file mode 100644 index 000000000000..1e45176a94ee --- /dev/null +++ b/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/Table.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) Imply Data, Inc. All rights reserved. + * + * This software is the confidential and proprietary information + * of Imply Data, Inc. You shall not disclose such Confidential + * Information and shall use it only in accordance with the terms + * of the license agreement you entered into with Imply. + */ + +package io.imply.druid.ingest.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.server.security.Action; +import org.joda.time.DateTime; + +import java.util.HashSet; +import java.util.Set; + +public class Table +{ + private String name; + private DateTime createdTime; + private Set permissions = new HashSet<>(); + + public Table(Table table) + { + this.name = table.getName(); + this.createdTime = table.getCreatedTime(); + this.permissions = new HashSet<>(table.getPermissions()); + } + + @JsonCreator + public Table( + @JsonProperty("name") String name, + @JsonProperty("createdTime") DateTime createdTime + ) + { + this.name = name; + this.createdTime = createdTime; + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public DateTime getCreatedTime() + { + return createdTime; + } + + + @JsonProperty + public Set getPermissions() + { + return permissions; + } + + public Table addPermissions(Action permission) + { + this.permissions.add(permission); + return this; + } + + @Override + public boolean equals(Object o) + { + if (o == this) { + return true; + } + if (!(o instanceof Table)) { + return false; + } + Table other = (Table) o; + return this.name.equals(other.getName()) + && (this.getCreatedTime().equals(other.getCreatedTime())) + && (this.getPermissions().equals(other.getPermissions())); + } + + @Override + public int hashCode() + { + return 31 * this.getName().hashCode() + + this.getCreatedTime().hashCode() + + this.getPermissions().hashCode() + + super.hashCode(); + } + +} diff --git a/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/TableJobStateStats.java b/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/TableJobStateStats.java new file mode 100644 index 000000000000..d306ce686aca --- /dev/null +++ b/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/TableJobStateStats.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) Imply Data, Inc. All rights reserved. + * + * This software is the confidential and proprietary information + * of Imply Data, Inc. You shall not disclose such Confidential + * Information and shall use it only in accordance with the terms + * of the license agreement you entered into with Imply. + */ + +package io.imply.druid.ingest.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.imply.druid.ingest.jobs.JobState; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +public class TableJobStateStats extends Table +{ + private List jobStateCounts = new ArrayList<>(); + + public TableJobStateStats(Table table) + { + super(table); + } + + @JsonCreator + public TableJobStateStats( + @JsonProperty("name") String name, + @JsonProperty("createdTime") DateTime createdTime, + @JsonProperty("jobState") JobState jobState, + @JsonProperty("count") int count + ) + { + super(name, createdTime); + this.addJobState(jobState, count); + } + + public TableJobStateStats addJobState(@Nullable JobState js, int count) + { + if (js != null) { + jobStateCounts.add(new JobStateCount(js, count)); + } + return this; + } + + @JsonProperty + public List getJobStateCounts() + { + return jobStateCounts; + } + + @Override + public boolean equals(Object o) + { + // this extends Table which has implemented equals... + if (!super.equals(o)) { + return false; + } + TableJobStateStats other = (TableJobStateStats) o; + return (this.jobStateCounts.equals(other.getJobStateCounts())); + } + + @Override + public int hashCode() + { + return 31 * this.getPermissions().hashCode() + + this.jobStateCounts.hashCode() + + super.hashCode(); + } + + public static class JobStateCount + { + private JobState jobState; + private int count; + + @JsonCreator + public JobStateCount( + @JsonProperty JobState js, + @JsonProperty int count + ) + { + this.jobState = js; + this.count = count; + } + + @JsonProperty + public JobState getJobState() + { + return jobState; + } + + @JsonProperty + public int getCount() + { + return count; + } + } +} diff --git a/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/sql/IngestServiceSqlMetadataStore.java b/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/sql/IngestServiceSqlMetadataStore.java index c41c48dc15f0..8ac0be11de58 100644 --- a/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/sql/IngestServiceSqlMetadataStore.java +++ b/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/metadata/sql/IngestServiceSqlMetadataStore.java @@ -19,12 +19,15 @@ import io.imply.druid.ingest.metadata.IngestJob; import io.imply.druid.ingest.metadata.IngestSchema; import io.imply.druid.ingest.metadata.IngestServiceMetadataStore; +import io.imply.druid.ingest.metadata.Table; +import io.imply.druid.ingest.metadata.TableJobStateStats; import org.apache.druid.common.utils.UUIDUtils; import org.apache.druid.data.input.InputFormat; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.SQLMetadataConnector; +import org.joda.time.DateTime; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Query; import org.skife.jdbi.v2.TransactionCallback; @@ -36,8 +39,12 @@ import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; /** * fill me out @@ -246,7 +253,8 @@ public int jobRetry(String jobId) } @Override - public @Nullable IngestJob getJob(String jobId) + public @Nullable + IngestJob getJob(String jobId) { return metadataConnector.getDBI().inTransaction( (handle, status) -> @@ -282,6 +290,66 @@ public List getJobs(@Nullable JobState jobState) }); } + @Override + public Set getJobCountPerTablePerState(@Nullable Set statesToFilterOn) + { + final String baseQueryStr = "SELECT t.name as tn, count(j.job_state) as cnt, j.job_state as js, " + + "t.created_timestamp as tcs FROM ingest_tables t " + + "LEFT JOIN ingest_jobs j ON j.table_name = t.name " + + "GROUP BY t.name, t.created_timestamp, j.job_state " + + "%s "; + + List tableJobStateStats = + metadataConnector.getDBI().inTransaction( + (handle, status) -> { + String queryStr; + if (statesToFilterOn == null) { + // all tables + queryStr = String.format(baseQueryStr, "ORDER BY t.name"); + } else if (statesToFilterOn.size() == 0) { + // only tables with no jobs + queryStr = String.format(baseQueryStr, "HAVING cnt = 0 ORDER BY t.name"); + } else { + // Add states constraint HAVING clause.... + // build IN list: + StringBuilder inList = new StringBuilder("("); + boolean first = true; + for (JobState st : statesToFilterOn) { + if (!first) { + inList.append(","); + } else { + first = false; + } + inList.append("'").append(st.name()).append("'"); + } + inList.append(")"); + + String tail = String.format("HAVING j.job_state IN %s ORDER by t.name", inList); + queryStr = String.format(baseQueryStr, tail); + } + final Query> query = handle.createQuery(queryStr); + // get tables with their corresponding states + Map tableJobsMap = new HashMap<>(); + return query.map( + (index, r, ctx) -> { + // get sql results: + String tn = r.getString("tn"); + int count = r.getInt("cnt"); + JobState js = Optional.ofNullable(r.getString("js")).map(jss -> JobState.fromString(jss)) + .orElse(null); + DateTime tct = DateTimes.of(r.getString("tcs")); + // update table map cache: + if (tableJobsMap.computeIfPresent(tn, (k, v) -> v.addJobState(js, count)) == null) { + tableJobsMap.put(tn, new TableJobStateStats(tn, tct, js, count)); + } + return tableJobsMap.get(tn); + }).list(); + } + ); + + return new HashSet<>(tableJobStateStats); + } + @Nonnull private IngestJob resultRowAsIngestJob(ResultSet r) throws SQLException { @@ -430,9 +498,12 @@ public int deleteSchema(int schemaId) @Override public Integer inTransaction(Handle handle, TransactionStatus transactionStatus) { - return handle.createStatement(StringUtils.format("DELETE from %1$s WHERE id = :id", ingestConfig.get().getSchemasTable())) - .bind("id", schemaId) - .execute(); + return handle.createStatement(StringUtils.format( + "DELETE from %1$s WHERE id = :id", + ingestConfig.get().getSchemasTable() + )) + .bind("id", schemaId) + .execute(); } } ); @@ -499,12 +570,23 @@ public boolean druidTableExists(String name) } @Override - public List getAllTableNames() + public List
getTables() { - final String query = StringUtils.format("SELECT name FROM %s", ingestConfig.get().getTablesTable()); - return metadataConnector.getDBI().withHandle( - handle -> handle.createQuery(query).map((index, r, ctx) -> r.getString("name")).list() + final String query = StringUtils.format( + "SELECT name, created_timestamp FROM %s", + ingestConfig.get().getTablesTable() ); + List
tables = metadataConnector.getDBI().withHandle( + handle -> handle.createQuery(query) + .map((index, r, ctx) -> + new Table( + r.getString("name"), + DateTimes.of(r.getString("created_timestamp")) + ) + ).list() + ); + + return tables; } } diff --git a/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/server/TablesResource.java b/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/server/TablesResource.java index 878aaa13e17d..f61df3ce78fe 100644 --- a/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/server/TablesResource.java +++ b/extensions-imply/ingest-service/src/main/java/io/imply/druid/ingest/server/TablesResource.java @@ -17,9 +17,13 @@ import io.imply.druid.ingest.config.IngestServiceTenantConfig; import io.imply.druid.ingest.files.FileStore; import io.imply.druid.ingest.jobs.JobRunner; +import io.imply.druid.ingest.jobs.JobState; import io.imply.druid.ingest.jobs.runners.BatchAppendJobRunner; import io.imply.druid.ingest.metadata.IngestServiceMetadataStore; +import io.imply.druid.ingest.metadata.Table; +import io.imply.druid.ingest.metadata.TableJobStateStats; import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.Resource; @@ -28,6 +32,7 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -38,8 +43,13 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.net.URI; +import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; import java.util.List; +import java.util.Locale; +import java.util.Set; @Path("/ingest/v1/tables") public class TablesResource @@ -65,30 +75,46 @@ public TablesResource( @GET @Produces(MediaType.APPLICATION_JSON) - public Response getAllTables( + public Response getTables( + @DefaultValue("RUNNING") @QueryParam("states") String states, @Context final HttpServletRequest req ) { - // iterate through list of tables, filter by write authorized - Function> raGenerator = table -> Collections.singletonList( - new ResourceAction(new Resource(table, ResourceType.DATASOURCE), Action.WRITE) - ); - - - List allTables = metadataStore.getAllTableNames(); - - List authorizedTables = Lists.newArrayList( - AuthorizationUtils.filterAuthorizedResources( - req, - allTables, - raGenerator, - authorizerMapper - ) - ); - return Response.ok(ImmutableMap.of("tables", authorizedTables)).build(); + Set allTables; + Set jss = null; // this is the value for "ALL" + String badArgument = null; + if (states.toUpperCase(Locale.ROOT).equals("NONE")) { + jss = Collections.emptySet(); + } else if (!states.toUpperCase(Locale.ROOT).equals("ALL")) { + // only valid state names are allowed here, note that "ALL" AND "NONE" are not + // valid here and will be rejected... + jss = new HashSet<>(); + String[] statesList = states.split(","); + for (String state : statesList) { + try { + jss.add(JobState.valueOf(state.toUpperCase(Locale.ROOT))); + } + catch (Exception e) { + badArgument = state; + } + } + } + Response response; + if (badArgument != null) { + response = Response.status(400).entity(String.format("Bad state name: %s", badArgument)).build(); + } else { + // all ok.... + allTables = metadataStore.getJobCountPerTablePerState(jss); + assignPermissions(allTables, req); + List sorted = new ArrayList<>(allTables); + sorted.sort(Comparator.comparing(TableJobStateStats::getName)); + response = Response.ok(ImmutableMap.of("tables", sorted)).build(); + } + return response; } @POST + @Produces(MediaType.APPLICATION_JSON) @Path("/{table}") public Response createTable( @PathParam("table") String tableName, @@ -109,16 +135,29 @@ public Response createTable( metadataStore.insertTable(tableName); } catch (Exception e) { - // TODO: we should be more fine grained here with respect to what happened... + // One interesting case if when the table already exists...one approach is just to return + // a 200 in this case but that may be misleading to the client since some data in the table + // (i.e. created time) will not be updated. But this maybe good enough since 200 is ok + // and when we create we return 201 (created). + // Another caveat is that parsing the + // exception that we get here is not straightforward since the cause is wrapped inside and the + // actual exception will depend on the database behind it. It seems more reasonable to + // create our own exception and manage the exceptions closer to the where the server is called and + // throw our own exceptions there... + // + // For now just saying that all exceptions are errors and + // propagating the message to the client... issue = e.getMessage(); isOk = false; } Response r; if (isOk) { - r = Response.ok().build(); + // the table resource is abnormal in the sense that it has no id, the name itself is its own id + // in a more RESTful way we would append the newly created id to the req.getPathInfo()... + r = Response.created(URI.create("")).build(); } else { - r = Response.serverError().entity(issue).build(); + r = Response.serverError().entity(ImmutableMap.of("error", issue)).build(); } return r; } @@ -164,7 +203,8 @@ public Response listIngestJobs( @ResourceFilters(TablesResourceFilter.class) public Response stageIngestJob( final JobRunner jobType, - @PathParam("table") String tableName + @PathParam("table") + String tableName ) { if (!metadataStore.druidTableExists(tableName)) { @@ -197,8 +237,10 @@ public Response cancelIngestJob( @ResourceFilters(TablesResourceFilter.class) public Response sampleIngestJob( final IngestJobRequest sampleRequest, - @PathParam("table") String tableName, - @PathParam("jobId") String jobId + @PathParam("table") + String tableName, + @PathParam("jobId") + String jobId ) { // most of this logic should probably live somewhere else, but; @@ -215,8 +257,10 @@ public Response sampleIngestJob( @ResourceFilters(TablesResourceFilter.class) public Response scheduleIngestJob( final IngestJobRequest scheduleRequest, - @PathParam("table") String tableName, - @PathParam("jobId") String jobId + @PathParam("table") + String tableName, + @PathParam("jobId") + String jobId ) { // this is not enough, we need to handle the alternative where schemaId is set @@ -231,4 +275,35 @@ public Response scheduleIngestJob( return Response.created(URI.create("")).build(); } + private void assignPermissions(Set tables, HttpServletRequest req) + { + // decorate WRITE tables: + // iterate through list of tables, filter by write authorized + Function> raGenerator = table -> Collections.singletonList( + new ResourceAction(new Resource(table.getName(), ResourceType.DATASOURCE), Action.WRITE) + ); + Lists.newArrayList( + AuthorizationUtils.filterAuthorizedResources( + req, + tables, + raGenerator, + authorizerMapper + ) + ).forEach(ts -> ts.addPermissions(Action.WRITE)); + + // another pass with Action.READ + // Need to reset the checked flag... + req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, null); + raGenerator = table -> Collections.singletonList( + new ResourceAction(new Resource(table.getName(), ResourceType.DATASOURCE), Action.READ) + ); + Lists.newArrayList( + AuthorizationUtils.filterAuthorizedResources( + req, + tables, + raGenerator, + authorizerMapper + ) + ).forEach(ts -> ts.addPermissions(Action.READ)); + } } diff --git a/extensions-imply/ingest-service/src/test/java/io/imply/druid/ingest/metadata/sql/IngestServiceSqlMetadataStoreTest.java b/extensions-imply/ingest-service/src/test/java/io/imply/druid/ingest/metadata/sql/IngestServiceSqlMetadataStoreTest.java index 9153f8f804e2..9ebcc077d839 100644 --- a/extensions-imply/ingest-service/src/test/java/io/imply/druid/ingest/metadata/sql/IngestServiceSqlMetadataStoreTest.java +++ b/extensions-imply/ingest-service/src/test/java/io/imply/druid/ingest/metadata/sql/IngestServiceSqlMetadataStoreTest.java @@ -20,6 +20,8 @@ import io.imply.druid.ingest.jobs.status.TaskBasedJobStatus; import io.imply.druid.ingest.metadata.IngestJob; import io.imply.druid.ingest.metadata.IngestSchema; +import io.imply.druid.ingest.metadata.Table; +import io.imply.druid.ingest.metadata.TableJobStateStats; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.StringDimensionSchema; @@ -33,8 +35,10 @@ import org.junit.Rule; import org.junit.Test; +import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; public class IngestServiceSqlMetadataStoreTest { @@ -52,6 +56,7 @@ public class IngestServiceSqlMetadataStoreTest @Before public void setup() { + // using derby because mocking was too complex connector = derbyConnectorRule.getConnector(); metadataStore = new IngestServiceSqlMetadataStore(() -> config, connector, MAPPER); } @@ -81,25 +86,81 @@ public void testCreateSchemasTable() } @Test - public void testInsertTable() + public void testTablesInsert() { - List tables = metadataStore.getAllTableNames(); + List
tables = metadataStore.getTables(); Assert.assertEquals(0, tables.size()); int updateCount = metadataStore.insertTable(TABLE); - tables = metadataStore.getAllTableNames(); + tables = metadataStore.getTables(); Assert.assertEquals(1, tables.size()); - Assert.assertEquals(TABLE, tables.get(0)); + Assert.assertEquals(TABLE, tables.get(0).getName()); Assert.assertEquals(1, updateCount); } + @Test(expected = Exception.class) + public void testTablesInsertDup() + { + int updateCount = metadataStore.insertTable(TABLE); + metadataStore.insertTable(TABLE); + } + @Test - public void testTableExists() + public void testTablesExists() { - Assert.assertFalse(metadataStore.druidTableExists("not-yet")); + Assert.assertFalse(metadataStore.druidTableExists(TABLE)); metadataStore.insertTable(TABLE); Assert.assertTrue(metadataStore.druidTableExists(TABLE)); } + + @Test + public void testGetJobCountPerTablePerState() + { + metadataStore.insertTable(TABLE); + String jobId = metadataStore.stageJob(TABLE, jobType); + IngestSchema someSchema = new IngestSchema( + new TimestampSpec("time", "iso", null), + new DimensionsSpec( + ImmutableList.of( + StringDimensionSchema.create("x"), + StringDimensionSchema.create("y") + ) + ), + new JsonInputFormat(null, null, null), + "test schema" + ); + metadataStore.scheduleJob(jobId, someSchema); + metadataStore.setJobState(jobId, JobState.RUNNING); + + Set stats = + metadataStore.getJobCountPerTablePerState(Collections.singleton(JobState.RUNNING)); + Assert.assertTrue(stats.size() == 1); + stats.forEach(s -> { + Assert.assertTrue(s.getJobStateCounts().get(0).getJobState().equals(JobState.RUNNING)); + Assert.assertTrue(s.getJobStateCounts().size() == 1); + }); + + + JobStatus expectedStatus = new TaskBasedJobStatus(null); + metadataStore.setJobStateAndStatus(jobId, expectedStatus, JobState.COMPLETE); + + + IngestJob job = metadataStore.getJob(jobId); + Assert.assertNotNull(job); + Assert.assertEquals(JobState.COMPLETE, job.getJobState()); + Assert.assertEquals(expectedStatus, job.getJobStatus()); + + stats = + metadataStore.getJobCountPerTablePerState(Collections.singleton(JobState.COMPLETE)); + Assert.assertTrue(stats.size() == 1); + stats.forEach(s -> { + Assert.assertTrue(s.getJobStateCounts().get(0).getJobState().equals(JobState.COMPLETE)); + Assert.assertTrue(s.getJobStateCounts().size() == 1); + }); + + } + + @Test public void testStageJob() { @@ -329,6 +390,26 @@ public void testGetAllJobs() Assert.assertTrue(scheduledJobs.stream().allMatch(job -> expectedScheduled.contains(job.getJobId()))); } + @Test + public void testTablesGetAll() + { + Set expected = ImmutableSet.of("foo", "bar", "another", "one-more"); + + expected.forEach(table -> metadataStore.insertTable(table)); + + List
tables = metadataStore.getTables(); + Assert.assertEquals(4, tables.size()); + Assert.assertTrue(expected.containsAll(tables.stream().map(t -> t.getName()).collect(Collectors.toList()))); + } + + @Test + public void testTablesGetAllEmpty() + { + List
tables = metadataStore.getTables(); + Assert.assertEquals(0, tables.size()); + } + + @Test public void testIncrementRetry() { diff --git a/extensions-imply/ingest-service/src/test/java/io/imply/druid/ingest/server/TablesResourceTest.java b/extensions-imply/ingest-service/src/test/java/io/imply/druid/ingest/server/TablesResourceTest.java index 10c87a165f25..3db98ad916f6 100644 --- a/extensions-imply/ingest-service/src/test/java/io/imply/druid/ingest/server/TablesResourceTest.java +++ b/extensions-imply/ingest-service/src/test/java/io/imply/druid/ingest/server/TablesResourceTest.java @@ -13,13 +13,17 @@ import io.imply.druid.ingest.config.IngestServiceTenantConfig; import io.imply.druid.ingest.files.FileStore; import io.imply.druid.ingest.jobs.JobRunner; +import io.imply.druid.ingest.jobs.JobState; import io.imply.druid.ingest.metadata.IngestSchema; import io.imply.druid.ingest.metadata.IngestServiceMetadataStore; +import io.imply.druid.ingest.metadata.Table; +import io.imply.druid.ingest.metadata.TableJobStateStats; import org.apache.druid.common.utils.UUIDUtils; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; @@ -27,7 +31,6 @@ import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.Authorizer; import org.apache.druid.server.security.AuthorizerMapper; -import org.apache.druid.server.security.Resource; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; @@ -38,7 +41,15 @@ import javax.ws.rs.core.Response; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; public class TablesResourceTest { @@ -63,29 +74,72 @@ public String getClusterId() private HttpServletRequest req; private TablesResource tablesResource; + private static final List
TABLE_LIST; + private static final List
TABLE_LIST_WRITE; + private static final List
TABLE_LIST_READ; + private static final List
TABLE_LIST_UNAUTHORIZED; + private static final List TABLE_NAMES_WRITE; + private static final List TABLE_NAMES_READ; + private static final List TABLE_NAMES_UNAUTHORIZED; + private static final List TABLE_NAMES; + + static { + + TABLE_LIST_WRITE = Arrays.asList( + new Table("foo1", DateTimes.nowUtc()), + new Table("foo2", DateTimes.nowUtc()), + new Table("foo3", DateTimes.nowUtc()), + new Table("foo4", DateTimes.nowUtc()), + new Table("foo5", DateTimes.nowUtc()), + new Table("foo6", DateTimes.nowUtc()) + ); + + TABLE_LIST_READ = Arrays.asList( + new Table("bar1", DateTimes.nowUtc()), + new Table("bar2", DateTimes.nowUtc()), + new Table("bar3", DateTimes.nowUtc()) + ); + + TABLE_LIST_UNAUTHORIZED = Arrays.asList( + new Table("crook1", DateTimes.nowUtc()), + new Table("crook2", DateTimes.nowUtc()), + new Table("crook3", DateTimes.nowUtc()) + ); + + TABLE_LIST = new ArrayList<>(); + TABLE_LIST.addAll(TABLE_LIST_WRITE); + TABLE_LIST.addAll(TABLE_LIST_READ); + TABLE_LIST.addAll(TABLE_LIST_UNAUTHORIZED); + + TABLE_NAMES_WRITE = TABLE_LIST_WRITE.stream().map(Table::getName).collect(Collectors.toList()); + TABLE_NAMES_READ = TABLE_LIST_READ.stream().map(Table::getName).collect(Collectors.toList()); + TABLE_NAMES_UNAUTHORIZED = TABLE_LIST_UNAUTHORIZED.stream().map(Table::getName).collect(Collectors.toList()); + TABLE_NAMES = new ArrayList<>(); + TABLE_NAMES.addAll(TABLE_NAMES_WRITE); + TABLE_NAMES.addAll(TABLE_NAMES_READ); + TABLE_NAMES.addAll(TABLE_NAMES_UNAUTHORIZED); + } + @Before public void setup() { fileStore = EasyMock.createMock(FileStore.class); metadataStore = EasyMock.createMock(IngestServiceMetadataStore.class); - req = EasyMock.createStrictMock(HttpServletRequest.class); + req = EasyMock.createMock(HttpServletRequest.class); + AuthorizerMapper authMapper = new AuthorizerMapper(null) { @Override public Authorizer getAuthorizer(String name) { - return new Authorizer() - { - @Override - public Access authorize(AuthenticationResult authenticationResult, Resource resource, Action action) - { - if (resource.getName().equals("allow")) { - return new Access(true); - } else { - return new Access(false); - } + return (authenticationResult, resource, action) -> { + if (TABLE_NAMES_WRITE.contains(resource.getName()) && action == Action.WRITE) { + return new Access(true); + } else if (TABLE_NAMES_READ.contains(resource.getName()) && action == Action.READ) { + return new Access(true); + } else { + return new Access(false); } - }; } }; @@ -123,20 +177,18 @@ public void testStageJobShoulWorkWhenTableExists() throws URISyntaxException Assert.assertEquals(200, response.getStatus()); } + @Test public void testStageJobShoulReturn404WhenTableDoesNotExist() { // expectAuthorizationTokenCheck(); see resource filter annotations are not called when testing in this manner // see https://github.com/apache/druid/issues/6685 - String id = UUIDUtils.generateUuid(); EasyMock.expect(metadataStore.druidTableExists(EasyMock.eq(TABLE))).andReturn(false).once(); EasyMock.replay(fileStore, metadataStore, req); Response response = tablesResource.stageIngestJob(null, TABLE); - Map responseEntity = (Map) response.getEntity(); - Assert.assertEquals(404, response.getStatus()); } @@ -243,17 +295,202 @@ public void testScheduleIngestJobShouldFailWhenTooManyJobsForSameJobIdExist() Assert.assertEquals(500, response.getStatus()); EasyMock.verify(scheduleRequest); } + @Test - public void testInsertTable() + public void testTablesInsert() { - // this is the wrong check, unless we remain only using table write permission expectAuthorizationTokenCheck(); EasyMock.expect(metadataStore.insertTable(TABLE)).andReturn(1).once(); EasyMock.replay(metadataStore, req, fileStore); + + Response response = tablesResource.createTable(TABLE, req); + + Assert.assertEquals(201, response.getStatus()); + Assert.assertTrue(response.getMetadata().size() > 0); + Assert.assertTrue(response.getMetadata().containsKey("Location")); + } + + @Test + public void testTablesInsertServerError() + { + expectAuthorizationTokenCheck(); + EasyMock.expect(metadataStore.insertTable(TABLE)).andThrow(new RuntimeException("some server error")); + EasyMock.replay(metadataStore, req, fileStore); + Response response = tablesResource.createTable(TABLE, req); + Assert.assertEquals(500, response.getStatus()); Map responseEntity = (Map) response.getEntity(); - // TODO: probably need to check some stuff in the response body here.. + Assert.assertTrue(responseEntity.get("error").toString().length() > 0); + } + + + @Test + public void testTablesGetShouldReturnAllTablesWithNoJobs() + { + expectAuthorizationTokenCheck(); + + AtomicInteger i = new AtomicInteger(100); + Set expectedTablesSet = TABLE_LIST.stream().map(t -> { + TableJobStateStats ts = new TableJobStateStats(t); + if (TABLE_NAMES_WRITE.contains(t.getName())) { + ts.addPermissions(Action.WRITE); + } else if (TABLE_NAMES_READ.contains(t.getName())) { + ts.addPermissions(Action.READ); + } + return ts; + }).collect(Collectors.toSet()); + List expectedTablesList = new ArrayList<>(expectedTablesSet); + expectedTablesList.sort(Comparator.comparing(TableJobStateStats::getName)); + + EasyMock.expect(metadataStore.getJobCountPerTablePerState(Collections.emptySet())) + .andReturn(expectedTablesSet); + EasyMock.replay(metadataStore, req, fileStore); + + Response response = tablesResource.getTables("NONE", req); + Map> responseTableStatsMap = (Map>) response.getEntity(); + + Assert.assertEquals(responseTableStatsMap.get("tables"), expectedTablesList); + + Assert.assertEquals(200, response.getStatus()); + } + + + @Test + public void testTablesGetShouldReturnOnlyTablesWithRunningState() + { + expectAuthorizationTokenCheck(); + + AtomicInteger i = new AtomicInteger(100); + Set expectedTablesSet = + TABLE_LIST + .stream() + .filter(t -> t.getName().equals(TABLE_NAMES_WRITE.get(0)) + || t.getName().equals(TABLE_NAMES_WRITE.get(1)) + || t.getName().equals(TABLE_NAMES_READ.get(2))) + .map(t -> { + TableJobStateStats ts = new TableJobStateStats(t); + if (TABLE_NAMES_WRITE.contains(t.getName())) { + ts.addPermissions(Action.WRITE); + } else if (TABLE_NAMES_READ.contains(t.getName())) { + ts.addPermissions(Action.READ); + } + ts.addJobState(JobState.RUNNING, i.addAndGet(1)); + return ts; + }).collect(Collectors.toSet()); + List expectedTablesList = new ArrayList<>(expectedTablesSet); + expectedTablesList.sort(Comparator.comparing(TableJobStateStats::getName)); + + EasyMock.expect(metadataStore.getJobCountPerTablePerState(Collections.singleton(JobState.RUNNING))) + .andReturn(expectedTablesSet); + EasyMock.replay(metadataStore, req, fileStore); + + Response response = tablesResource.getTables("RUNNING", req); + Map> responseTableStatsMap = (Map>) response.getEntity(); + + Assert.assertEquals(responseTableStatsMap.get("tables"), expectedTablesList); + + Assert.assertEquals(200, response.getStatus()); + } + + @Test + public void testTablesGetShouldReturnAllTablesWithOrWithoutJobs() + { + expectAuthorizationTokenCheck(); + + AtomicInteger i = new AtomicInteger(100); + Set expectedTablesSet = + TABLE_LIST + .stream() + .map(t -> { + TableJobStateStats ts = new TableJobStateStats(t); + if (TABLE_NAMES_WRITE.contains(t.getName())) { + ts.addPermissions(Action.WRITE); + } else { + ts.addPermissions(Action.READ); + } + return ts; + }) + .map(t -> { + TableJobStateStats ts = new TableJobStateStats(t); + if (TABLE_NAMES_WRITE.get(0).equals(t.getName())) { + ts.addJobState(JobState.STAGED, i.addAndGet(1)); + } else if (TABLE_NAMES_WRITE.get(1).equals(t.getName())) { + ts.addJobState(JobState.RUNNING, i.addAndGet(1)); + } else if (TABLE_NAMES_WRITE.get(2).equals(t.getName())) { + ts.addJobState(JobState.SCHEDULED, i.addAndGet(1)); + } else if (TABLE_NAMES_WRITE.get(3).equals(t.getName())) { + ts.addJobState(JobState.CANCELLED, i.addAndGet(1)); + } else if (TABLE_NAMES_WRITE.get(4).equals(t.getName())) { + ts.addJobState(JobState.COMPLETE, i.addAndGet(1)); + } else if (TABLE_NAMES_WRITE.get(5).equals(t.getName())) { + ts.addJobState(JobState.FAILED, i.addAndGet(1)); + } + return ts; + }).collect(Collectors.toSet()); + List expectedTablesList = new ArrayList<>(expectedTablesSet); + expectedTablesList.sort(Comparator.comparing(TableJobStateStats::getName)); + + EasyMock.expect(metadataStore.getJobCountPerTablePerState(Collections.singleton(JobState.RUNNING))) + .andReturn(expectedTablesSet); + EasyMock.replay(metadataStore, req, fileStore); + + Response response = tablesResource.getTables("RUNNING", req); + Map> responseTableStatsMap = (Map>) response.getEntity(); + + Assert.assertEquals(responseTableStatsMap.get("tables"), expectedTablesList); + + Assert.assertEquals(200, response.getStatus()); + } + + @Test + public void testTablesGetShouldNotReturnUnauthorizedTables() + { + expectAuthorizationTokenCheck(); + + AtomicInteger i = new AtomicInteger(100); + Set expectedTablesSet = + TABLE_LIST + .stream() + .map(t -> { + TableJobStateStats ts = new TableJobStateStats(t); + if (TABLE_NAMES_WRITE.contains(t.getName())) { + ts.addPermissions(Action.WRITE); + } else { + ts.addPermissions(Action.READ); + } + return ts; + }) + .map(t -> { + TableJobStateStats ts = new TableJobStateStats(t); + if (TABLE_NAMES_WRITE.get(0).equals(t.getName())) { + ts.addJobState(JobState.STAGED, i.addAndGet(1)); + } else if (TABLE_NAMES_WRITE.get(1).equals(t.getName())) { + ts.addJobState(JobState.RUNNING, i.addAndGet(1)); + } else if (TABLE_NAMES_WRITE.get(2).equals(t.getName())) { + ts.addJobState(JobState.SCHEDULED, i.addAndGet(1)); + } else if (TABLE_NAMES_WRITE.get(3).equals(t.getName())) { + ts.addJobState(JobState.CANCELLED, i.addAndGet(1)); + } else if (TABLE_NAMES_WRITE.get(4).equals(t.getName())) { + ts.addJobState(JobState.COMPLETE, i.addAndGet(1)); + } else if (TABLE_NAMES_WRITE.get(5).equals(t.getName())) { + ts.addJobState(JobState.FAILED, i.addAndGet(1)); + } + return ts; + }) + .filter(ts -> !TABLE_NAMES_UNAUTHORIZED.contains(ts.getName())) + .collect(Collectors.toSet()); + List expectedTablesList = new ArrayList<>(expectedTablesSet); + expectedTablesList.sort(Comparator.comparing(TableJobStateStats::getName)); + + EasyMock.expect(metadataStore.getJobCountPerTablePerState(null)) + .andReturn(expectedTablesSet); + EasyMock.replay(metadataStore, req, fileStore); + + Response response = tablesResource.getTables("ALL", req); + Map> responseTableStatsMap = (Map>) response.getEntity(); + + Assert.assertEquals(responseTableStatsMap.get("tables"), expectedTablesList); Assert.assertEquals(200, response.getStatus()); } @@ -267,6 +504,9 @@ private void expectAuthorizationTokenCheck() .andReturn(authenticationResult) .atLeastOnce(); + req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, null); + EasyMock.expectLastCall().anyTimes(); + req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false); EasyMock.expectLastCall().anyTimes();