Skip to content

Commit

Permalink
Add feature set status JOB_STARTING to denote feature sets waiting fo…
Browse files Browse the repository at this point in the history
…r job to get to RUNNING state (#714)

* Add additional feature set status to differentiate between pending job init and pending job ready

* Fix broken test
  • Loading branch information
Chen Zhiling authored May 18, 2020
1 parent 14cae74 commit c1a5c43
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 10 deletions.
12 changes: 11 additions & 1 deletion core/src/main/java/feast/core/job/JobUpdateTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.proto.core.FeatureSetProto.FeatureSetStatus;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -102,7 +103,16 @@ public Job call() {

boolean requiresUpdate(Job job) {
// If set of feature sets has changed
return !Sets.newHashSet(featureSets).equals(Sets.newHashSet(job.getFeatureSets()));
if (!Sets.newHashSet(featureSets).equals(Sets.newHashSet(job.getFeatureSets()))) {
return true;
}
// If any of the incoming feature sets were updated
for (FeatureSet featureSet : featureSets) {
if (featureSet.getStatus() == FeatureSetStatus.STATUS_PENDING) {
return true;
}
}
return false;
}

private Job createJob() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public Job updateJob(Job job) {
true);

job.setExtId(extId);
job.setStatus(JobStatus.PENDING);
return job;
} catch (InvalidProtocolBufferException e) {
log.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,19 @@ void startOrUpdateJobs(List<JobUpdateTask> tasks) {
tasks.forEach(ecs::submit);

int completedTasks = 0;
List<Job> startedJobs = new ArrayList<>();
while (completedTasks < tasks.size()) {
try {
Job job = ecs.take().get();
if (job != null) {
jobRepository.saveAndFlush(job);
startedJobs.add(job);
}
} catch (ExecutionException | InterruptedException e) {
log.warn("Unable to start or update job: {}", e.getMessage());
}
completedTasks++;
}
jobRepository.saveAll(startedJobs);
executorService.shutdown();
}

Expand All @@ -169,7 +171,7 @@ private void updateFeatureSetStatuses(List<JobUpdateTask> jobUpdateTasks) {
});
pending.forEach(
fs -> {
fs.setStatus(FeatureSetStatus.STATUS_PENDING);
fs.setStatus(FeatureSetStatus.STATUS_JOB_STARTING);
featureSetRepository.save(fs);
});
featureSetRepository.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import feast.proto.core.StoreProto.Store.StoreType;
import feast.proto.core.StoreProto.Store.Subscription;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -149,7 +150,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep
.build();
FeatureSet featureSet2 = FeatureSet.fromProto(featureSetProto2);
String extId = "ext";
ArgumentCaptor<Job> jobArgCaptor = ArgumentCaptor.forClass(Job.class);
ArgumentCaptor<List<Job>> jobArgCaptor = ArgumentCaptor.forClass(List.class);

Job expectedInput =
new Job(
Expand Down Expand Up @@ -183,9 +184,9 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep
new JobCoordinatorService(
jobRepository, featureSetRepository, specService, jobManager, feastProperties);
jcs.Poll();
verify(jobRepository, times(1)).saveAndFlush(jobArgCaptor.capture());
Job actual = jobArgCaptor.getValue();
assertThat(actual, equalTo(expected));
verify(jobRepository, times(1)).saveAll(jobArgCaptor.capture());
List<Job> actual = jobArgCaptor.getValue();
assertThat(actual, equalTo(Collections.singletonList(expected)));
}

@Test
Expand Down Expand Up @@ -277,7 +278,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException {
feast.core.model.Store.fromProto(store),
Arrays.asList(featureSet2),
JobStatus.RUNNING);
ArgumentCaptor<Job> jobArgCaptor = ArgumentCaptor.forClass(Job.class);
ArgumentCaptor<List<Job>> jobArgCaptor = ArgumentCaptor.forClass(List.class);

when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAsc("%", "project1"))
.thenReturn(Lists.newArrayList(featureSet1, featureSet2));
Expand All @@ -294,8 +295,8 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException {
jobRepository, featureSetRepository, specService, jobManager, feastProperties);
jcs.Poll();

verify(jobRepository, times(2)).saveAndFlush(jobArgCaptor.capture());
List<Job> actual = jobArgCaptor.getAllValues();
verify(jobRepository, times(1)).saveAll(jobArgCaptor.capture());
List<Job> actual = jobArgCaptor.getValue();

assertThat(actual.get(0), equalTo(expected1));
assertThat(actual.get(1), equalTo(expected2));
Expand Down
1 change: 1 addition & 0 deletions protos/feast/core/FeatureSet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,6 @@ message FeatureSetMeta {
enum FeatureSetStatus {
STATUS_INVALID = 0;
STATUS_PENDING = 1;
STATUS_JOB_STARTING = 3;
STATUS_READY = 2;
}

0 comments on commit c1a5c43

Please sign in to comment.