Integration with Datafusion
optd is currently used as a physical optimizer for Apache Arrow Datafusion. To interact with Datafusion, use the following commands to start the Datafusion CLI:
cargo run --bin datafusion-optd-cli
cargo run --bin datafusion-optd-cli -- -f tpch/test.sql # run TPC-H queries
optd is designed as a flexible optimizer framework that can be used in any database system. The core of optd is in optd-core, which contains the Cascades optimizer implementation and the definitions of the key structures in the optimization process. Users can implement the interfaces and use optd in their own database systems through the optd-core crate.
The optd Datafusion representation contains Datafusion plan nodes, SQL expressions, optimizer rules, properties, and cost models, all of which live in the optd-datafusion-repr crate.
The optd-datafusion-bridge crate contains the code needed to convert Datafusion logical plans into the optd Datafusion representation and to convert the optd Datafusion representation back into Datafusion physical plans. It implements the QueryPlanner trait so that it can be easily integrated into Datafusion.
Plan Nodes
The following is a partial list of the Datafusion plan nodes, and their representations, that we have implemented in the system.
Join(type) left:PlanNode right:PlanNode cond:Expr
Projection expr_list:ExprList
Agg child:PlanNode expr_list:ExprList groups:ExprList
Scan table:String
ExprList ...children:Expr
Sort child:PlanNode sort_exprs:ExprList <- requiring SortExprs
... and others
Note that only ExprList or List can have a variable number of children. All other plan nodes have a fixed number of children. Projections and aggregations, where users need to provide a list of expressions, take a List node as their direct child.
Developers can use the define_plan_node macro to add new plan nodes to optd-datafusion-repr.
#[derive(Clone, Debug)]
pub struct LogicalJoin(pub PlanNode);

define_plan_node!(
    LogicalJoin : PlanNode,
    Join, [
        { 0, left: PlanNode },
        { 1, right: PlanNode }
    ], [
        { 2, cond: Expr }
    ], { join_type: JoinType }
);
Developers will also need to add the plan node type to the OptRelNodeTyp enum, implement is_plan_node and is_expression for it, and implement the explain format in explain.
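The classification step can be pictured with a much smaller stand-in enum. This is only a sketch: `NodeTyp` and its variants are hypothetical names chosen for illustration and are not the real OptRelNodeTyp definition, which has many more variants. It shows the property the two predicates are meant to provide, namely that each node type is either a plan node or an expression.

```rust
/// Hypothetical, simplified stand-in for a node-type enum.
/// The variant names are illustrative only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NodeTyp {
    // Plan nodes
    Join,
    Projection,
    Scan,
    // Expressions
    ColumnRef,
    BinOp,
    List,
}

impl NodeTyp {
    /// All variants, so callers can check the classification exhaustively.
    const ALL: [NodeTyp; 6] = [
        NodeTyp::Join,
        NodeTyp::Projection,
        NodeTyp::Scan,
        NodeTyp::ColumnRef,
        NodeTyp::BinOp,
        NodeTyp::List,
    ];

    /// True for relational operators.
    fn is_plan_node(&self) -> bool {
        matches!(self, NodeTyp::Join | NodeTyp::Projection | NodeTyp::Scan)
    }

    /// True for SQL expressions, including the variable-arity list node.
    fn is_expression(&self) -> bool {
        matches!(self, NodeTyp::ColumnRef | NodeTyp::BinOp | NodeTyp::List)
    }
}
```

When a new variant is added to an enum like this, both predicates need to be updated so that exactly one of them returns true for it.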
Expressions
SQL expressions are also a kind of RelNode. We have binary expressions, function calls, etc. in the representation.
Notably, we convert all column references into column indexes in the Datafusion bridge. For example, if Datafusion yields a logical plan of:
LogicalJoin { a = b }
  Scan t1 [a, v1, v2]
  Scan t2 [b, v3, v4]
It will be converted to:
LogicalJoin { #0 = #3 }
  Scan t1
  Scan t2
in the optd representation.
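The index assignment in this example follows from concatenating the output columns of the left child and then the right child. The sketch below reproduces that numbering under the assumption that column names are unique across both inputs. `resolve_column` and `bind_equality` are hypothetical helpers written for illustration and are not functions from the bridge crate.

```rust
/// Hypothetical helper: resolve a column name to its index in the
/// concatenated join schema (left columns first, then right columns).
/// If a name appears more than once, the first match wins.
fn resolve_column(left: &[&str], right: &[&str], name: &str) -> Option<usize> {
    left.iter()
        .chain(right.iter())
        .position(|col| *col == name)
}

/// Hypothetical helper: rewrite `lhs = rhs` into index form, e.g. `#0 = #3`.
/// Returns None if either column cannot be resolved.
fn bind_equality(left: &[&str], right: &[&str], lhs: &str, rhs: &str) -> Option<String> {
    let l = resolve_column(left, right, lhs)?;
    let r = resolve_column(left, right, rhs)?;
    Some(format!("#{} = #{}", l, r))
}
```

With t1 exposing [a, v1, v2] and t2 exposing [b, v3, v4], column a resolves to index 0 and column b to index 3, which matches the converted join condition.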
For SQL expressions, the optd Datafusion representation does not do cost-based searches on expressions, though this is supported in optd-core. Each SQL expression can only have one binding in the current implementation.
Explain
We use risinglightdb's pretty-xmlish crate and implement a custom explain format for Datafusion plan nodes.
PhysicalProjection { exprs: [ #0 ] }
└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] }
    ├── PhysicalProjection { exprs: [ #0 ] }
    │   └── PhysicalScan { table: t1 }
    └── PhysicalProjection { exprs: [ #0 ] }
        └── PhysicalScan { table: t2 }
This is different from the default Lisp representation of the RelNode.
Rules
Currently, we have a few rules that pull filters and projections up and down through joins. We also have join associativity and join commutativity rules to reorder joins.
Properties
We have the Schema property, which the optimizer rules use to determine the number of columns of each plan node so that column reference expressions can be rewritten correctly.
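To see why the column counts matter, consider swapping the two children of a join. Every column index in the join condition has to be shifted by the width of the child that moved in front of it. The function below is a minimal sketch of that arithmetic. `commute_column_index` is a hypothetical name, and the actual rule may restore the column order differently, for example with a projection on top.

```rust
/// Hypothetical rewrite applied when a join's children swap places.
/// `left_cols` and `right_cols` are the column counts of the ORIGINAL
/// left and right children, as a schema property would report them.
fn commute_column_index(idx: usize, left_cols: usize, right_cols: usize) -> usize {
    assert!(idx < left_cols + right_cols, "column index out of range");
    if idx < left_cols {
        // The column came from the old left child, which now sits on the
        // right, after all `right_cols` columns of the new left child.
        idx + right_cols
    } else {
        // The column came from the old right child, which now sits first.
        idx - left_cols
    }
}
```

For two three-column inputs, the condition `#0 = #3` becomes `#3 = #0` after the swap. Without the column counts the shift cannot be computed.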
Cost Model
We have a simple cost model that computes an I/O cost and a compute cost based on the number of rows of the child plan nodes.
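As a rough illustration of a two-component cost driven by row counts, the sketch below keeps I/O and compute separate and derives both from the cardinalities of the inputs. All names and formulas here are assumptions made for the example (`Cost`, `scan_cost`, `hash_join_cost`, `nested_loop_join_cost`, and the 1:1 weighting in `total`). They are not the formulas used by the optd cost model.

```rust
/// Hypothetical cost with separate compute and I/O components.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Cost {
    compute: f64,
    io: f64,
}

impl Cost {
    /// Assumed weighting: compute and I/O count equally.
    fn total(&self) -> f64 {
        self.compute + self.io
    }
}

/// Assumed formula: a scan reads every row once and does no other work.
fn scan_cost(rows: f64) -> Cost {
    Cost { compute: 0.0, io: rows }
}

/// Assumed formula: build on one side, probe with the other.
fn hash_join_cost(left_rows: f64, right_rows: f64) -> Cost {
    Cost { compute: left_rows + right_rows, io: 0.0 }
}

/// Assumed formula: compare every left row with every right row.
fn nested_loop_join_cost(left_rows: f64, right_rows: f64) -> Cost {
    Cost { compute: left_rows * right_rows, io: 0.0 }
}
```

Even with formulas this crude, the optimizer can rank alternatives: for two 1000-row inputs the hash join costs 2000 compute units in this sketch and the nested loop join costs 1,000,000.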
Cardinality Estimation
As per Leis 2015, we define cardinality estimation to be a separate component from the cost model. Statistics are considered a part of the cardinality estimation component. The internal name for our cardinality estimation component is Gungnir™. Gungnir is the mythical spear wielded by Odin which adaptively changes course midair so as to never miss its mark. It represents both the accuracy and adaptivity of our cardinality estimation subsystem.
Our base cardinality estimation scheme is inspired by Postgres. We utilize roughly the same four per-column statistics as Postgres: the most common values of the column, the number of distinct values of the column, the fraction of nulls in the column, and a distribution of values for the column. Our base predicate (filter or join) selectivity formulas are also the same as those of Postgres. This is as opposed to Microsoft SQL Server, for instance, which utilizes very different per-column statistics and predicate selectivity formulas. Our statistics are not exactly the same as those of Postgres, though. For one, while Postgres uses a simple equi-height histogram, we utilize the more advanced TDigest data structure to model the distribution of values. Additionally, Postgres samples its tables to build its statistics, whereas we do a full sequential scan of all tables. This full sequential scan is made efficient by the fact that we use sketches, which have a low time complexity, and by the fact that we implemented our sketching algorithms to be easily parallelizable.
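As an illustration of how three of these statistics combine, here is a sketch of a Postgres-style selectivity estimate for an equality predicate `col = value`. If the value is one of the most common values, its recorded frequency is used directly. Otherwise, the remaining non-null mass is spread evenly over the remaining distinct values. `ColumnStats` and `eq_selectivity` are hypothetical names, the column is assumed to hold integers, and the sketch omits the extra clamping that Postgres applies.

```rust
/// Hypothetical per-column statistics (the value distribution is omitted
/// because equality predicates do not need it in this sketch).
struct ColumnStats {
    /// Most common values with their frequency as a fraction of all rows.
    mcvs: Vec<(i64, f64)>,
    /// Estimated number of distinct non-null values.
    ndistinct: f64,
    /// Fraction of rows that are NULL.
    null_frac: f64,
}

/// Estimated fraction of rows satisfying `col = value`.
fn eq_selectivity(stats: &ColumnStats, value: i64) -> f64 {
    // Case 1: the value is a most common value, so its frequency is known.
    if let Some((_, freq)) = stats.mcvs.iter().find(|(v, _)| *v == value) {
        return *freq;
    }
    // Case 2: spread the rows that are neither NULL nor an MCV evenly
    // over the distinct values that are not MCVs.
    let mcv_total: f64 = stats.mcvs.iter().map(|(_, f)| f).sum();
    let remaining_distinct = stats.ndistinct - stats.mcvs.len() as f64;
    if remaining_distinct <= 0.0 {
        return 0.0; // every distinct value is already an MCV
    }
    ((1.0 - stats.null_frac - mcv_total) / remaining_distinct).clamp(0.0, 1.0)
}
```

For a column with MCV frequencies 0.4 and 0.2, 12 distinct values, and 10% nulls, a non-MCV value gets a selectivity of (1 - 0.1 - 0.6) / 10 = 0.03.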
Statistics
We obtain our statistics with high parallelism and a minimal memory footprint, using probabilistic algorithms that trade off accuracy for scalability. Specifically:

The distribution of each column (i.e., CDF) is computed using the TDigest algorithm designed by Ted Dunning et al., rather than traditional equi-width histograms. TDigests can be seen as dynamically resizable histograms that are particularly precise at the tails of the distribution.

The value of NDistinct is calculated using the HyperLogLog algorithm by Philippe Flajolet et al. Like TDigest, it is an approximate method: it estimates the number of unique values within a column by tracking the rarest bit pattern observed among the hashed elements (the longest run of leading zeros). This approach is memory-bounded and doesn't require a hash set, unlike traditional methods.

(TODO @AlSchlo: Whatever we use for MCV)
All of these techniques are embarrassingly parallel tasks by design: threads first scan a partition of the data, maintain a memory-bounded structure (with higher memory leading to higher accuracy), and then merge their individual results into the final column statistics.
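The scan-then-merge pattern described above can be sketched with a small HyperLogLog written against the standard library only. This is an illustration, not the implementation used in optd: the register count (2^12), the use of `DefaultHasher`, and the names `HyperLogLog` and `parallel_ndistinct` are all assumptions made for the example. Each thread sketches its own partition, and the partial sketches are merged by taking the element-wise maximum of their registers.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const P: u32 = 12; // assumed precision: 2^12 registers
const M: usize = 1 << P;

#[derive(Clone, Debug, PartialEq)]
struct HyperLogLog {
    registers: Vec<u8>,
}

impl HyperLogLog {
    fn new() -> Self {
        Self { registers: vec![0; M] }
    }

    fn insert<T: Hash>(&mut self, value: &T) {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        let hash = hasher.finish();
        // The top P bits select a register.
        let idx = (hash >> (64 - P)) as usize;
        // The rank is the position of the first 1 bit in the remaining bits.
        let rank = ((hash << P).leading_zeros().min(64 - P) + 1) as u8;
        if rank > self.registers[idx] {
            self.registers[idx] = rank;
        }
    }

    /// Merging keeps the larger rank per register, so it is order-independent.
    fn merge(&mut self, other: &HyperLogLog) {
        for (a, b) in self.registers.iter_mut().zip(&other.registers) {
            *a = (*a).max(*b);
        }
    }

    fn estimate(&self) -> f64 {
        let m = M as f64;
        let alpha = 0.7213 / (1.0 + 1.079 / m);
        let sum: f64 = self.registers.iter().map(|&r| 2f64.powi(-(r as i32))).sum();
        let raw = alpha * m * m / sum;
        let zeros = self.registers.iter().filter(|&&r| r == 0).count();
        if raw <= 2.5 * m && zeros > 0 {
            // Small-range correction (linear counting).
            m * (m / zeros as f64).ln()
        } else {
            raw
        }
    }
}

/// Sketch each partition on its own thread, then merge the partial sketches.
fn parallel_ndistinct(data: &[u64], num_threads: usize) -> HyperLogLog {
    let num_threads = num_threads.max(1);
    let chunk_size = ((data.len() + num_threads - 1) / num_threads).max(1);
    let partials: Vec<HyperLogLog> = std::thread::scope(|s| {
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|chunk| {
                s.spawn(move || {
                    let mut hll = HyperLogLog::new();
                    for v in chunk {
                        hll.insert(v);
                    }
                    hll
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    let mut merged = HyperLogLog::new();
    for p in &partials {
        merged.merge(p);
    }
    merged
}
```

Because the merge is an element-wise maximum, the merged sketch is identical to the one a single thread would build over the whole column, so parallelism costs no accuracy. Memory stays fixed at one byte per register regardless of the number of rows.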
(TODO @CostModelTeam: explain adaptivity once we've implemented it)