diff --git a/Makefile b/Makefile index 1f41ad0904..7abbda0a1b 100644 --- a/Makefile +++ b/Makefile @@ -41,13 +41,13 @@ format-java: mvn spotless:apply lint-java: - mvn spotless:check + mvn --no-transfer-progress spotless:check test-java: - mvn test + mvn --no-transfer-progress test test-java-with-coverage: - mvn test jacoco:report-aggregate + mvn --no-transfer-progress test jacoco:report-aggregate build-java: mvn clean verify diff --git a/core/src/main/java/feast/core/service/JobService.java b/core/src/main/java/feast/core/service/JobService.java index b0b7aeeb35..a671a0f68e 100644 --- a/core/src/main/java/feast/core/service/JobService.java +++ b/core/src/main/java/feast/core/service/JobService.java @@ -71,7 +71,8 @@ public JobService( } } - /* Job Service API */ + // region Job Service API + /** * List Ingestion Jobs in Feast matching the given request. See CoreService protobuf documentation * for more detailed documentation. @@ -88,16 +89,16 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request) // check that filter specified and not empty if (request.hasFilter() - && !(request.getFilter().getId() == "" - && request.getFilter().getStoreName() == "" - && request.getFilter().hasFeatureSetReference() == false)) { + && !(request.getFilter().getId().isEmpty() + && request.getFilter().getStoreName().isEmpty() + && !request.getFilter().hasFeatureSetReference())) { // filter jobs based on request filter ListIngestionJobsRequest.Filter filter = request.getFilter(); // for proto3, default value for missing values: // - numeric values (ie int) is zero // - strings is empty string - if (filter.getId() != "") { + if (!filter.getId().isEmpty()) { // get by id: no more filters required: found job Optional job = this.jobRepository.findById(filter.getId()); if (job.isPresent()) { @@ -105,7 +106,7 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request) } } else { // multiple filters can apply together in an 'and' operation - if (filter.getStoreName() != "") { + if (!filter.getStoreName().isEmpty()) { // find jobs by name List jobs = this.jobRepository.findByStoreName(filter.getStoreName()); Set jobIds = jobs.stream().map(Job::getId).collect(Collectors.toSet()); @@ -140,7 +141,7 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request) // convert matching job models to ingestion job protos List ingestJobs = new ArrayList<>(); for (String jobId : matchingJobIds) { - Job job = this.jobRepository.findById(jobId).get(); + Job job = this.jobRepository.findById(jobId).orElseThrow(); // job that failed on start won't be converted toProto successfully // and they're irrelevant here if (job.getStatus() == JobStatus.ERROR) { @@ -160,21 +161,20 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request) * @param request restart ingestion job request specifying which job to stop * @throws NoSuchElementException when restart job request requests to restart a nonexistent job. * @throws UnsupportedOperationException when job to be restarted is in an unsupported status - * @throws InvalidProtocolBufferException on error when constructing response protobuf */ @Transactional - public RestartIngestionJobResponse restartJob(RestartIngestionJobRequest request) - throws InvalidProtocolBufferException { - // check job exists - Optional getJob = this.jobRepository.findById(request.getId()); - if (getJob.isEmpty()) { - // FIXME: if getJob.isEmpty then constructing this error message will always throw an error... - throw new NoSuchElementException( - "Attempted to stop nonexistent job with id: " + getJob.get().getId()); - } + public RestartIngestionJobResponse restartJob(RestartIngestionJobRequest request) { + String jobId = request.getId(); + + Job job = + this.jobRepository + .findById(jobId) + .orElseThrow( + () -> + new NoSuchElementException( + "Attempted to restart nonexistent job with id: " + jobId)); // check job status is valid for restarting - Job job = getJob.get(); JobStatus status = job.getStatus(); if (status.isTransitional() || status.isTerminal() || status == JobStatus.UNKNOWN) { throw new UnsupportedOperationException( @@ -202,20 +202,20 @@ public RestartIngestionJobResponse restartJob(RestartIngestionJobRequest request * @param request stop ingestion job request specifying which job to stop * @throws NoSuchElementException when stop job request requests to stop a nonexistent job. * @throws UnsupportedOperationException when job to be stopped is in an unsupported status - * @throws InvalidProtocolBufferException on error when constructing response protobuf */ @Transactional - public StopIngestionJobResponse stopJob(StopIngestionJobRequest request) - throws InvalidProtocolBufferException { - // check job exists - Optional getJob = this.jobRepository.findById(request.getId()); - if (getJob.isEmpty()) { - throw new NoSuchElementException( - "Attempted to stop nonexistent job with id: " + getJob.get().getId()); - } + public StopIngestionJobResponse stopJob(StopIngestionJobRequest request) { + String jobId = request.getId(); + + Job job = + this.jobRepository + .findById(jobId) + .orElseThrow( + () -> + new NoSuchElementException( + "Attempted to stop nonexistent job with id: " + jobId)); // check job status is valid for stopping - Job job = getJob.get(); JobStatus status = job.getStatus(); if (status.isTerminal()) { // do nothing - job is already stopped @@ -240,7 +240,9 @@ public StopIngestionJobResponse stopJob(StopIngestionJobRequest request) return StopIngestionJobResponse.newBuilder().build(); } - /* Private Utility Methods */ + // endregion + // region Private Utility Methods + private Set mergeResults(Set results, Collection newResults) { if (results.size() <= 0) { // no existing results: copy over new results @@ -252,7 +254,7 @@ private Set mergeResults(Set results, Collection newResults) { return results; } - // converts feature set reference to a list feature set filter + /** converts feature set reference to a list feature set filter */ private ListFeatureSetsRequest.Filter toListFeatureSetFilter(FeatureSetReference fsReference) { // match featuresets using contents of featureset reference String fsName = fsReference.getName(); @@ -262,16 +264,13 @@ private ListFeatureSetsRequest.Filter toListFeatureSetFilter(FeatureSetReference // for proto3, default value for missing values: // - numeric values (ie int) is zero // - strings is empty string - ListFeatureSetsRequest.Filter filter = - ListFeatureSetsRequest.Filter.newBuilder() - .setFeatureSetName((fsName != "") ? fsName : "*") - .setProject((fsProject != "") ? fsProject : "*") - .build(); - - return filter; + return ListFeatureSetsRequest.Filter.newBuilder() + .setFeatureSetName(fsName.isEmpty() ? "*" : fsName) + .setProject(fsProject.isEmpty() ? "*" : fsProject) + .build(); } - // sync job status using job manager + /** sync job status using job manager */ private Job syncJobStatus(JobManager jobManager, Job job) { JobStatus newStatus = jobManager.getJobStatus(job); // log job status transition diff --git a/core/src/test/java/feast/core/service/JobServiceTest.java b/core/src/test/java/feast/core/service/JobServiceTest.java index 27509620ff..6020db7b66 100644 --- a/core/src/test/java/feast/core/service/JobServiceTest.java +++ b/core/src/test/java/feast/core/service/JobServiceTest.java @@ -46,11 +46,7 @@ import feast.proto.core.StoreProto.Store.StoreType; import feast.proto.types.ValueProto.ValueType.Enum; import java.time.Instant; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.List; -import java.util.Optional; +import java.util.*; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -87,7 +83,7 @@ public void setup() { "*:*:*"); // fake featureset & job - this.featureSet = this.newDummyFeatureSet("food", 2, "hunger"); + this.featureSet = this.newDummyFeatureSet("food", "hunger"); this.job = this.newDummyJob("kafka-to-redis", "job-1111", JobStatus.PENDING); try { this.ingestionJob = this.job.toProto(); @@ -140,7 +136,7 @@ public void setupJobManager() { .thenReturn(this.newDummyJob(this.job.getId(), this.job.getExtId(), JobStatus.PENDING)); } - private FeatureSet newDummyFeatureSet(String name, int version, String project) { + private FeatureSet newDummyFeatureSet(String name, String project) { Feature feature = TestObjectFactory.CreateFeature(name + "_feature", Enum.INT64); Entity entity = TestObjectFactory.CreateEntity(name + "_entity", Enum.STRING); @@ -242,7 +238,7 @@ public void testListJobsByStoreName() { @Test public void testListIngestionJobByFeatureSetReference() { - // list job by feature set reference: name and version and project + // list job by feature set reference: name and project ListIngestionJobsRequest.Filter filter = ListIngestionJobsRequest.Filter.newBuilder() .setFeatureSetReference(this.fsReferences.get(0)) @@ -281,7 +277,7 @@ private StopIngestionJobResponse tryStopJob( fail("Expected exception, but none was thrown"); } } catch (Exception e) { - if (expectError != true) { + if (!expectError) { // unexpected exception e.printStackTrace(); fail("Caught Unexpected exception trying to restart job"); @@ -330,8 +326,7 @@ public void testStopUnsupportedError() { // check for UnsupportedOperationException when trying to stop jobs are // in an in unknown or in a transitional state JobStatus prevStatus = this.job.getStatus(); - List unsupportedStatuses = new ArrayList<>(); - unsupportedStatuses.addAll(JobStatus.getTransitionalStates()); + List unsupportedStatuses = new ArrayList<>(JobStatus.getTransitionalStates()); unsupportedStatuses.add(JobStatus.UNKNOWN); for (JobStatus status : unsupportedStatuses) { @@ -345,6 +340,12 @@ public void testStopUnsupportedError() { this.job.setStatus(prevStatus); } + @Test(expected = NoSuchElementException.class) + public void testStopJobForUnknownId() { + var request = StopIngestionJobRequest.newBuilder().setId("bogusJobId").build(); + jobService.stopJob(request); + } + // restart jobs private RestartIngestionJobResponse tryRestartJob( RestartIngestionJobRequest request, boolean expectError) { @@ -356,7 +357,7 @@ private RestartIngestionJobResponse tryRestartJob( fail("Expected exception, but none was thrown"); } } catch (Exception e) { - if (expectError != true) { + if (!expectError) { // unexpected exception e.printStackTrace(); fail("Caught Unexpected exception trying to stop job"); @@ -392,8 +393,7 @@ public void testRestartUnsupportedError() { // check for UnsupportedOperationException when trying to restart jobs are // in an in unknown or in a transitional state JobStatus prevStatus = this.job.getStatus(); - List unsupportedStatuses = new ArrayList<>(); - unsupportedStatuses.addAll(JobStatus.getTransitionalStates()); + List unsupportedStatuses = new ArrayList<>(JobStatus.getTransitionalStates()); unsupportedStatuses.add(JobStatus.UNKNOWN); for (JobStatus status : unsupportedStatuses) {