Make TypedPipe Optimizations configurable #1738
Conversation
Are there any examples using this with sets of optimization phases? (I didn't see one other than empty.) One API question here: should we point to an optimization phase as a single class like this, or should it be a list of rules? (That would require rules, or rule builders, that are reflectively instantiated -- but ultimately it would let us flip rules on and off per job from the command line.) I also presume that in the seq here each rule is itself a combined set of all the rules for that phase? Playing devil's advocate against my own suggestion: it might be too verbose to do what I suggest above, in that for full configurability we would want a spec of some sort. I have no super strong preference here, but this change is more API-impacting than a lot of the previous ones, since it adds to Config.
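For illustration only, here is a rough sketch of the "list of reflectively instantiated rules" alternative floated above, where rule names come from the job Config so individual rules could be flipped on and off per job from the command line. The config key, the no-arg-constructor convention, and the assumption that Rule here is dagon's Rule are all illustrative choices for the sketch, not part of this PR's API.

```scala
import com.stripe.dagon.Rule
import com.twitter.scalding.typed.TypedPipe

object RuleListSketch {
  // Hypothetical: read a comma-separated list of Rule class names from the
  // Config and instantiate each one reflectively. None of these names come
  // from this PR; they only illustrate the alternative API being discussed.
  def rulesFromConfig(conf: Map[String, String]): Seq[Rule[TypedPipe]] =
    conf.get("scalding.optimization.rules") match {
      case None => Nil
      case Some(names) =>
        names.split(",").toIndexedSeq.map { name =>
          // each named class is assumed to be a no-arg Rule[TypedPipe]
          Class.forName(name.trim)
            .getDeclaredConstructor()
            .newInstance()
            .asInstanceOf[Rule[TypedPipe]]
        }
    }
}
```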
I saw two comments:
How does it sound if I add more tests to address 1. above?
If you're confident there isn't an API-breakage concern / risk of backing into a corner, I think it really addresses most of my (1) and (2), since (1) really was trying to get at exercising the new API.
def optimizeWriteBatch(writes: List[ToWrite], rules: Seq[Rule[TypedPipe]]): HMap[TypedPipe, TypedPipe] = {
  val dag = Dag.empty(typed.OptimizationRules.toLiteral)
  val (d1, ws) = writes.foldLeft((dag, List.empty[Id[_]])) {
    case ((dag, ws), Force(p)) =>
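For orientation, here is a rough sketch (not the PR's actual body) of the overall dagon pattern a function like this follows: add every pipe in the batch as a root of one Dag, apply each rule to a fixed point, then read the optimized node back out per root. The specific dagon calls used below (addRoot, apply, evaluate) and the import paths are assumptions.

```scala
import com.stripe.dagon.{ Dag, Id, Rule }
import com.twitter.scalding.typed.{ OptimizationRules, TypedPipe }

object OptimizeBatchSketch {
  // Illustrative only: a guess at the shape of batch optimization, assuming
  // dagon's addRoot / apply / evaluate signatures.
  def optimizeAll(pipes: List[TypedPipe[Any]],
                  rules: Seq[Rule[TypedPipe]]): List[(TypedPipe[Any], TypedPipe[Any])] = {
    val empty = Dag.empty(OptimizationRules.toLiteral)
    // 1. add every pipe in the batch as a root of one shared Dag
    val (dag, roots) =
      pipes.foldLeft((empty, List.empty[(TypedPipe[Any], Id[Any])])) {
        case ((d, acc), p) =>
          val (d1, id) = d.addRoot(p)
          (d1, (p, id) :: acc)
      }
    // 2. apply each rule until nothing changes
    val optimized = rules.foldLeft(dag) { (d, rule) => d(rule) }
    // 3. read the optimized node back out for each original root
    roots.map { case (p, id) => (p, optimized.evaluate(id)) }
  }
}
```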
Is this Force a Force from a forceToDisk, or from a force-to-disk Execution?
ForceToDiskExecution.
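To make that concrete, here is a hedged sketch of how a force-to-disk Execution can end up as a Force entry in the write batch; apart from ToWrite, Force, and optimizeWriteBatch, which appear in this diff, the shape below is a guess, not the actual scalding internals.

```scala
import com.twitter.scalding.typed.TypedPipe

// Illustrative only: pending work is modeled as a small ADT. A force-to-disk
// Execution contributes a Force node (a pipe to materialize) rather than a
// write to a user-visible sink.
sealed trait ToWrite
case class Force[T](pipe: TypedPipe[T]) extends ToWrite
// ... plus case(s) for ordinary sink writes ...

// optimizeWriteBatch folds over the whole batch, adding each pipe (forced or
// written) as a root of a single Dag, so the optimizer plans across all of them.
```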
res
dest.writeFrom(toPipe[T](dest.sinkFields)(flowDef, mode, dest.setter))
// We want to fork after this point
fork
Why did we make this change? I guess it should be OK now with the better optimizer? I'm guessing that before, we needed to do it the first way to avoid re-computation?
I'm not sure why it was ever the other way. Many .writes are not consumed, but by putting the fork before, you always do some extra boxing in and out of a single tuple. I can revert it; I don't think tests pass or fail either way. We really need a better set of performance tests, not just correctness tests.
private case class State(
  filesToCleanup: Map[Mode, Set[String]],
  forcedPipes: Map[(Config, Mode, TypedPipe[Any]), Future[TypedPipe[Any]]]) {
  initToOpt: HMap[TypedPipe, TypedPipe],
Could we comment on this field?
@@ -95,19 +101,37 @@ class AsyncFlowDefRunner extends Writer { self =>
    case None => this
  }

  def addPipe[T](c: Config,
  /**
   * Returns true if we actually add
Could you expand on this comment?
Returns true if we will add to our forced pipes, false if it's a cache hit and a no-op. I think?
Yes. The thing is, writes are side effects, and we only want to take that side effect once. So, if this returns true, we should actually do the side effect. I'll update the comment.
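A simplified sketch of those semantics, using the plain-Map shape of the old State (visible in the snippet just below) rather than the new HMap; the method name and signature here are illustrative, not the PR's exact code.

```scala
import scala.concurrent.Future
import com.twitter.scalding.{ Config, Mode }
import com.twitter.scalding.typed.TypedPipe

// Illustrative only: the Boolean tells the caller whether it is responsible
// for actually running the write (true) or whether someone already did (false),
// so the side effect happens exactly once per (Config, Mode, pipe) key.
case class SimpleState(
  forcedPipes: Map[(Config, Mode, TypedPipe[Any]), Future[TypedPipe[Any]]]) {

  def addPipe(c: Config, m: Mode, p: TypedPipe[Any],
              work: => Future[TypedPipe[Any]]): (SimpleState, Boolean) =
    forcedPipes.get((c, m, p)) match {
      case Some(_) => (this, false) // cache hit: no-op, do not repeat the write
      case None    => (copy(forcedPipes = forcedPipes + ((c, m, p) -> work)), true)
    }
}
```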
private case class State(
  filesToCleanup: Map[Mode, Set[String]],
  forcedPipes: Map[(Config, Mode, TypedPipe[Any]), Future[TypedPipe[Any]]]) {
  initToOpt: HMap[TypedPipe, TypedPipe],
  forcedPipes: HMap[StateKey, WorkVal])
Probably worth ensuring we have good comments on all the state fields here (I know this is old, but it helps review/reading). E.g., does "forced" mean the same thing as TypedPipe's forceToDisk? Is it something we need to materialize as an output? And are these ultimately our results from the execution run?
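In that spirit, here is one hedged reading of what the field comments might say, annotating the snippet above; the descriptions are inferred from this conversation (including the Force/ForceToDiskExecution exchange earlier), not taken from the PR itself.

```scala
// Illustrative field docs only; the types are copied from the diff above,
// the comments are an inferred reading, not the PR's text.
private case class State(
  // temporary files created per Mode that the runner must delete at shutdown
  filesToCleanup: Map[Mode, Set[String]],
  // maps each original (pre-optimization) TypedPipe to its optimized form,
  // so later lookups of a user-visible pipe find the node that was planned
  initToOpt: HMap[TypedPipe, TypedPipe],
  // pipes we have been asked to materialize ("forced", as from forceToDisk or
  // a force-to-disk Execution): keyed by (Config, Mode, pipe), the value is
  // the in-flight or completed result of running that work exactly once
  forcedPipes: HMap[StateKey, WorkVal])
```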
val phases = defaultOptimizationRules(
If we are optimizing in toPipe, do the optimization rules in the Execution have any effect?
We are careful to avoid calling toPipe in Execution. We call toPipeUnoptimized. This is because we want to do the optimization on the whole graph (multiple sinks) that Execution can see. This is likely to be better and allow more aggressive optimization than the per-pipe approach of .write.
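A small usage sketch of why that can matter. writeExecution, zip, and unit are standard Execution combinators; the claim that the shared prefix gets planned once is the motivation described above, not something verified here, and the source/sink paths are made up.

```scala
import com.twitter.scalding.{ Execution, TypedTsv }
import com.twitter.scalding.typed.TypedPipe

object WholeGraphSketch {
  // Two sinks share the same expensive prefix. Because Execution optimizes the
  // whole graph it can see (both writes at once), the shared prefix can be
  // planned once, instead of each .write being optimized in isolation.
  val expensive: TypedPipe[(String, Int)] =
    TypedPipe.from(TypedTsv[(String, Int)]("input.tsv"))
      .map { case (k, v) => (k, v * v) } // stand-in for costly work

  val writeBoth: Execution[Unit] =
    expensive.filter { case (_, v) => v > 0 }
      .writeExecution(TypedTsv[(String, Int)]("out-a.tsv"))
      .zip(expensive.map { case (k, _) => k }.writeExecution(TypedTsv[String]("out-b.tsv")))
      .unit
}
```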
Do we not hit toPipe going in and out of groupBy's?
LGTM modulo a few comments and a final question.
Thanks for the review! I'll add the comments you wanted and, if those look good, will merge (and on to the next item!). I'm getting excited about scalding 0.18. I hope we don't hit any problems shipping it to Twitter (I think we have tried to keep the API changes small). Of course, we still do have the reducer-estimation API change that Twitter sent, which we reverted in 0.17. Was that a big pain to change code for? Can we do anything to make that easier for people to adopt?
@johnynek Nah, it's not that bad; I suspect most people don't have custom reducer estimators, so it's not likely to bleed too far from that POV. I had a branch to adapt some of Stripe's code for it. I think we have only 2 reducer estimators internally, so it should be OK.
Not anymore. groupBy just builds an ADT, then we recursively apply the planning, and we never call toPipe in that recursion (you can double-check).
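A tiny illustration of that point: building the expression below only constructs a description of the computation, and nothing here triggers planning or a toPipe call until the graph is actually written or run. The comments about which nodes are produced are an informal reading, not taken from the PR.

```scala
import com.twitter.scalding.typed.TypedPipe

object GroupByAdtSketch {
  // groupBy returns a Grouped node, size another node, and toTypedPipe
  // re-wraps the result as a TypedPipe node: all of it is just an ADT
  // describing the work; no cascading Pipe exists until write/Execution time.
  val described: TypedPipe[(Int, Long)] =
    TypedPipe.from(List(1, 2, 3, 4))
      .groupBy(_ % 2)
      .size
      .toTypedPipe
}
```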
Hmm, if we are breaking the API here, should we mark toPipe and from in TypedPipe as deprecated (to be removed in scalding 0.19.0) and re-point at some cascading-specific version of them?
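If that route were taken, the marker itself would just be Scala's standard @deprecated annotation; the message, version string, and exact signature below are illustrative guesses, not code from this PR.

```scala
import cascading.flow.FlowDef
import cascading.pipe.Pipe
import cascading.tuple.Fields
import com.twitter.scalding.{ Mode, TupleSetter }

// Hypothetical sketch of marking the field-based escape hatch as deprecated:
trait DeprecationSketch[+T] {
  @deprecated("plan via Execution / a cascading-specific module instead; to be removed in scalding 0.19.0", "0.18.0")
  def toPipe[U >: T](fieldNames: Fields)(implicit fd: FlowDef, mode: Mode, setter: TupleSetter[U]): Pipe
}
```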
Added #1745 so we can add configuration options without binary breaks. About toPipe being deprecated: I definitely want to get there before 0.18, but I want to see how tractable it is to factor scalding-typed into a separate project, and I will try to do it there.
@ianoc how does this look? I think I addressed your concerns about comments. I also added tests for the configuration of optimization rules to make sure they are triggered as expected in both the toPipe and Execution cases.
Got verbal signoff from @ianoc.
closes #1735
This makes it possible to control which optimizations are applied. It also lets us improve the tests, since we can optimize only parts of the graph.