Author: shiyuhang
TiSpark is a thin layer built for running Apache Spark on top of TiKV/TiFlash to answer complex OLAP queries. It supports reading, writing, and deleting from TiKV/TiFlash with transaction guarantees.
In order to speed up reading, TiSpark will push some operators to TiKV or TiFlash. In this article, you will learn:
- What is pushdown in Spark
- How Spark implements pushdown
- TiSpark pushdown strategy
What is pushdown in Spark?
Pushdown is a classic SQL optimization that speeds up SQL queries. It pushes operators as close to the data source as possible to reduce the amount of data the upper layers need to process. For example, predicate pushdown pushes down the where condition, and aggregation pushdown pushes down aggregate functions.
So, what is pushdown in Spark? Does it have the same meaning as above?
First of all, let us look at the internals of Spark SQL. The core of Spark SQL is the catalyst, which will (see the sketch after this list for a way to inspect each phase):
Parse Spark SQL to the unresolved logical plan
Apply analysis rules and the catalog to get the resolved logical plan
Apply optimization rules to get the optimized logical plan
Use planning strategies to get the physical plan
Select one of the physical plans based on the cost model
Generate the executable RDDs and assign them to Spark Core
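If you want to see these intermediate plans yourself, here is a minimal sketch (assuming a local SparkSession; the table name and data are only for illustration) that prints the plan produced by each phase:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("catalyst-demo").getOrCreate()
import spark.implicits._

// a tiny in-memory table, registered as a temporary view named `t` (illustrative only)
Seq((1, "a"), (20, "b")).toDF("id", "name").createOrReplaceTempView("t")

val qe = spark.sql("select name from t where id > 10").queryExecution
println(qe.logical)       // parsed (unresolved) logical plan
println(qe.analyzed)      // resolved logical plan
println(qe.optimizedPlan) // optimized logical plan
println(qe.sparkPlan)     // selected physical plan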
In the catalyst process, a Spark SQL query is parsed into a tree. The node of the tree is called a TreeNode. Spark has several classes that inherit TreeNode to represent different types of nodes in the logical plan and physical plan. For example, a where condition will be parsed into a Filter node.
Now, let's talk about pushdown in Spark. There are two steps of pushdown in Spark:
Pushdown Optimization: first, Spark will push the operators closer to the data source when it optimizes the logical plan.
Pushdown to data source: then Spark will push the operators to the data source when it builds the physical plan.
As an example, consider the following SQL:
select * from A join B on A.id = B.id where A.a>10 and B.b<100;
This SQL will be parsed into a tree in Spark. The filter is the where condition, join is the join operator, and scan is the data source (here it represents table A and table B).
After the first pushdown step, the filters move closer to the data source to reduce the data that the join has to process.
Then, the filters may be pushed to the data source when building the physical plan. That is to say, Spark no longer needs to filter the data itself.
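To see this on the example above, you can run a sketch like the following (assuming a local SparkSession; the in-memory views stand in for tables A and B) and look at the optimized logical plan, where the Filter nodes appear below the Join:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

Seq((1, 5), (2, 200)).toDF("id", "a").createOrReplaceTempView("A")
Seq((1, 50), (2, 500)).toDF("id", "b").createOrReplaceTempView("B")

spark.sql("select * from A join B on A.id = B.id where A.a > 10 and B.b < 100")
  .explain("extended") // prints the parsed, analyzed, and optimized logical plans plus the physical plan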
How Spark implements pushdown
In the last section, you learned that there are two steps of pushdown in Spark: the first optimizes the logical plan and the second pushes operators to the data source.
In this section, you will learn how Spark implements the two pushdowns. The code in this section is based on Spark 3.2.
Pushdown optimization
Pushdown optimization is applied to the logical plan in the optimizer phase of the catalyst. Take predicate pushdown as an example.
The rule for predicate pushdown is called PushDownPredicates in Spark.
object PushDownPredicates extends Rule[LogicalPlan] with PredicateHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    CombineFilters.applyLocally
      .orElse(PushPredicateThroughNonJoin.applyLocally)
      .orElse(PushPredicateThroughJoin.applyLocally)
  }
}
The rule is applied recursively to the tree with the transform method.
For every TreeNode in the plan:
CombineFilters combines adjacent where conditions. For example, adjacent filters such as a > 1 and a > 2 will be merged into one filter node
PushPredicateThroughNonJoin handles predicates that do not involve a join
PushPredicateThroughJoin handles predicates that involve a join
You can refer to the Spark source code to see the details of PushPredicateThroughNonJoin and PushPredicateThroughJoin.
Pushdown to data source
Spark may push operators down to the data source when building the physical plan, based on the results of the pushdown optimization.
Whether to push down depends on the ability and implementation of the data source. If your data source does not support a pushdown, you need to tell Spark not to do it.
Thus, Spark provides interfaces for data sources to communicate with Spark. Again, take predicate pushdown as an example. Spark provides the following interface for it:
@Evolving
public interface SupportsPushDownFilters extends ScanBuilder {
  Filter[] pushFilters(Filter[] filters);
  Filter[] pushedFilters();
}
- Filter[] pushFilters(Filter[] filters): the input is the result of the pushdown optimization; the output is the filters that cannot be pushed to the data source, which Spark calls postScanFilters.
- Filter[] pushedFilters(): takes no input; the output is the filters that can be pushed to the data source, which are called pushedFilters.
A filter can appear in both postScanFilters and pushedFilters. In this case, both the data source and Spark will apply the filter. A sketch of a ScanBuilder implementing this interface follows.
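Below is a minimal sketch of a custom ScanBuilder implementing SupportsPushDownFilters (MyScanBuilder is a hypothetical class, not part of Spark or TiSpark). It accepts only a couple of simple filters and hands everything else back to Spark as postScanFilters:
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters}
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

class MyScanBuilder extends ScanBuilder with SupportsPushDownFilters {
  private var pushed: Array[Filter] = Array.empty

  // Spark passes in the filters from the optimized plan; we keep what the
  // data source can evaluate and return the rest as postScanFilters.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition {
      case _: EqualTo | _: GreaterThan => true
      case _                           => false
    }
    pushed = supported
    unsupported // Spark will evaluate these itself after the scan
  }

  // The filters the data source promises to evaluate during the scan.
  override def pushedFilters(): Array[Filter] = pushed

  // Building the actual Scan (which must apply `pushed` at read time) is omitted here.
  override def build(): Scan = ???
}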
As the maintainer of a Spark data source connector, we can easily control pushdown with this interface. But how does Spark apply the pushdown rules? There are two steps:
Keep the postScanFilters returned by the implementation of the interface. Spark will handle them later.
Handle the pushedFilters with the scan operator. How they are handled depends on the implementation of the data source.
Keep the postScanFilters
The first step occurs in the optimization phase of the catalyst by V2ScanRelationPushDown. The core code is as follows (simplified):
object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
  import DataSourceV2Implicits._

  def apply(plan: LogicalPlan): LogicalPlan = {
    applyColumnPruning(pushDownAggregates(pushDownFilters(createScanBuilder(plan))))
  }

  private def pushDownFilters(plan: LogicalPlan) = plan.transform {
    case Filter(condition, sHolder: ScanBuilderHolder) =>
      // normalizedFiltersWithoutSubquery is derived from `condition` (normalization omitted here)
      val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters(
        sHolder.builder, normalizedFiltersWithoutSubquery)
      // keep only the post-scan filters in the logical plan; pushed filters stay in the scan builder
      val filterCondition = postScanFiltersWithoutSubquery.reduceLeftOption(And)
      filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
  }
}
All of the pushdown rules are applied in the apply method, including pushDownFilters, which is responsible for predicate pushdown.
pushDownFilters gets the postScanFilters and pushedFilters from PushDownUtils and keeps only the postScanFilters in the final logical plan. Spark will apply these filters itself later.
The PushDownUtils code is as follows:
object PushDownUtils extends PredicateHelper {
  def pushFilters(
      scanBuilder: ScanBuilder,
      filters: Seq[Expression]): (Seq[sources.Filter], Seq[Expression]) = {
    scanBuilder match {
      case r: SupportsPushDownFilters =>
        // translatedFilters, translatedFilterToExpr, and untranslatableExprs are
        // built from `filters` (the translation code is omitted in this simplified snippet)
        val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
          DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
        }
        (r.pushedFilters(), (untranslatableExprs ++ postScanFilters).toSeq)
      case _ => (Nil, filters)
    }
  }
}
PushDownUtils matches SupportsPushDownFilters and gets the pushedFilters and postScanFilters from the data source implementation. If the data source does not implement the interface (or its implementation returns nothing), pushedFilters will be empty, which means no predicate will be pushed down.
Handle the pushedFilters
Let us take the JDBC data source as an example.
The build method in JDBCScanBuilder returns a JDBCScan with the pushedFilter as input:
override def build(): Scan = {
  JDBCScan(JDBCRelation(schema, parts, jdbcOptions)(session), finalSchema, pushedFilter,
    pushedAggregateList, pushedGroupByCols)
}
JDBCScan will call relation.buildScan in the toV1TableScan method and return a JDBCRDD:
case class JDBCScan(
    relation: JDBCRelation,
    prunedSchema: StructType,
    pushedFilters: Array[Filter],
    pushedAggregateColumn: Array[String] = Array(),
    groupByColumns: Option[Array[String]]) extends V1Scan {

  override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
    new BaseRelation with TableScan {
      override def buildScan(): RDD[Row] = {
        val columnList = if (groupByColumns.isEmpty) {
          prunedSchema.map(_.name).toArray
        } else {
          pushedAggregateColumn
        }
        relation.buildScan(columnList, prunedSchema, pushedFilters, groupByColumns)
      }
    }.asInstanceOf[T]
  }
}
The filters are compiled into a where clause and saved in filterWhereClause in JDBCRDD. Then a complete SQL statement containing the where clause is sent through JDBC to the database (for example, one compatible with the MySQL protocol).
private[jdbc] class JDBCRDD(
    sc: SparkContext,
    getConnection: () => Connection,
    schema: StructType,
    columns: Array[String],
    filters: Array[Filter],
    partitions: Array[Partition],
    url: String,
    options: JDBCOptions,
    groupByColumns: Option[Array[String]])
  extends RDD[InternalRow](sc, Nil) {

  // compile each pushed filter into a SQL fragment and join them with AND
  private val filterWhereClause: String =
    filters
      .flatMap(JDBCRDD.compileFilter(_, JdbcDialects.get(url)))
      .map(p => s"($p)").mkString(" AND ")
}
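To make the compilation step concrete, here is a hedged sketch (not Spark's actual JDBCRDD.compileFilter, just an illustration of the idea; FilterToSqlSketch is a made-up name) of turning pushed Filter objects into a where-clause fragment:
import org.apache.spark.sql.sources.{And, EqualTo, Filter, GreaterThan, LessThan}

object FilterToSqlSketch {
  def compile(f: Filter): Option[String] = f match {
    case EqualTo(attr, value)     => Some(s"$attr = '$value'")
    case GreaterThan(attr, value) => Some(s"$attr > $value")
    case LessThan(attr, value)    => Some(s"$attr < $value")
    case And(left, right)         => for (l <- compile(left); r <- compile(right)) yield s"($l AND $r)"
    case _                        => None // filters that cannot be compiled are skipped
  }
}

// e.g. FilterToSqlSketch.compile(And(GreaterThan("a", 10), LessThan("b", 100)))
//   returns Some("(a > 10 AND b < 100)")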
Then, in the planner phase, the JDBC data source implementation is applied by DataSourceV2Strategy. The simplified core code is as follows:
class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(project, filters,
        DataSourceV2ScanRelation(_, V1ScanWrapper(scan, pushed, aggregate), output)) =>
      val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
      val rdd = v1Relation.buildScan()
      val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
      val dsScan = RowDataSourceScanExec(
        output,
        output.toStructType,
        Set.empty,
        pushed.toSet,
        aggregate,
        unsafeRowRDD,
        v1Relation,
        tableIdentifier = None)
      withProjectAndFilter(project, filters, dsScan, needsUnsafeConversion = false) :: Nil

    // the following cases (local scans, plain DSv2 scans, micro-batch and continuous
    // streaming scans) are elided in this simplified snippet; the two streaming cases
    // differ only by guards on start/end offsets, which are also omitted
    case PhysicalOperation(project, filters, DataSourceV2ScanRelation(_, scan: LocalScan, output)) => ???
    case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) => ???
    case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation) => ???
    case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation) => ???
    case _ => Nil
  }
}
Spark matches PhysicalOperation in the strategy. The JDBC data source goes into the first case because it is a V1Scan.
In the first case, scan.toV1TableScan is called to get the JDBCRDD introduced above. Then RowDataSourceScanExec performs the data fetch with the JDBCRDD. Finally, withProjectAndFilter puts RowDataSourceScanExec into the whole physical plan: Spark first executes the scan with the pushed filters and then executes the other operators.
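If you want to observe this end to end, a hedged sketch like the following (the JDBC URL, table, and credentials are hypothetical, and a suitable JDBC driver is assumed to be on the classpath) reads a table through the JDBC source and prints the physical plan, where the scan node reports the pushed filters:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://127.0.0.1:4000/test") // hypothetical endpoint
  .option("dbtable", "t")
  .option("user", "root")
  .option("password", "")
  .load()

// look for a PushedFilters entry on the scan node in the printed plan
df.filter("id > 10").explain()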
TiSpark pushdown strategy
The strategy in TiSpark
TiSpark is a Spark connector that provides a TiKV data source. So, following the mechanism discussed above, we can push some operators from Spark down to TiKV.
The question is: what is the strategy of pushdown in TiSpark?
TiSpark needs to support pushdown on Spark >= 2.4 and satisfy the following requirements.
First, an operator cannot be pushed down when the data source cannot handle it. For predicates, TiKV does not support every expression and data type, so TiSpark needs to exclude the unsupported ones automatically.
Second, we may want to avoid pushdown to reduce the pressure on TiKV. This helps when your Spark resources are abundant and your TiKV resources are scarce. TiSpark provides some configs for this (a usage sketch follows the list):
spark.tispark.plan.allow_agg_pushdown: lets you refuse aggregation pushdown.
spark.tispark.plan.unsupported_pushdown_exprs: lets you specify unsupported expressions so they are not pushed down. It also helps when you work with an old version of TiKV that may not support some expressions.
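As a minimal usage sketch (the PD address and the expression list are illustrative values, not recommendations), these configs can be set when building the SparkSession:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("tispark-pushdown-config")
  .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
  .config("spark.tispark.pd.addresses", "127.0.0.1:2379")         // illustrative PD address
  .config("spark.tispark.plan.allow_agg_pushdown", "false")       // refuse aggregation pushdown
  .config("spark.tispark.plan.unsupported_pushdown_exprs", "sum") // expressions excluded from pushdown
  .getOrCreate()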
How TiSpark implements the strategy
Next, we will learn how TiSpark implements the strategy.
We have introduced the pushdown interface provided by Spark. However, it cannot satisfy TiSpark's strategy. Here are some problems with the pushdown interface:
Poor extensibility: the design of the pushdown interface in Data Source API V1 (DSV1) is hard to extend, which makes it difficult to support various pushdowns in DSV1.
Limited pushdown ability: DSV2 improves the pushdown interface and solves the extensibility issue, but its pushdown ability is limited. Spark 3.0 only supports predicate pushdown and column pruning; Spark 3.1 adds aggregation pushdown and Spark 3.2 adds limit pushdown.
Inflexible pushdown strategy: for example, aggregation pushdown in DSV2 does not support pushing avg, which could be converted into sum/count.
TiSpark needs to support common pushdowns in every supported Spark version, so the pushdown interface is not suitable. What can we do instead? The answer lies in the catalyst extension.
The catalyst extension has been supported since Spark 2.2. We can inject custom rules and strategies into most phases of the catalyst. In other words, we can inject TiSpark pushdown strategies to control pushdown precisely.
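As a minimal sketch of the extension mechanism itself (independent of TiSpark; NoopStrategy and MyExtensions are made-up names), an application can register a custom planner strategy like this:
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A do-nothing strategy: returning Nil tells Spark this strategy does not handle
// the plan, which is exactly how TiStrategy avoids touching non-TiDB data sources.
case class NoopStrategy(session: SparkSession) extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(e: SparkSessionExtensions): Unit =
    e.injectPlannerStrategy(session => NoopStrategy(session))
}

val spark = SparkSession.builder()
  .master("local[*]")
  .withExtensions(new MyExtensions)
  .getOrCreate()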
Spark pushes down to the data source in the planner phase. The corresponding extension interface is injectPlannerStrategy (the following is based on Spark 3.2.1 and TiSpark 3.0.1):
def injectPlannerStrategy(builder: StrategyBuilder): Unit = {
  plannerStrategyBuilders += builder
}
TiSpark implements this interface as follows. ReflectionUtil returns the appropriate TiStrategy for the running Spark version via Scala reflection, which avoids compatibility issues caused by different Spark versions.
e.injectPlannerStrategy(new TiStrategyFactory(getOrCreateTiContext))

class TiStrategyFactory(getOrCreateTiContext: SparkSession => TiContext)
    extends (SparkSession => Strategy) {
  override def apply(sparkSession: SparkSession): Strategy = {
    TiExtensions.validateCatalog(sparkSession)
    ReflectionUtil.newTiStrategy(getOrCreateTiContext, sparkSession)
  }
}
TiStrategy is the core of pushdown. It matches TiDBTable, which represents the TiDB data source, and then executes doPlan. If the match fails, TiSpark does nothing, to avoid affecting other data sources.
case class TiStrategy(getOrCreateTiContext: SparkSession => TiContext)(sparkSession: SparkSession)
    extends Strategy
    with Logging {

  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
    plan
      .collectFirst {
        case DataSourceV2ScanRelation(DataSourceV2Relation(table: TiDBTable, _, _, _, _), _, _) =>
          doPlan(table, plan)
      }
      .toSeq
      .flatten
  }
}
The strategies in doPlan imitate those in DataSourceV2Strategy. TiSpark identifies the operators that can be pushed to the data source with pattern matching, and then requests TiKV based on these operators. Let us take predicate pushdown as an example: TiSpark matches PhysicalOperation and executes the pruneFilterProject method.
private def doPlan(source: TiDBTable, plan: LogicalPlan): Seq[SparkPlan] =
  plan match {
    case PhysicalOperation(
          projectList,
          filters,
          DataSourceV2ScanRelation(
            DataSourceV2Relation(source: TiDBTable, _, _, _, _),
            _,
            _)) =>
      pruneFilterProject(projectList, filters, source, newTiDAGRequest()) :: Nil
    case TiAggregation(
          groupingExpressions,
          aggregateExpressions,
          resultExpressions,
          TiAggregationProjectionV2(filters, _, `source`, projects))
        if isValidAggregates(groupingExpressions, aggregateExpressions, filters, source) =>
      ??? // the aggregation pushdown branch is elided in this simplified snippet
    case _ => Nil
  }
The pruneFilterProject method will:
Partition the Spark filter expressions with TiExprUtils.isSupportedFilter, which judges whether an expression can be pushed down. The filters that can be pushed are put into pushdownFilters, and those that cannot are put into residualFilters.
Build the DAGRequest, the parameter of the request to TiKV, with filterToDAGRequest. The pushdownFilters are put into the DAGRequest. Then a scan that reads TiKV data is generated by toCoprocessorRDD.
Wrap the scan with FilterExec so that the residualFilters are applied back in Spark.
private def pruneFilterProject(
    projectList: Seq[NamedExpression],
    filterPredicates: Seq[Expression],
    source: TiDBTable,
    dagRequest: TiDAGRequest): SparkPlan = {

  // split the predicates into those TiKV can evaluate and those it cannot
  val (pushdownFilters: Seq[Expression], residualFilters: Seq[Expression]) =
    filterPredicates.partition((expression: Expression) =>
      TiExprUtils.isSupportedFilter(expression, source, blocklist))

  val residualFilter: Option[Expression] =
    residualFilters.reduceLeftOption(catalyst.expressions.And)

  // tiColumns and projectSeq are derived from projectList (omitted in this simplified snippet)
  filterToDAGRequest(tiColumns, pushdownFilters, source, dagRequest)
  val scan = toCoprocessorRDD(source, projectSeq, dagRequest)

  // apply the residual filters back in Spark on top of the TiKV scan
  residualFilter.fold(scan)(FilterExec(_, scan))
}
In this way, TiSpark can support most pushdowns in every supported Spark version (>= 2.4).
So far, TiSpark supports predicate pushdown, aggregation pushdown, limit pushdown, order by pushdown, and projection pushdown. TiSpark can also control whether a specific expression or data type is pushed down.
Conclusion
TiSpark supports pushdown through the catalyst extension, which brings several problems:
Increased code complexity
Instability, because we may depend on evolving interfaces or methods in Spark
The need to be very careful to avoid affecting the original Spark logic
Spark is actively developing the DataSource API, including the pushdown interface. We hope that in the near future the DataSource API will be powerful enough to meet TiSpark's needs. At that time, we will be happy to migrate pushdown to the DataSource API.
Appendix
The pushdowns supported in TiSpark are as follows:
Data Type | sum | count | avg | min | max | predicate & order by & group by |
BIT | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
BOOLEAN | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
TINYINT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
SMALLINT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
MEDIUMINT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
INTEGER | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
BIGINT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
FLOAT | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
DOUBLE | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
DECIMAL | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
DATE | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
DATETIME | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
TIMESTAMP | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
TIME | ❌ | ✅ | ❌ | ? | ? | ✅ |
YEAR | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
CHAR | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
VARCHAR | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
TINYTEXT | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
TEXT | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
MEDIUMTEXT | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
LONGTEXT | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
BINARY | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
VARBINARY | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
TINYBLOB | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
BLOB | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
MEDIUMBLOB | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
LONGBLOB | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
ENUM | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
SET | ❌ | ❌ | ❌ | ? | ? | ❌ |
- Pushing down min/max(TIME) leads to a wrong result
- Pushing down min/max(SET) may cause TiKV to panic
You can check whether an operator is pushed down with explain in TiSpark. Here is an example:
1. Create a table in TiDB
CREATE TABLE `test`.`t` (
`id` int(11) NOT NULL,
PRIMARY KEY (`id`)
);
2. Execute Spark SQL with explain
spark.sql("select avg(id) from test.t where id > 10").explain
3. You will get the execution plan:
*(2) HashAggregate(keys=[], functions=[specialsum(specialsum(id#252L, DecimalType(38,0), null)#258, DecimalType(38,0), null), specialsum(count(id#252L)#259L, LongType, 0)])
+- Exchange SinglePartition, true, [id=#38]
+- *(1) HashAggregate(keys=[], functions=[partial_specialsum(specialsum(id#252L, DecimalType(38,0), null)#258, DecimalType(38,0), null), partial_specialsum(count(id#252L)#259L, LongType, 0)])
+- *(1) ColumnarToRow
+- TiKV CoprocessorRDD{[table: t] IndexReader, Columns: id@LONG: { IndexRangeScan(Index:primary(id)): { RangeFilter: [[id@LONG GREATER_THAN 10]], Range: [([t\200\000\000\000\000\000\000o_i\200\000\000\000\000\000\000\001\003\200\000\000\000\000\000\000\v], [t\200\000\000\000\000\000\000o_i\200\000\000\000\000\000\000\001\372])] }, Aggregates: Sum(id@LONG), Count(id@LONG) }, startTs: 434873744501506049}
Focus on the TiKV CoprocessorRDD line:
- RangeFilter: [[id@LONG GREATER_THAN 10]]: indicates that id > 10 is pushed down.
- Aggregates: Sum(id@LONG), Count(id@LONG): indicates that Sum and Count are pushed down; Spark will rewrite them back into avg.