Codecov Report
@@ Coverage Diff @@
## master #208 +/- ##
============================================
- Coverage 72.26% 72.25% -0.02%
- Complexity 1283 1290 +7
============================================
Files 140 139 -1
Lines 5931 5979 +48
Branches 466 469 +3
============================================
+ Hits 4286 4320 +34
- Misses 1440 1454 +14
Partials 205 205
@@ -142,6 +141,7 @@
public static final String AD_BASE_URI = "/_opendistro/_anomaly_detection";
public static final String AD_BASE_DETECTORS_URI = AD_BASE_URI + "/detectors";
public static final String AD_THREAD_POOL_NAME = "ad-threadpool";
public static final String COLD_START_THREAD_POOL_NAME = "ad-cold-start-threadpool";
minor: Shall we add this as the last constant? (The constants are arranged alphabetically.)
changed
@@ -88,8 +99,11 @@ public boolean expired(Duration stateTtl, Instant now) {
if (checkpoint != null) {
ans = ans && expired(stateTtl, now, checkpoint);
}
if (lastError != null) {
ans = ans && expired(stateTtl, now, lastError.getValue());
if (lastDetectionError != null) {
Question: Is it possible for both lastDetectionError and lastColdStartException to be non-null?
yes, it is possible.
* @param adID detector id
* @return last cold start exception for the detector
*/
public Exception fetchColdStartException(String adID) {
Question: Where is this used?
It is used in AnomalyResultTransportAction.
Got it. For some reason AnomalyResultTransportAction was not loaded.
code coverage seems to decrease
new FixedExecutorBuilder(
settings,
AD_THREAD_POOL_NAME,
Math.max(1, EsExecutors.allocatedProcessors(settings) / 4),
AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE,
"opendistro.ad." + AD_THREAD_POOL_NAME
),
new FixedExecutorBuilder(
why keep the overhead of an additional thread pool constantly when model training is at best occasional?
removed
searchFeatureDao
.getLatestDataTime(
detector,
ActionListener.wrap(latest -> getColdStartSamples(latest, detector, listener), listener::onFailure)
new ThreadedActionListener<>(
why should this query call back be run separately?
Ideally, all AD code should run inside AD threadpool. This is long overdue. I am starting with cold start code.
does it mean every callback needs to run this way? can this be simplified at a higher level?
yes. I don't know how to simplify this. Any suggestion?
TransportNodesAction seems to take thread pool name.
You meant the threadpool name in TransportResponseHandler? Our case is not just transport thread. Also, the threadpool name is good for one callback. It does not cover nested call backs.
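To make the point about nested callbacks concrete, here is a minimal sketch of a listener wrapper that moves each callback onto a chosen executor. It uses only the JDK: `Listener`, `ThreadedListener`, and the thread name `ad-threadpool-demo` are hypothetical stand-ins for illustration, not the Elasticsearch `ActionListener` or `ThreadedActionListener` classes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Elasticsearch's ActionListener.
interface Listener<T> {
    void onResponse(T response);

    void onFailure(Exception e);
}

// Hypothetical wrapper: hands both callbacks to the given executor
// instead of running them on the caller's thread.
final class ThreadedListener<T> implements Listener<T> {
    private final ExecutorService executor;
    private final Listener<T> delegate;

    ThreadedListener(ExecutorService executor, Listener<T> delegate) {
        this.executor = executor;
        this.delegate = delegate;
    }

    @Override
    public void onResponse(T response) {
        executor.execute(() -> delegate.onResponse(response));
    }

    @Override
    public void onFailure(Exception e) {
        executor.execute(() -> delegate.onFailure(e));
    }
}

class ThreadedListenerDemo {
    // Fires one callback through the wrapper and reports the thread it ran on.
    static String callbackThreadName() {
        ExecutorService adPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "ad-threadpool-demo"));
        AtomicReference<String> seen = new AtomicReference<>();
        Listener<Long> inner = new Listener<Long>() {
            @Override
            public void onResponse(Long latest) {
                seen.set(Thread.currentThread().getName());
            }

            @Override
            public void onFailure(Exception e) {
                seen.set("failed: " + e);
            }
        };
        new ThreadedListener<>(adPool, inner).onResponse(42L);
        adPool.shutdown();
        try {
            adPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName());
    }
}
```

Only the wrapped listener changes threads. A listener created inside `inner.onResponse` and passed to another asynchronous call would need its own wrapper, which is why a single thread pool name on the outer handler does not cover nested callbacks.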
/**
* Set last cold start error of a detector
* @param adID detector id
* @param exception exception, can be null
what does a null exception mean? is that an exception or no exception? why not using optional?
null means there is no error, so no exception. I know you are passionate about optional. Here it is unnecessary.
if it's no error, would it suffice to leave the entire record to null? <exception, time> is null seems simpler.
optional question. why optional is unnecessary ? clearly this variable can be present or absent and the need seems to exist.
changed to optional
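A rough sketch of what an Optional-based accessor can look like, assuming a plain per-detector map. `ColdStartErrors`, `recordFailure`, and `clear` are hypothetical names for illustration; only `fetchColdStartException` and its `Optional<Exception>` return type appear in this PR.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified holder; the real state lives in TransportState.
class ColdStartErrors {
    private final Map<String, Exception> lastColdStartException = new ConcurrentHashMap<>();

    // Remember the most recent cold start failure of a detector.
    void recordFailure(String adID, Exception exception) {
        lastColdStartException.put(adID, exception);
    }

    // Forget the failure, e.g. after a later cold start succeeds.
    void clear(String adID) {
        lastColdStartException.remove(adID);
    }

    // Empty means "no cold start error recorded"; callers never see null.
    Optional<Exception> fetchColdStartException(String adID) {
        return Optional.ofNullable(lastColdStartException.get(adID));
    }

    public static void main(String[] args) {
        ColdStartErrors errors = new ColdStartErrors();
        errors.recordFailure("detector-1", new IllegalStateException("not enough training data"));
        errors.fetchColdStartException("detector-1").ifPresent(e -> System.out.println(e.getMessage()));
        System.out.println(errors.fetchColdStartException("detector-2").isPresent());
    }
}
```

With this shape, the absence of an error is visible in the return type, so call sites do not need a separate null check.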
public Exception fetchColdStartException(String adID) {
if (transportStates.containsKey(adID) && transportStates.get(adID).getLastColdStartException() != null) {
Entry<Exception, Instant> errorAndTime = transportStates.get(adID).getLastColdStartException();
errorAndTime.setValue(clock.instant());
why getting the exception status involves updating the timestamp?
It is the same as other transport state. We are gonna clear these states if they are not accessed within one hour.
Would one last access time for one entire state be sufficient? instead of keeping one timestamp for each field and still using the latest one
changed
if (exception instanceof AnomalyDetectionException) {
// partitioned model exceeds memory limit
the two cases are different
LimitExceededException is a subtype of AnomalyDetectionException. I am commenting on one example here. Updated comment to make it more clear.
LGTM
This PR changes the code path of cold start in the transport layer to use callbacks.
Previously, I created AD’s ExecutorService, which has one thread for cold starts, in ColdStartRunner. To trigger a cold start, we could submit a task to the ExecutorService and consult a hash map (keyed by detector ID) that cached the results of recent cold starts. Since I have to invoke the cold start thread in various callbacks, I created a cold start thread pool and put the cold start result in the transport state.
This PR also handles new exceptions, like invalid queries, introduced by recent changes to ModelManager and FeatureManager.
This PR lowers the severity of a couple of log messages in HashRing and RCFPollingTransportAction to avoid overwhelming readers of log files. These log messages are common.
This PR corrects typos and updates the known causes of EndRunException in comments.
Testing done:
1. Simulated cold start failures: exceptions from cold starts can be seen by the transport layer, and EndRunException can cause AD jobs to be terminated.
2. The happy case of a cold start still works.
Yes. Maybe because I deleted a class.
} catch (Exception ex) {
throw new EndRunException(detector.getDetectorId(), "Error while cold start", ex, false);
if (previousException != null) {
LOG.error("Previous exception of {}: {}", () -> detectorId, () -> previousException);
Why use () -> detectorId? detectorId should work.
From the document you shared:
The lambda expression is only evaluated if the corresponding log level is enabled. This is referred to as lazy logging.
I think the error log level is always enabled, so the lambda () -> detectorId will always be executed?
you are right. Changed.
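For reference, the lazy-logging behaviour discussed here can be reproduced with the JDK alone. This sketch swaps in `java.util.logging`, whose `Logger.fine(Supplier<String>)` and `Logger.severe(Supplier<String>)` overloads follow the same idea as the Log4j 2 supplier methods; it is not the Log4j 2 API used in the PR, and the logger name is made up for the demo.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

class LazyLoggingDemo {
    // Returns how many times the message supplier had run after a FINE call
    // and then after a SEVERE call, with the logger level set to INFO.
    static int[] supplierCalls() {
        Logger log = Logger.getLogger("lazy-logging-demo");
        log.setUseParentHandlers(false); // keep the demo from printing
        log.setLevel(Level.INFO);        // FINE is disabled, SEVERE is enabled

        AtomicInteger calls = new AtomicInteger();
        Supplier<String> message = () -> "detector " + calls.incrementAndGet();

        log.fine(message);   // level disabled: the supplier is skipped
        int afterFine = calls.get();
        log.severe(message); // level enabled: the supplier runs
        int afterSevere = calls.get();
        return new int[] { afterFine, afterSevere };
    }

    public static void main(String[] args) {
        int[] counts = supplierCalls();
        System.out.println(counts[0] + " " + counts[1]);
    }
}
```

A supplier therefore only saves work when the level is likely to be disabled and the value is expensive to compute. For an error-level message that merely prints an ID already held in a variable, passing the value directly is simpler.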
throw new EndRunException(detector.getDetectorId(), "Error while cold start", ex, false);
if (previousException != null) {
LOG.error("Previous exception of {}: {}", () -> detectorId, () -> previousException);
if (previousException instanceof EndRunException && ((EndRunException) previousException).isEndNow()) {
If it's EndRunException and isEndNow() is true, JobRunner will stop the detector job. Will the previousException be cleaned up when the detector job is stopped? Why do we need to check the previous exception? If the user waits for some time and restarts the detector, the cluster state may change, and the user may not get the previous exception any more.
Yes, it will be cleared. We need to check previous exception because cold start is asynchronous. We don't know cold start error instantly.
If transport state gets cleared up, next few runs would trigger the same error.
LOG.info("Trigger cold start for {}", detectorId);
coldStart(detector);
} else {
LOG.error(String.format("Fail to get checkpoint state for %s", detectorId), exception);
Should we throw an exception? Why not record this exception as previousException?
This is in callback. We cannot throw exception directly as it is asynchronous and won't be caught by AD job. What do we expect the job runner to do here? For example, we can reach here if cold start does not happen before and the system is under extreme heavy load and get request gets rejected. We can log the error and retry later.
Job runner will end the detector job if it gets EndRunException and isEndNow is true. If isEndNow is false, it will still stop the detector job if the count of EndRunException exceeds MAX_RETRY_FOR_END_RUN_EXCEPTION (default 6 currently). For exceptions other than EndRunException, job runner will record them in the AD result.
How about we return this exception to JobRunner so it can be logged into the AD result?
In this change, cold start is put into another thread pool and runs asynchronously; it will check previousException if there is no checkpoint or no model. How about we check for an EndRunException whose isEndNow is false and return it to JobRunner, so JobRunner can stop the detector job if it finds too many EndRunExceptions? That can avoid running cold start endlessly.
changed as we discussed offline
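The stop rule described in this thread can be sketched as follows. This is a simplified illustration based only on the description in the comments, not the job runner's code: `EndRunSignal` and `EndRunPolicy` are hypothetical names, the limit of 6 is the default quoted above, and any resetting of the counter after a successful run is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the plugin's EndRunException.
class EndRunSignal extends RuntimeException {
    private final boolean endNow;

    EndRunSignal(String message, boolean endNow) {
        super(message);
        this.endNow = endNow;
    }

    boolean isEndNow() {
        return endNow;
    }
}

class EndRunPolicy {
    // Default quoted in the review; the real constant lives in the plugin.
    static final int MAX_RETRY_FOR_END_RUN_EXCEPTION = 6;

    private final Map<String, Integer> endRunCounts = new ConcurrentHashMap<>();

    // Returns true when the detector job should be stopped.
    boolean shouldStop(String detectorId, Exception e) {
        if (!(e instanceof EndRunSignal)) {
            return false; // other exceptions are only recorded in the AD result
        }
        if (((EndRunSignal) e).isEndNow()) {
            return true; // stop immediately
        }
        int count = endRunCounts.merge(detectorId, 1, Integer::sum);
        return count > MAX_RETRY_FOR_END_RUN_EXCEPTION;
    }

    public static void main(String[] args) {
        EndRunPolicy policy = new EndRunPolicy();
        for (int run = 1; run <= 7; run++) {
            boolean stop = policy.shouldStop("detector-1", new EndRunSignal("cold start failed", false));
            System.out.println("run " + run + " stop=" + stop);
        }
    }
}
```

Returning a non-urgent end-run exception to the job runner lets this counter bound the number of cold start attempts, which is the behaviour the reviewer asks for.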
// detector has changed or not. If changed, trigger indexing.
private Entry<String, Instant> lastError;
private Entry<String, Instant> lastDetectionError;
// last training error. Used to save cold error by a concurrent cold start thread.
cold error means cold start error?
Good catch. Fixed
}

if (!(exp instanceof ResourceNotFoundException)) {
throw exp;
Why not return this exception ?
The returned exception is for cold start related failure only.
searchFeatureDao
.getLatestDataTime(
detector,
ActionListener.wrap(latest -> getColdStartSamples(latest, detector, listener), listener::onFailure)
new ThreadedActionListener<>(logger, threadPool, AnomalyDetectorPlugin.AD_THREAD_POOL_NAME, latestTimeListener, false)
minor. why not using configured executor name instead of hardcoding it in all places?
I don't understand. Could you explain more?
AnomalyDetectorPlugin.AD_THREAD_POOL_NAME is the name.
why not using a configured name like threadpool in configuration? instead of hardcoding it.
What do you mean by configuration?
the value is configurable using constructor.
The benefit to make it configurable is to make testing easy.
My concern is we may not have just one threadpool to use in this class. For example, I want to add a dedicated cold start threadpool for high cardinality. So we should add threadpool_ad, threadpool_coldStart for this?
if there are additional dependency configuration (threadpool and executor names), they can also be passed via constructor and the implementation just needs to pick the corresponding configuration (threadpool and executor name combination)
import java.time.Duration;
import java.time.Instant;
import java.util.Map.Entry;
import java.util.Optional;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;

public class TransportState {
minor. documentation is missing for the interface of this class.
added
also all the public methods (interface)
lastAccessTime = clock.instant();
}

public boolean expired(Duration stateTtl, Instant now) {
minor. the input can be simplified to one parameter as the expiration time.
I don't understand. Could you explain?
why using two input parameters for this method when one parameter such as expiration time is sufficient?
changed
@@ -397,32 +396,46 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener<
*
* @param failure object that may contain exceptions thrown
* @param detector detector object
* @return whether cold start runs
* @return exception if we get resource not found exception
minor.
when resource is not found, what exception does this method return? why the general exception instead of the more specific one? if other exceptions occur, what does this method return?
also who is we? does it refer to the reader of the doc? the client of the interface? the developer of the implementation? or the software implementation? please avoid incorrect/ambiguous use of we when the referent is software.
Changed to new AnomalyDetectionException.
Is there any place saying we cannot use we?
it was the question about who we refers to. https://www.oracle.com/technical-resources/articles/java/javadoc-tool.html
Which part of the doc do you want me to read? Any particular part related?
Anyway, I changed to "AD job execution".
See Use 3rd person (descriptive) not 2nd person (prescriptive).
good to know
}
}

stateManager.getDetectorCheckpoint(detectorId, ActionListener.wrap(checkpointExists -> {
if (!checkpointExists) {
question. what should be returned when the checkpoint exists?
This is a callback. We don't need to return anything.
} else {
throw new EndRunException(detectorId, "Cannot get training data", false);
}
Optional<Exception> previousException = stateManager.fetchColdStartException(detectorId);
question. why should exception be cleared only when data point is missing?
What do you mean by "data point is missing"?
look at when the method coldStartIfNoCheckPoint gets called
There is another method calling fetchColdStartException: coldStartIfNoModel
}
}
return false;
LOG.info("Trigger cold start for {}", detector.getDetectorId());
coldStart(detector);
Is it possible that the last cold start has still not finished when coldStart executes? Should we check the state of the last cold start?
It is possible. Let me add a check.
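One possible shape for such a check, as a sketch only: track the detectors with a cold start in flight and let a new request proceed only if none is running. `ColdStartGuard`, `tryStart`, and `finish` are hypothetical names; the PR may implement the check differently, for example with a flag in the transport state.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical guard that keeps a second cold start from being
// submitted while one is still running for the same detector.
class ColdStartGuard {
    private final Set<String> running = ConcurrentHashMap.newKeySet();

    // Returns true if the caller may start a cold start now;
    // false if one is already in flight for this detector.
    boolean tryStart(String detectorId) {
        return running.add(detectorId);
    }

    // Must be called from both the success and the failure callback.
    void finish(String detectorId) {
        running.remove(detectorId);
    }

    public static void main(String[] args) {
        ColdStartGuard guard = new ColdStartGuard();
        System.out.println(guard.tryStart("detector-1")); // first request wins
        System.out.println(guard.tryStart("detector-1")); // rejected while running
        guard.finish("detector-1");
        System.out.println(guard.tryStart("detector-1")); // allowed again
    }
}
```

Because `Set.add` on a concurrent set is atomic, two callbacks racing to trigger a cold start for the same detector cannot both see `true`.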
… last cold start exception, and more
@@ -187,7 +188,7 @@ public void maintenance() {
String detectorId = entry.getKey();
try {
TransportState state = entry.getValue();
if (state.expired(stateTtl, clock.instant())) {
if (state.expired(stateTtl)) {
minor. why not do expired(expirationdate=now-ttl)? ttl is an odd parameter.
expirationdate is a date, while ttl is a duration. ttl means time to live.
the method can just take one input parameter as the final expiration time, which can be calculated by the client however they want. in this case, it is now-ttl. the method is easier to understand and use and more reusable.
I don't understand. Expiration date = last access time + ttl. Why is it now-ttl?
so this method takes one parameter that is the expiration time. if the last access time is before it, the method returns true, otherwise it returns false.
In that case, every caller has to compute the expiration time. The current implementation looks easier because it encapsulates the logic inside.
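The two signatures debated here are equivalent, which a small sketch can show side by side. `TransportStateSketch` and `expiredBefore` are hypothetical names, the one-hour TTL is the figure mentioned earlier in the review, and the strict comparison is an assumption rather than the PR's exact rule.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical, simplified state with a single last access time.
class TransportStateSketch {
    private final Clock clock;
    private final Instant lastAccessTime;

    TransportStateSketch(Clock clock, Instant lastAccessTime) {
        this.clock = clock;
        this.lastAccessTime = lastAccessTime;
    }

    // Author's shape: the state reads "now" from its own clock,
    // so expiration date = last access time + ttl.
    boolean expired(Duration stateTtl) {
        return lastAccessTime.plus(stateTtl).isBefore(clock.instant());
    }

    // Reviewer's shape: the caller passes a cutoff such as now - ttl.
    boolean expiredBefore(Instant cutoff) {
        return lastAccessTime.isBefore(cutoff);
    }
}

class ExpiryDemo {
    // Evaluates both variants for a state last accessed the given time ago.
    static boolean[] check(Duration sinceLastAccess) {
        Instant now = Instant.parse("2020-01-01T02:00:00Z");
        Clock clock = Clock.fixed(now, ZoneOffset.UTC);
        Duration ttl = Duration.ofHours(1);
        TransportStateSketch state = new TransportStateSketch(clock, now.minus(sinceLastAccess));
        return new boolean[] { state.expired(ttl), state.expiredBefore(now.minus(ttl)) };
    }

    public static void main(String[] args) {
        boolean[] stale = check(Duration.ofMinutes(90));
        boolean[] fresh = check(Duration.ofMinutes(30));
        System.out.println(stale[0] + " " + stale[1]);
        System.out.println(fresh[0] + " " + fresh[1]);
    }
}
```

Since `lastAccess + ttl < now` and `lastAccess < now - ttl` are the same inequality, the choice is about where the subtraction lives: inside the state, as in the PR, or in each caller, as the reviewer suggests.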
import java.time.Duration;
import java.time.Instant;
import java.util.Map.Entry;
import java.util.Optional;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;

public class TransportState {
also all the public methods (interface)
LGTM. Thanks for the change.
Issue #, if available:
#78
Description of changes:
This PR changes the code path of cold start in the transport layer to use callbacks.
Previously, I created AD’s ExecutorService, which has one thread for cold starts, in ColdStartRunner. To trigger a cold start, we could submit a task to the ExecutorService and consult a hash map (keyed by detector ID) that cached the results of recent cold starts. Since I have to invoke the cold start thread in various callbacks, ColdStartRunner does not work anymore. I created a cold start thread pool and put the cold start result in the transport state instead.
This PR also handles new exceptions like invalid queries introduced by recent changes on ModelManager and FeatureManager.
This PR lowers the severity of a couple of log messages in HashRing and RCFPollingTransportAction to avoid overwhelming log file readers. These log messages are common.
This PR corrects typos and updates known causes of EndRunException in comments.
Testing done:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.