
[SPARK-13293] [SQL] generate Expand #11177

Closed
wants to merge 4 commits into from

Conversation

@davies (Contributor) commented Feb 12, 2016

Expand suffers from creating an UnsafeRow from the same input multiple times; with codegen, it only needs to copy some of the columns.

After this, we see a 3X improvement (from 43 seconds to 13 seconds) on a TPCDS query (Q67) that has eight columns in the Rollup.

Ideally, we could mask some of the columns based on the bitmask; I'll leave that for the future, because aggregation (~50 ns) is currently much slower than just copying the variables (1-2 ns).
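
To make the idea concrete, here is a minimal, self-contained Scala sketch (not Spark's codegen API; the Row class, the expand function, and the column names are hypothetical, modeled on the cube("k1", "k2") example later in this thread). Shared expressions are evaluated once per input row; each projection then only copies or nulls out columns and sets the grouping id.

object ExpandSketch {
  // Hypothetical output row of Expand for cube("k1", "k2"): the original id,
  // the two (nullable) grouping columns, and the grouping id.
  final case class Row(id: Long, k1: Option[Long], k2: Option[Long], gid: Int)

  def expand(id: Long): Seq[Row] = {
    // Shared sub-expressions are computed once, outside the loop.
    val k1 = id % 1000L
    val k2 = id & 256L
    // Each projection only decides which columns to keep or null out
    // (the cheap "copy some of the columns" part) and sets the grouping id.
    (0 until 4).map {
      case 0 => Row(id, Some(k1), Some(k2), 0)
      case 1 => Row(id, Some(k1), None,     1)
      case 2 => Row(id, None,     Some(k2), 2)
      case 3 => Row(id, None,     None,     3)
    }
  }

  def main(args: Array[String]): Unit =
    expand(1234L).foreach(println)
}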

@rxin (Contributor) commented Feb 12, 2016

As always, can you paste the generated code? :)

@SparkQA commented Feb 12, 2016

Test build #51156 has finished for PR 11177 at commit 22ceda9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}

// In order to prevent code exploration, we can't call `consume()` many times, so we call
Contributor:

what do you mean by "code exploration"?

Contributor:

btw any perf degradation from not unrolling the loop?

Contributor (Author):

The loop plus copying two variables should only take about 1-2 nanoseconds, so there should be no regression.

But if we don't have the loop here, the generated code could easily grow larger than 8K, and that could be a regression (slower than without codegen).
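
As a rough back-of-the-envelope illustration (the per-consume size is an assumption, not a measurement): a cube over n grouping columns produces 2^n projections, so unrolling would paste the downstream consume() body once per projection and quickly cross the ~8K size threshold past which HotSpot stops JIT-compiling a method.

object UnrollSizeSketch {
  def main(args: Array[String]): Unit = {
    val bytesPerConsume = 1000      // hypothetical bytecode size of one inlined consume() body
    val hugeMethodLimit = 8000      // HotSpot's default HugeMethodLimit (bytecodes)
    for (n <- 1 to 8) {
      val projections = 1 << n      // cube over n columns => 2^n projections
      val unrolledSize = projections * bytesPerConsume
      println(s"n=$n projections=$projections unrolled~$unrolledSize bytes, " +
        s"over JIT limit: ${unrolledSize > hugeMethodLimit}")
    }
  }
}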

@davies (Contributor, Author) commented Feb 12, 2016

The relevant part of the generated code for

sqlContext.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2")
        .cube("k1", "k2").sum("id").collect()
/* 075 */       /* (input[0, bigint] % 1000) */
/* 076 */       boolean project_isNull1 = false;
/* 077 */       long project_value1 = -1L;
/* 078 */       if (false || 1000L == 0) {
/* 079 */         project_isNull1 = true;
/* 080 */       } else {
/* 081 */         if (false) {
/* 082 */           project_isNull1 = true;
/* 083 */         } else {
/* 084 */           project_value1 = (long)(range_value % 1000L);
/* 085 */         }
/* 086 */       }
/* 087 */       /* (input[0, bigint] & 256) */
/* 088 */       long project_value4 = -1L;
/* 089 */       project_value4 = range_value & 256L;
/* 090 */       /* (input[0, bigint] % 1000) */
/* 091 */       boolean project_isNull7 = false;
/* 092 */       long project_value7 = -1L;
/* 093 */       if (false || 1000L == 0) {
/* 094 */         project_isNull7 = true;
/* 095 */       } else {
/* 096 */         if (false) {
/* 097 */           project_isNull7 = true;
/* 098 */         } else {
/* 099 */           project_value7 = (long)(range_value % 1000L);
/* 100 */         }
/* 101 */       }
/* 102 */       /* (input[0, bigint] & 256) */
/* 103 */       long project_value10 = -1L;
/* 104 */       project_value10 = range_value & 256L;
/* 105 */
/* 106 */       boolean expand_isNull3 = true;
/* 107 */       long expand_value3 = -1L;
/* 108 */
/* 109 */       boolean expand_isNull4 = true;
/* 110 */       long expand_value4 = -1L;
/* 111 */
/* 112 */       boolean expand_isNull5 = true;
/* 113 */       int expand_value5 = -1;
/* 114 */       for (int expand_i = 0; expand_i < 4; expand_i ++) {
/* 115 */         switch (expand_i) {
/* 116 */         case 0:
/* 117 */           expand_isNull3 = project_isNull7;
/* 118 */           expand_value3 = project_value7;
/* 119 */
/* 120 */           expand_isNull4 = false;
/* 121 */           expand_value4 = project_value10;
/* 122 */
/* 123 */           expand_isNull5 = false;
/* 124 */           expand_value5 = 0;
/* 125 */           break;
/* 126 */
/* 127 */         case 1:
/* 128 */           expand_isNull3 = project_isNull7;
/* 129 */           expand_value3 = project_value7;
/* 130 */
/* 131 */           /* null */
/* 132 */           final long expand_value10 = -1L;
/* 133 */           expand_isNull4 = true;
/* 134 */           expand_value4 = expand_value10;
/* 135 */
/* 136 */           expand_isNull5 = false;
/* 137 */           expand_value5 = 1;
/* 138 */           break;
/* 139 */
/* 140 */         case 2:
/* 141 */           /* null */
/* 142 */           final long expand_value12 = -1L;
/* 143 */           expand_isNull3 = true;
/* 144 */           expand_value3 = expand_value12;
/* 145 */
/* 146 */           expand_isNull4 = false;
/* 147 */           expand_value4 = project_value10;
/* 148 */
/* 149 */           expand_isNull5 = false;
/* 150 */           expand_value5 = 2;
/* 151 */           break;
/* 152 */
/* 153 */         case 3:
/* 154 */           /* null */
/* 155 */           final long expand_value15 = -1L;
/* 156 */           expand_isNull3 = true;
/* 157 */           expand_value3 = expand_value15;
/* 158 */
/* 159 */           /* null */
/* 160 */           final long expand_value16 = -1L;
/* 161 */           expand_isNull4 = true;
/* 162 */           expand_value4 = expand_value16;
/* 163 */
/* 164 */           expand_isNull5 = false;
/* 165 */           expand_value5 = 3;
/* 166 */           break;
/* 167 */         }
/* 168 */         expand_metricValue.add(1);
/* 169 */
/* 170 */         // generate grouping key
/* 171 */         agg_rowWriter.zeroOutNullBytes();
/* 172 */
/* 173 */         if (expand_isNull3) {
/* 174 */           agg_rowWriter.setNullAt(0);
/* 175 */         } else {
/* 176 */           agg_rowWriter.write(0, expand_value3);
/* 177 */         }
/* 178 */
/* 179 */         if (expand_isNull4) {
/* 180 */           agg_rowWriter.setNullAt(1);
/* 181 */         } else {
/* 182 */           agg_rowWriter.write(1, expand_value4);
/* 183 */         }
/* 184 */
/* 185 */         if (expand_isNull5) {
/* 186 */           agg_rowWriter.setNullAt(2);
/* 187 */         } else {
/* 188 */           agg_rowWriter.write(2, expand_value5);
/* 189 */         }
/* 190 */         /* hash(input[0, bigint],input[1, bigint],input[2, int],42) */
/* 191 */         int agg_value3 = 42;
/* 192 */
/* 193 */         if (!expand_isNull3) {
/* 194 */           agg_value3 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(expand_value3, agg_value3);
/* 195 */         }
/* 196 */
/* 197 */         if (!expand_isNull4) {
/* 198 */           agg_value3 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(expand_value4, agg_value3);
/* 199 */         }
/* 200 */
/* 201 */         agg_value3 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashInt(expand_value5, agg_value3);
/* 202 */         UnsafeRow agg_aggBuffer = null;
/* 203 */         if (true) {
/* 204 */           // try to get the buffer from hash map
/* 205 */           agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value3);
/* 206 */         }
/* 207 */         if (agg_aggBuffer == null) {
/* 208 */           if (agg_sorter == null) {
/* 209 */             agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 210 */           } else {
/* 211 */             agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 212 */           }
/* 213 */
/* 214 */           // the hash map had be spilled, it should have enough memory now,
/* 215 */           // try  to allocate buffer again.
/* 216 */           agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value3);
/* 217 */           if (agg_aggBuffer == null) {
/* 218 */             // failed to allocate the first page
/* 219 */             throw new OutOfMemoryError("No enough memory for aggregation");
/* 220 */           }
/* 221 */         }

@davies (Contributor, Author) commented Feb 12, 2016

@rxin I've posted the generated code and added more comments.

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
  // Some columns have the same expression in all the projections, so collect the unique
  // expressions.
  val columnUniqueExpressions: IndexedSeq[Set[Expression]] = output.indices.map { i =>
Contributor:

for this one, can we explain what the indexes are, and what the expressions are?
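
For illustration (the string stand-ins and projection contents below are assumptions based on the cube("k1", "k2") example above, not the real Expression objects): the reviewed line computes, for each output column position, the set of distinct expressions that the projections place there; a column whose set is a singleton can be evaluated once outside the switch, while the others are copied per projection.

object ColumnUniqueExpressionsSketch {
  // Hypothetical, string-based stand-ins for the four projections of cube("k1", "k2"):
  // each inner Seq lists one projection's expressions, positionally aligned with the output.
  val projections: Seq[Seq[String]] = Seq(
    Seq("k1",   "k2",   "0"),
    Seq("k1",   "null", "1"),
    Seq("null", "k2",   "2"),
    Seq("null", "null", "3"))

  // Mirrors the reviewed line: for output column index i, the distinct expressions
  // that appear at position i across all projections.
  val columnUniqueExpressions: IndexedSeq[Set[String]] =
    projections.head.indices.map(i => projections.map(_(i)).toSet)

  def main(args: Array[String]): Unit =
    columnUniqueExpressions.zipWithIndex.foreach { case (exprs, i) =>
      println(s"column $i -> $exprs")   // column 2 (the grouping id) differs in every projection
    }
}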

@SparkQA commented Feb 12, 2016

Test build #51205 has finished for PR 11177 at commit e1fd87d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 13, 2016

Test build #51219 has finished for PR 11177 at commit ff2b5a4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -17,11 +17,15 @@

package org.apache.spark.sql.execution

import scala.collection.immutable.IndexedSeq
Contributor:

this is no longer necessary. can you remove it in some other pr you have?

@rxin (Contributor) commented Feb 13, 2016

LGTM. Merging in master.

@asfgit closed this in 2228f07 on Feb 13, 2016