-
-
Notifications
You must be signed in to change notification settings - Fork 8.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updated flink 1.8 -> 1.17. Added smoke tests for Flink #9046
Conversation
Thank you for the work on flink! It will take a few days for a review since we haven't had any contributor to the flink interface for a long while. |
@trivialfis no worries. I've mostly converted existing code and partially translated some of scala code to java as flink moved significant part of their code base that way. I am happy to hear any suggestions/comments. |
@CodingCat Hi, do you have some bandwidth for looking into this? |
e67a60b
to
1fc59f0
Compare
rebased to include latest changes. |
I've noticed that flink does not publish flink-scala for 2.13. Perhaps, it is wise to keep just java API in place. |
@dotbg Hi, are you familiar with flink (use flink in practice)? Or are you modifying the interface only for upgrading the Scala version? At the moment, we don't have any active contributors who work on flink. (nor any feature request for working with it) If you are interested in the flink interface in general, we can spend some time bringing it up-to-date and I will do a crash course myself for catching up. Otherwise, my PR review will be based on the "minimum amount of changes that can get things going". |
@trivialfis my former team was using flink extensively, but they relied on jpmml. I have noticed that the flink xgboost interface was out of date and wanted to fix that. My current job relies on spark and we want to upgrade our scala version as well. Hence I am in the "two birds with one stone" situation. I am fine with the "minimum amount of changes that can get things going" and, perhaps, deprecating the flink integration. I do not mind supporting the interface for a while, but I can not devote myself for the flink support unless there is a community interest. |
Actually, I'm quite excited that you work on the flink interface! We recently learned about the flink project and its strength in streaming data, then an expert come and bring it up to date! What are the odds?
No worries, we can see how it goes. I will try to catch up on this area as well. |
@trivialfis Great to hear. Well, a while ago I have done some work on flink ml, but I can not call myself an expert. Xbgoost can consider collaborating with the flink-ml project though. But this is, perhaps, a future topic :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for working on the flink interface. This is an initial review, I'm still going through the tutorials/learning at the moment.
I think we can start the flink interface from scratch if necessary considering its long inactive status. As a result, moving code from scala to java makes sense if it aligns with flink. These might be topics for the remote future, but having that in mind can reduce some constraints for us.
|
||
class XGBoostModel (booster: Booster) extends Serializable { | ||
class XGBoostModel(wrapped: JXGBoostModel) extends Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the name booster
is fine and seems clearer than the new name wrapped
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@trivialfis I am wrapping the Java XGBoostModel implementation here, hence, the naming. I can rename this to javaXGBoostModel or something alike. Do you have a better name in mind? Sadly, booster is not what is used.
In principle, I delegate all operations made by scala's XGBoostModel to the 'wrapped' java implementation hence the naming choice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have removed the class and rewrote the scala example so that the java implementation is used.
jvm-packages/xgboost4j-flink/pom.xml
Outdated
<version>${flink.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-ml_${scala.binary.version}</artifactId> | ||
<version>${flink.version}</version> | ||
<artifactId>flink-ml-uber</artifactId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, does uber have a different meaning in flink-ml than the company name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flink-ml-uber is the artifact, which includes all flink-ml related classes. I cantry to cherry-pick particular dependencies, if necessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am using another dependency now.
@@ -0,0 +1,113 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For running tests and defining example, we can use https://github.com/dmlc/xgboost/blob/master/demo/data/veterans_lung_cancer.csv instead of libsvm files. We can copy it into resources during build.
I think that's probably better than us maintaining yet another svm parser.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I did not know that. thanks a lot! I will rewrite the tests then and this class will become obsolete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed class, rewritten the tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added the train/test CSV files as flink does not have the train/test split functionality and implementing it should not be done in this repository.
*/ | ||
public static XGBoostModel train(DataSet<VectorWithNorm> dtrain, | ||
Map<String, Object> params, | ||
int round) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use the name numBoostRound
instead of round
. It's fine to have a breaking change for this. We can safely ignore the existing implementation and start from scratch considering that there's no user for a long period of time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks a lot!
1fc59f0
to
f508642
Compare
null, | ||
null, | ||
numEarlyStoppingRounds); | ||
Communicator.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we wrap the shutdown in the try/catch/finally?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wbo4958 done, I made it look similar to spark implementation. I am keen to hear other suggestions.
I am not familiar with Flink, but the code looks neat, and the training pipeline is quite similar to spark. LGTM. |
0bd2e70
to
c632e8d
Compare
rebased on top of the latest master branch |
@@ -0,0 +1,42 @@ | |||
Survival_label_lower_bound,Survival_label_upper_bound,Age_in_years,Karnofsky_score,Months_from_Diagnosis,Celltype=adeno,Celltype=large,Celltype=smallcell,Celltype=squamous,Prior_therapy=no,Prior_therapy=yes,Treatment=standard,Treatment=test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not entirely sure if we need to make a copy of the data in the git repository in order to have train-test split. Do we have a slice operator with flink data structure?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, we do not :( But I will work it around.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we just use the training dataset as the validation/test dataset for now, potentially with some added noise. We are kind of conservative about adding data files in git repository.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries, I worked it around. It should be much better now.
c632e8d
to
11277e8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR looks good to me.
I need to make a follow-up PR that documents that the API for flink is not yet stabilized.
Please help resolve the conflict. Thank you again for the nice work on bringing up the flink interface! |
11277e8
to
f0894ce
Compare
f0894ce
to
7035462
Compare
Thanks. a lot. |
@trivialfis I see that the |
Please ignore it for the moment, we are aware of the issue #9091 . |
Updated the Apache Flink - related code to the most recent version. It can be considered a first step towards #8650 as most recent Flink versions support scala 2.13. We can, certainly, follow Flink approach and deprecate scala-related flink logic, but I am not a person to make this decision.