Optimizer
optd implements the Columbia Cascades framework based on Yongwen Xu's master's thesis. The implementation can be found at optd-core/src/cascades.
In optd, the plan nodes are represented as a tree or DAG using the RelNode structure (we will explain this in Plan Representation). For example, a join-filter-projection plan will be represented as:
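As a rough illustration (using the Lisp-style display format described later in Plan Representation; the table names, column references, and exact child ordering here are made-up assumptions), such a plan might look like:

```
(Projection
  (Filter
    (Join(Inner) (Scan t1) (Scan t2) (= t1.x t2.x))
    (> t1.y 10))
  (List t1.x t1.y))
```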
The key idea of the Cascades optimizer is to have a memo table that memoizes the expressions produced during the search, so that the search process never does the same task twice. Therefore, inside the Cascades optimizer, the plan is stored using RelMemoNode instead, where the children of a plan node are stored as integer group ids instead of concrete plan nodes.
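A minimal sketch of what such a memoized node might look like (the actual RelMemoNode definition in optd-core may differ; the GroupId newtype is an assumption standing in for the integer id):

```rust
// Sketch only: children are group ids rather than concrete plan nodes.
pub struct GroupId(pub usize);

pub struct RelMemoNode<Typ> {
    pub typ: Typ,
    pub children: Vec<GroupId>,
    pub data: Option<Value>,
}
```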
After storing the user-provided plan in the memo table using the memo representation, the optimizer starts the optimization process by invoking the Columbia Cascades tasks. The optimizer maintains the ongoing tasks as a stack. Each task can invoke other tasks. The optimization process stops when the stack becomes empty or when the exploration budget is reached. The task invocation graph is the same as in the Columbia paper.
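A hedged sketch of the task-driving loop implied by this description (the Task trait, its execute method, and the budget handling are illustrative assumptions, not optd's exact API):

```rust
// Sketch: pop a task, run it, push whatever tasks it spawns, and stop when the
// stack is empty or the exploration budget is exhausted.
let mut stack: Vec<Box<dyn Task>> = vec![initial_task];
let mut budget: usize = 10_000;
while let Some(task) = stack.pop() {
    if budget == 0 {
        break; // exploration budget reached
    }
    budget -= 1;
    let new_tasks = task.execute(&mut optimizer_state);
    stack.extend(new_tasks);
}
```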
Here, Group corresponds to the MExpr in the Cascades paper, and Expr is simply Expr in the Cascades paper. An expression can be a plan node (e.g., join, projection) or a SQL expression (e.g., a projection expression or a filter condition). The task implementations can be found in optd-core/src/cascades/tasks. The apply-rule task invokes user-defined rules and produces new expressions in the group. The implementation of the rule engine can be found in Rule Engine. For example, after invoking the join reordering rule and the physical transformation rule, we will have the following expressions in the memo table group.
Note that we store physical expressions (e.g., NLJ/HashJoin) and logical expressions (e.g., Join) in the same group.
One special thing about the optd Cascades optimizer is that it persists state across runs. This state includes whether a rule has been invoked, all the elements in the memo table, and so on. Persistence makes it possible for the optimizer to associate runtime information with plan nodes and to use that runtime information in subsequent optimizations. You can learn more about adaptive optimization in optd in Re-optimization and Partial Exploration.
optd also provides a heuristics optimizer engine for testing rule implementations. It can be found at optd-core/src/heuristics.
Plan Representation
optd uses a serialized representation of expressions internally. This makes it super easy to serialize the plans and handle them in a universal way. All plan nodes and SQL expressions are stored as RelNode. In contrast, systems like Apache Calcite use an object-oriented approach to define plan nodes.
the optd representation -- one universal structure for all things
```rust
pub struct RelNode<Typ> {
    pub typ: Typ,
    pub children: Vec<RelNodeRef>,
    pub data: Option<Value>,
}

pub struct Join(RelNode);

impl Join {
    pub fn left(&self) -> PlanNode {
        PlanNode(self.0.child(0))
    }

    pub fn right(&self) -> PlanNode {
        PlanNode(self.0.child(1))
    }

    pub fn cond(&self) -> Expr {
        Expr(self.0.child(2))
    }

    pub fn join_type(&self) -> JoinType {
        if let Typ::Join(typ) = self.0.typ {
            typ
        } else {
            unreachable!()
        }
    }
}
```
the Calcite representation -- object-oriented
```rust
pub struct Join {
    pub left: RelNode<Typ>,
    pub right: RelNode<Typ>,
    pub cond: Expression,
    pub join_type: JoinType,
}
```
The optd representation uses one universal type, RelNode, for all expressions, so the optimizer framework can inspect everything inside a plan while processing it. With the object-oriented representation, in contrast, the user will likely need to implement boilerplate functions like replace_children to help the optimizer framework during the optimization process.
The downside of the optd representation is that more effort is needed to get things out of a plan node. For example, to get the join condition, the user first extracts the third child from the plan node, verifies that it is an expression by looking at the typ field, and then wraps it with the Expr class. Fortunately, optd provides a lot of macros to generate this kind of code, which can be found in the optd Datafusion representation.
Because optd uses a single representation for both plan nodes and SQL expressions, the actual representation of a join node contains the join condition as a direct child of the RelNode structure.
optd by default supports a Lisp-style display implementation for plan nodes. The above join plan will be printed as:
(Join(Inner) (Scan a) (Scan b) (= a.x b.x))
And users can implement other explain formats for plan nodes. For example, the optd Datafusion plan node representation implements the tree-style explain format for all plan nodes.
We still have not explained the typ field. It is a user-defined enum that contains all plan nodes in the system. For example, the OptRelNodeTyp enum, which is the Datafusion representation, contains logical plan nodes like Projection and Join; physical plan nodes like NestedLoopJoin; and expressions like BinOp. In addition, optd places one constraint on the enum: it must always contain a Placeholder(usize) variant and a List variant.
List is a special type that may contain a variable number of children. It is the only plan node type that is allowed to have a non-constant number of children. It is used in the representation of Projection, Aggregation, etc.
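For example, in the Lisp-style display a projection over a scan might look like the following sketch, with the List node holding however many projection expressions the query needs (the column references and expressions here are made up for illustration):

```
(Projection (Scan t1) (List #0 #2 (+ #1 1)))
```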
Placeholder(usize) is a special type that is only used in the apply-rule process. Usually, the usize stored in the placeholder is a group id. We will explain this in Rule Engine.
Rules and Rule Engine
optd has a rule match engine based on Rust macros so that it is easier for developers to programmatically define a rule.
Define a Rule Matcher
optd developers can define a rule either by implementing the Rule trait or by using the helper macros. Examples of both can be found under optd-datafusion-repr/src/rules.
```rust
// A join B -> B join A
define_rule!(
    JoinCommuteRule,
    apply_join_commute,
    (Join(JoinType::Inner), left, right, [cond])
);
```
Developers can use the define_rule! macro to define a rule by providing a rule matcher and a transformation function. In the JoinCommuteRule example above, the rule matcher is (Join(JoinType::Inner), left, right, [cond]), which is a Lisp representation of the match pattern.
The join plan node in the optd Datafusion representation has a typ field of Join(JoinType), and its children are the left child, the right child, and the join condition. Therefore, this rule matches an inner join node and retrieves the left child, the right child, and cond.
```rust
// (A join B) join C -> A join (B join C)
define_rule!(
    JoinAssocRule,
    apply_join_assoc,
    (
        Join(JoinType::Inner),
        (Join(JoinType::Inner), a, b, [cond1]),
        c,
        [cond2]
    )
);
```
The above example is the join assoc rule. It matches a left-deep join tree. The following figure is the visual representation of the two matchers:
Note that variables like a, b, and c only match the group ID of a child, while [cond1] expands the group ID and retrieves the concrete expression. The [xxx] form is useful when matching a SQL expression child.
The matcher API also supports getting all children of a plan node, which is useful when processing projection expressions, but this is not supported in the matcher macro for now.
Implement the Transformation
Once the optimizer matches a plan node in the plan space, it invokes the apply_join_commute transformation function for the JoinCommute rule, or the apply_join_assoc function for the JoinAssoc rule. The transformation function takes two parameters: the optimizer instance and the matched structure, which is auto-generated by the macro and contains all the match variables defined by the user. For example,
```rust
fn apply_join_assoc(
    optimizer: &impl Optimizer<OptRelNodeTyp>,
    JoinAssocRulePicks {
        a,     // RelNode<OptRelNodeTyp>
        b,     // RelNode<OptRelNodeTyp>
        c,     // RelNode<OptRelNodeTyp>
        cond1, // RelNode<OptRelNodeTyp>
        cond2, // RelNode<OptRelNodeTyp>
    }: JoinAssocRulePicks,
) -> Vec<RelNode<OptRelNodeTyp>> {
    // do some processing and return the transformed plan nodes
}
```
Users can expect a, b, and c to be of the OptRelNodeTyp::Placeholder type, as the optimizer only returns the group ID information for them, while cond1 and cond2 are concrete SQL expression trees.
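As a rough sketch of what such a transformation function might do (this is not optd's actual implementation: the condition rewriting needed to fix up column indexes after reassociation is omitted, and the Arc wrapping of children is an assumption):

```rust
use std::sync::Arc;

fn apply_join_assoc_sketch(
    _optimizer: &impl Optimizer<OptRelNodeTyp>,
    JoinAssocRulePicks { a, b, c, cond1, cond2 }: JoinAssocRulePicks,
) -> Vec<RelNode<OptRelNodeTyp>> {
    // Build (B join C) first. A real implementation must also rewrite the join
    // conditions, because column indexes shift when the tree shape changes.
    let inner = RelNode {
        typ: OptRelNodeTyp::Join(JoinType::Inner),
        children: vec![Arc::new(b), Arc::new(c), Arc::new(cond2)],
        data: None,
    };
    // Then build A join (B join C) on top of it.
    let outer = RelNode {
        typ: OptRelNodeTyp::Join(JoinType::Inner),
        children: vec![Arc::new(a), Arc::new(inner), Arc::new(cond1)],
        data: None,
    };
    vec![outer]
}
```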
Generate Bindings
One crucial step in the Cascades apply-rule task is to generate bindings for a rule. During the search, the optimizer only sees RelMemoNode, which contains just the current node type and the children group IDs. It has to recursively match the children in order to hand the rule transformation function a concrete structure to process. For example, let us go through applying the join assoc rule in the plan space.
As shown in the figure above, we first discover that two expressions in group 1 match the top-most node of the matcher. The optimizer then iterates over these expressions, explores the child groups, and matches the expressions in each child group against the corresponding child node of the matcher. Group 2 has two expressions that match the left side of the matcher, and group 3 also has two expressions satisfying the matcher's requirement. Therefore, we have 4 bindings for this matcher in the plan space, and the rule transformation function is invoked once for each binding.
Currently, optd generates all bindings at once and invokes the rule transformation function for each of them. This could be improved in the future with a BindingsIterator that generates one binding at a time.
Rule Engine
IR Representation
The rule engine matches the plan space against a rule matcher IR defined in optd-core/src/rules/ir.rs. Currently, the IR contains 6 primitives that define the match pattern:
```rust
pub enum RuleMatcher<T: RelNodeTyp> {
    /// Match a node of type `typ` and pick the matched node itself.
    MatchAndPickNode {
        typ: T,
        children: Vec<Self>,
        pick_to: usize,
    },
    /// Match a node of type `typ`.
    MatchNode { typ: T, children: Vec<Self> },
    /// Match anything.
    PickOne { pick_to: usize, expand: bool },
    /// Match all things in the group.
    PickMany { pick_to: usize },
    /// Ignore one.
    IgnoreOne,
    /// Ignore many.
    IgnoreMany,
}
```
Specifically, PickOne and PickMany contain a pick_to field, which is an integer ID. The rule engine takes a matcher, matches the pattern, and returns a HashMap<usize, RelNode> mapping, where the hash-map key is the pick_to ID that the user provides. The integer IDs must be unique within one matcher definition.
Rule Definition Macro
The rule definition macro is a helper interface over the rule IR. It automatically generates the structure that stores a matched pattern, generates the pick_to ID for each expression the user requests, and copies the contents of the HashMap returned by the rule engine into the matched-pattern structure.
For example, the macro for the join commute rule will be expanded into the following code:
```rust
impl JoinCommuteRule {
    pub fn new() -> Self {
        let mut pick_num = 0;
        let matcher = RuleMatcher::MatchNode {
            typ: Join(JoinType::Inner),
            children: vec![
                RuleMatcher::PickOne {
                    pick_to: {
                        let x = pick_num;
                        pick_num += 1;
                        x
                    },
                    expand: false,
                },
                // ... other picks
            ],
        };
        Self { matcher }
    }
}
```
In new, the macro generates the matcher and maintains a counter that assigns each pick_to field a unique ID.
```rust
pub struct JoinCommuteRulePicks {
    pub left: RelNode<OptRelNodeTyp>,
    pub right: RelNode<OptRelNodeTyp>,
    pub cond: RelNode<OptRelNodeTyp>,
}
```
It then generates a structure to be passed to the rule transformation function, with the user-defined variable name for each element in the match pattern.
In the Rule trait implementation, the macro generates the code that unpacks elements from the hash map into the match-pattern structure.
```rust
impl<O: Optimizer<OptRelNodeTyp>> Rule<OptRelNodeTyp, O> for JoinCommuteRule {
    fn apply(
        &self,
        optimizer: &O,
        mut input: HashMap<usize, RelNode<OptRelNodeTyp>>,
    ) -> Vec<RelNode<OptRelNodeTyp>> {
        let left: RelNode<OptRelNodeTyp>;
        let right: RelNode<OptRelNodeTyp>;
        let cond: RelNode<OptRelNodeTyp>;
        let mut pick_num = 0;
        {
            left = input.remove(&pick_num).unwrap();
            pick_num += 1;
        };
        // ...
        let res = JoinCommuteRulePicks { left, right, cond };
        apply_join_commute(optimizer, res)
    }
}
```
Rule Mode
Currently, the Cascades optimizer supports two modes for logical rules: heuristics and cascades. With these two modes, it can work as a hybrid optimizer.
Heuristics
One can register a heuristics rule by calling RuleWrapper::new_heuristic(Arc::new(rule)). This adds the rule wrapper to the optimizer's rule queue. Heuristic rules are applied bottom-up by default.
With the rule wrapper, one can register any rule in either heuristic mode or cascades mode. A rule registered in heuristic mode replaces the input expression with the generated expression, which shrinks the search space and improves search efficiency. Rules registered in heuristic mode should be rules that always produce a better expression when applied; for example, select * from t1 join t2 on false can be replaced by an empty relation.
A constraint on rules registered as heuristics is that they can return at most one expression: one expression when the input matches the pattern and a new expression can be generated, and zero expressions otherwise.
Cascades
One can register a cascades rule by calling RuleWrapper::new_cascades(Arc::new(rule)). This adds the rule wrapper to the optimizer's rule queue in cascades mode.
Rules registered in cascades mode depend on the cost model to figure out which alternative is better; for example, select * from t1 join t2 on t1.a = t2.a can be transformed to select * from t2 join t1 on t1.a = t2.a. Rules registered in cascades mode can return multiple expressions once matched.
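A hedged sketch of building a rule list that mixes both modes (only the two RuleWrapper constructors come from the text above; SimplifyFilterRule is a placeholder rule name, and the surrounding setup is assumed):

```rust
use std::sync::Arc;

let rules = vec![
    // Always-beneficial rewrite: applied heuristically, replacing the input expression.
    RuleWrapper::new_heuristic(Arc::new(SimplifyFilterRule::new())),
    // Cost-dependent alternatives: kept in the memo table and priced by the cost model.
    RuleWrapper::new_cascades(Arc::new(JoinCommuteRule::new())),
    RuleWrapper::new_cascades(Arc::new(JoinAssocRule::new())),
];
```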
Cost Model
Developers can plug their own cost models into optd. The cost must be represented as a vector of f64s, where the first element is the weighted cost. The optimizer uses the weighted cost internally for cost comparison and for selecting the winner of a group.
The cost model interface can be found in optd-core/src/cost.rs, and the core of the cost model is the cost computation process implemented in CostModel::compute_cost.
```rust
pub trait CostModel<T: RelNodeTyp>: 'static + Send + Sync {
    fn compute_cost(
        &self,
        node: &T,
        data: &Option<Value>,
        children: &[Cost],
        context: RelNodeContext,
    ) -> Cost;
}
```
compute_cost takes the costs of the children, the current plan node's information, and some context about the current node. The context is useful for adaptive optimization: it contains the group ID and the expression ID of the current plan node, so that the adaptive cost model can use runtime information from the last run to compute the cost.
The optd Datafusion cost model stores 4 elements in the cost vector: weighted cost, row count, compute cost, and I/O cost. The costs of plan nodes and SQL expressions can all be computed solely from this information.
Contrary to other optimizer frameworks like Calcite, optd does not implement the cost model as part of the plan node member functions. In optd, developers write all of the cost computation in one file, so that testing and debugging the cost model happens in one place (or in one impl).
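For illustration, a cost model following the four-element layout above might look like the sketch below. The per-node formulas, the Cost(vec![..]) tuple-struct constructor, and the hard-coded scan size are assumptions for the sketch, not optd's real cost model:

```rust
pub struct SimpleCostModel;

impl CostModel<OptRelNodeTyp> for SimpleCostModel {
    fn compute_cost(
        &self,
        node: &OptRelNodeTyp,
        _data: &Option<Value>,
        children: &[Cost],
        _context: RelNodeContext,
    ) -> Cost {
        // Assumed cost-vector layout: [weighted, row_cnt, compute, io].
        let row_cnt = |c: &Cost| c.0[1];
        let (rows, compute, io) = match node {
            // A scan reads every row from disk; 1000.0 is a stand-in for the table size.
            OptRelNodeTyp::Scan => (1000.0, 0.0, 1000.0),
            // A join pays compute proportional to the product of its input sizes.
            OptRelNodeTyp::Join(_) => {
                let out = row_cnt(&children[0]) * row_cnt(&children[1]) * 0.01;
                (out, row_cnt(&children[0]) * row_cnt(&children[1]), 0.0)
            }
            // Everything else passes its input through at unit compute cost per row.
            _ => {
                let input = children.first().map(row_cnt).unwrap_or(1.0);
                (input, input, 0.0)
            }
        };
        Cost(vec![compute + io, rows, compute, io]) // element 0 is the weighted cost
    }
}
```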
Properties
In optd, properties are defined by implementing the PropertyBuilder trait in optd-core/src/property.rs. Properties are automatically inferred when plan nodes are added to the memo table. When initializing an optimizer instance, developers need to provide a vector of the properties that the optimizer will compute throughout the optimization process.
Define a Property
Currently, optd only supports logical properties; it cannot yet optimize a query plan with required physical properties. An example of a property definition is the plan node schema in the Datafusion representation, as in optd-datafusion-repr/src/properties/schema.rs.
```rust
impl PropertyBuilder<OptRelNodeTyp> for SchemaPropertyBuilder {
    type Prop = Schema;

    fn derive(
        &self,
        typ: OptRelNodeTyp,
        data: Option<optd_core::rel_node::Value>,
        children: &[&Self::Prop],
    ) -> Self::Prop {
        match typ {
            OptRelNodeTyp::Scan => {
                let name = data.unwrap().as_str().to_string();
                self.catalog.get(&name)
            }
            // ...
        }
    }
}
```
The schema property builder implements the derive function, which takes the plan node type, the plan node data, and the children's properties in order to infer the property of the current plan node. The schema property is stored as a vector of data types in the Schema structure. In optd, properties are type-erased and stored as Box<dyn Any> along with each RelNode group in the memo table. Developers do not need to deal with the type erasure themselves and can work with typed APIs.
Use a Property
When initializing an optimizer instance, developers need to provide a vector of property builders to be computed. A property can then be retrieved using its index in that vector and the property builder type. For example, some optimizer rules need to know the number of columns of a plan node before rewriting an expression.
For example, the current Datafusion optd optimizer is initialized with:
```rust
CascadesOptimizer::new_with_prop(
    rules,
    Box::new(cost_model),
    vec![Box::new(SchemaPropertyBuilder::new(catalog))],
    // ..
);
```
Therefore, developers can use index 0 and SchemaPropertyBuilder to retrieve the schema of a plan node after adding the node to the optimizer memo table.
```rust
impl PlanNode {
    pub fn schema(&self, optimizer: CascadesOptimizer<OptRelNodeTyp>) -> Schema {
        let group_id = optimizer.resolve_group_id(self.0.clone());
        optimizer.get_property_by_group::<SchemaPropertyBuilder>(group_id, 0 /* property ID */)
    }
}
```
Integration with Datafusion
optd is currently used as a physical optimizer for Apache Arrow Datafusion. To interact with Datafusion, you can use the following commands to start the Datafusion CLI.
cargo run --bin datafusion-optd-cli
cargo run --bin datafusion-optd-cli -- -f datafusion-optd-cli/tpch-sf0_01/test.sql # run TPC-H queries
optd is designed as a flexible optimizer framework that can be used in any database system. The core of optd is in optd-core, which contains the Cascades optimizer implementation and the definitions of the key structures used in the optimization process. Users can implement the interfaces and use optd in their own database systems through the optd-core crate.
The optd Datafusion representation contains Datafusion plan nodes, SQL expressions, optimizer rules, properties, and cost models, and lives in the optd-datafusion-repr crate.
The optd-datafusion-bridge crate contains the code necessary to convert Datafusion logical plans into the optd Datafusion representation and to convert the optd Datafusion representation back into Datafusion physical plans. It implements the QueryPlanner trait so that it can be easily integrated into Datafusion.
Plan Nodes
This is an incomplete list of all Datafusion plan nodes and their representations that we have implemented in the system.
- Join(type): left: PlanNode, right: PlanNode, cond: Expr
- Projection: expr_list: ExprList
- Agg: child: PlanNode, expr_list: ExprList, groups: ExprList
- Scan: table: String
- ExprList: ...children: Expr
- Sort: child: PlanNode, sort_exprs: ExprList (requires SortExprs)
- ... and others
Note that only ExprList or List can have a variable number of children. All other plan nodes have a fixed number of children. For projections and aggregations, where users need to provide a list of expressions, a List node appears as their direct child.
Developers can use the define_plan_node macro to add new plan nodes to the optd-datafusion-repr crate.
```rust
#[derive(Clone, Debug)]
pub struct LogicalJoin(pub PlanNode);

define_plan_node!(
    LogicalJoin : PlanNode,
    Join, [
        { 0, left: PlanNode },
        { 1, right: PlanNode }
    ], [
        { 2, cond: Expr }
    ], { join_type: JoinType }
);
```
Developers will also need to add the plan node type to the OptRelNodeTyp enum, implement is_plan_node and is_expression for it, and implement the explain format in explain.
Expressions
SQL expressions are also a kind of RelNode. We have binary expressions, function calls, etc. in the representation.
Notably, we convert all column references into column indexes in the Datafusion bridge. For example, if Datafusion yields a logical plan of:
```
LogicalJoin { a = b }
  Scan t1 [a, v1, v2]
  Scan t2 [b, v3, v4]
```
It will be converted to:
```
LogicalJoin { #0 = #3 }
  Scan t1
  Scan t2
```
in the optd representation.
For SQL expressions, the optd Datafusion representation does not do cost-based searches on expressions, though this is supported in optd-core. Each SQL expression can only have one binding in the current implementation.
Explain
We use risinglightdb's pretty-xmlish crate and implement a custom explain format for Datafusion plan nodes.
```
PhysicalProjection { exprs: [ #0 ] }
└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] }
    ├── PhysicalProjection { exprs: [ #0 ] }
    │   └── PhysicalScan { table: t1 }
    └── PhysicalProjection { exprs: [ #0 ] }
        └── PhysicalScan { table: t2 }
```
This is different from the default Lisp representation of the RelNode.
Rules
Currently, we have a few rules that pull filters and projections up and down through joins. We also have the join assoc and join commute rules to reorder joins.
Properties
We have the Schema property, which is used in the optimizer rules to determine the number of columns of each plan node so that we can rewrite column reference expressions correctly.
Cost Model
We have a simple cost model that computes I/O cost and compute cost based on the number of rows of the child plan nodes.
Cardinality Estimation
As per Leis 2015, we define cardinality estimation to be a separate component from the cost model. Statistics are considered a part of the cardinality estimation component. The internal name for our cardinality estimation component is Gungnir™. Gungnir is the mythical spear wielded by Odin which adaptively changes course mid-air so as to never miss its mark. It represents both the accuracy and adaptivity of our cardinality estimation subsystem.
Our base cardinality estimation scheme is inspired by Postgres. We utilize roughly the same four per-column statistics as Postgres: the most common values of that column, the # of distinct values of that column, the fraction of nulls of that column, and a distribution of values for that column. Our base predicate (filter or join) selectivity formulas are also the same as Postgres. This is as opposed to Microsoft SQLServer, for instance, which utilizes very different per-column statistics and predicate selectivity formulas. Our statistics are not exactly the same as Postgres though. For one, while Postgres uses a simple equi-height histogram, we utilize the more advanced T-Digest data structure to model the distribution of values. Additionally, Postgres samples its tables to build its statistics whereas we do a full sequential scan of all tables. This full sequential scan is made efficient by the fact that we use sketches, which have a low time complexity, and we implemented our sketching algorithms to be easily parallelizable.
Statistics
We obtain our statistics with high parallelism and a minimal memory footprint, using probabilistic algorithms that trade off accuracy for scalability. Specifically:
- The distribution of each column (i.e., the CDF) is computed using the TDigest algorithm designed by Ted Dunning et al., rather than traditional equi-width histograms. TDigests can be seen as dynamically resizable histograms that offer particularly high precision at the tails of the distribution.
- The number of distinct values (N-Distinct) is calculated using Philippe Flajolet et al.'s HyperLogLog algorithm. It estimates the number of unique values in a column by examining the bit patterns of hashed values (in particular, the longest runs of leading zeros observed). This approach is memory-bounded and doesn't require a hash set, unlike traditional methods.
- (TODO @AlSchlo: Whatever we use for MCV)
All of these techniques are embarrassingly parallel tasks by design: threads first scan a partition of the data, maintain a memory-bounded structure (with higher memory leading to higher accuracy), and then merge their individual results into the final column statistics.
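The pattern looks roughly like the sketch below, where ColumnSketch and its methods are hypothetical stand-ins for the TDigest/HyperLogLog implementations:

```rust
use std::thread;

#[derive(Default)]
struct ColumnSketch { /* t-digest + HyperLogLog state, elided */ }

impl ColumnSketch {
    fn update(&mut self, _value: f64) { /* add one value to the sketch */ }
    fn merge(&mut self, _other: ColumnSketch) { /* combine two sketches */ }
}

fn build_column_statistics(partitions: Vec<Vec<f64>>) -> ColumnSketch {
    // Each thread scans one partition and builds its own memory-bounded sketch.
    let handles: Vec<_> = partitions
        .into_iter()
        .map(|part| {
            thread::spawn(move || {
                let mut sketch = ColumnSketch::default();
                for v in part {
                    sketch.update(v);
                }
                sketch
            })
        })
        .collect();
    // Merge the per-thread sketches into the final column statistics.
    let mut result = ColumnSketch::default();
    for handle in handles {
        result.merge(handle.join().unwrap());
    }
    result
}
```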
(TODO @CostModelTeam: explain adaptivity once we've implemented it)
Re-optimization
optd implements re-optimization inspired by How I Learned to Stop Worrying and Love Re-optimization. optd generates a plan, injects executors to collect runtime data, and uses the runtime information from the previous run to guide the optimization process.
optd persists optimizer states from run to run. The states include: the memo table, whether a rule is applied on an expression, explored groups, etc. By persisting the states, optd can easily match a query plan or a subset of the query plan with plans that have been executed. Once these plan matches are discovered, the adaptive cost model can use the runtime data in the cost computation process to make the cost model more robust and accurate.
Cost Model
In the optd Datafusion representation, we have two cost models: the base cost model and the adaptive cost model. The base cost model estimates compute and I/O cost solely based on row counts. The adaptive cost model maintains a hash map from plan node group ID to the runtime information collected in the previous N runs, and uses this information to compute a more accurate row count. It then feeds the more accurate row count into the base cost model to produce more accurate compute and I/O costs.
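A minimal sketch of that row-count lookup (the type and field names here are illustrative assumptions, not optd's actual adaptive cost model):

```rust
use std::collections::HashMap;

// Maps a plan node group id to the row counts observed in the previous N runs.
struct AdaptiveStats {
    observed_rows: HashMap<usize, Vec<f64>>,
}

impl AdaptiveStats {
    /// Prefer the average observed row count; fall back to the base estimate.
    fn estimated_rows(&self, group_id: usize, base_estimate: f64) -> f64 {
        match self.observed_rows.get(&group_id) {
            Some(obs) if !obs.is_empty() => obs.iter().sum::<f64>() / obs.len() as f64,
            _ => base_estimate,
        }
    }
}
```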
Execution
optd injects collector executors into the query plan. We extended Datafusion with a new executor called the physical collector. The executor counts the number of rows passed from the child executor to the parent executor and stores this information in the runtime data storage.
Optimization Phases
To enable re-optimization, the user should not clear the internal state of the optimizer. This can be achieved by calling step_clear_winner and then step_optimize_rel.
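A hedged usage sketch of that calling sequence (everything except the two step_* method names mentioned above, including the helper functions, is an assumption):

```rust
// Keep one optimizer instance alive across queries so the memo table and rule
// state persist; only the winners are cleared between runs.
for sql in queries {
    optimizer.step_clear_winner();
    let rel = parse_into_rel_node(&sql);          // hypothetical helper
    let plan = optimizer.step_optimize_rel(rel)?; // re-optimize with persisted state
    execute_and_collect_runtime_data(plan);       // hypothetical helper
}
```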
Partial Exploration
When the plan space is very large, optd will generate a sub-optimal plan at first, and then use the runtime information to continue the plan space search next time the same query (or a similar query) is being optimized. This is partial exploration.
Developers can pass partial_explore_iter and partial_explore_space in the optimizer options to specify how much the optimizer expands the plan space each time step_optimize_rel is invoked. To use partial exploration, developers should not clear the internal state of the optimizer across runs.
Three Join Demo
You can run this demo with the following command:
cargo run --release --bin optd-adaptive-three-join
We create three tables and join them. The underlying data is updated every time the query is executed.
select * from t1, t2, t3 where t1v1 = t2v1 and t1v2 = t3v2;
When the data distribution and the table sizes change, the optimal join order will be different. The output of this demo is shown below.
Iter 66: (HashJoin (HashJoin t1 t2) t3) <-> (best) (HashJoin (HashJoin t1 t2) t3), Accuracy: 66/66=100.000
Iter 67: (HashJoin (HashJoin t2 t1) t3) <-> (best) (HashJoin (HashJoin t1 t2) t3), Accuracy: 66/67=98.507
Iter 68: (HashJoin t2 (HashJoin t1 t3)) <-> (best) (HashJoin (HashJoin t1 t2) t3), Accuracy: 66/68=97.059
Iter 69: (HashJoin (HashJoin t1 t2) t3) <-> (best) (HashJoin (HashJoin t1 t2) t3), Accuracy: 67/69=97.101
Iter 70: (HashJoin (HashJoin t1 t2) t3) <-> (best) (HashJoin (HashJoin t1 t2) t3), Accuracy: 68/70=97.143
Iter 71: (HashJoin (HashJoin t1 t2) t3) <-> (best) (HashJoin (HashJoin t1 t2) t3), Accuracy: 69/71=97.183
Iter 72: (HashJoin (HashJoin t2 t1) t3) <-> (best) (HashJoin (HashJoin t1 t2) t3), Accuracy: 69/72=95.833
The left plan (in Lisp representation) is the join order determined by the adaptive query optimization algorithm. The right plan is the best plan. The accuracy is the percentage of executions in which the adaptive query optimization algorithm generates the cost-optimal plan.
To find the optimal plan and compute the accuracy, we set up two optimizers in this demo: the normal optimizer and the optimal optimizer. Each time we insert data into the tables, we invoke the normal optimizer once and invoke the optimal optimizer with every possible combination of join orders, so that the optimal optimizer can produce an optimal plan based on the cost model and the join selectivity.
As the algorithm only knows the runtime information from the last run before new data is added to the tables, there are some iterations in which it cannot generate the optimal plan. However, it converges to the optimal plan as more runtime information is collected.
TPC-H Q8 Demo
You can run this demo with the following command:
cargo run --release --bin optd-adaptive-tpch-q8
In this demo, we create the TPC-H schema with test data of scale 0.01. There are 8 tables in TPC-H Q8, and it is impossible to enumerate all join combinations in one run. The demo runs this query multiple times, each time exploring a subset of the plan space. Therefore, optimization is fast in each iteration, and as more of the plan space is explored across iterations, the produced plan converges to the optimal join order.
--- ITERATION 5 ---
plan space size budget used, not applying logical rules any more. current plan space: 10354
(HashJoin region (HashJoin (HashJoin (HashJoin (HashJoin (HashJoin part (HashJoin supplier lineitem)) orders) customer) nation) nation))
plan space size budget used, not applying logical rules any more. current plan space: 11743
+--------+------------+
| col0 | col1 |
+--------+------------+
| 1995.0 | 1.00000000 |
| 1996.0 | 0.32989690 |
+--------+------------+
2 rows in set. Query took 0.115 seconds.
The output contains the current join order in Lisp representation, the plan space, and the query result.
Cost Model Cardinality Benchmarking
Overview
You can benchmark the cardinality estimates of optd's cost model against other DBMSs using the optd-perfbench module.
All aspects of benchmarking (except for setting up comparison DBMSs) are handled automatically. This includes loading workload data, building statistics, gathering the true cardinality of workload queries, running explains on workload queries, and aggregating cardinality estimation results.
We elected not to automate the installation and setup of the DBMSs in order to accommodate the needs of all users. For instance, some users prefer installing Postgres with Homebrew, others choose to install the Mac application, while others wish to create a Postgres Docker container. However, it could be feasible in the future to standardize on Docker and automatically start a container. The only difficult part of that scenario is tuning Postgres/other DBMSs to the machine being run on, as this is currently done manually using PGTune.
Additionally, our system provides fine-grained, robust caching for every single step of the process. After the first run of a workload, all subsequent runs only require running explains, which takes a matter of seconds for all workloads. We use "acknowledgement files" to make the caching robust: we never cache incomplete results.
Basic Operation
First, you need to manually install, configure, and start the DBMS(s) being compared against. Currently, only Postgres is supported. To see an example of how Postgres is installed, configured, and started on a Mac, check the patrick/ folder in the gungnir-experiments repository.
Once the DBMS(s) being compared against are set up, run this to quickly get started. It should take a few minutes on the first run and a few seconds on subsequent runs. This specific command that tests TPC-H with scale factor 0.01 is run in a CI script before every merge to main, so it should be very reliable.
cargo run --release --bin optd-perfbench cardbench tpch --scale-factor 0.01
After this, you can try out different workloads and scale factors based on the CLI options.
Roughly speaking, there are two main ways the benchmarking system is used: (a) to compare the cardinality estimates of optd against another system in aggregate or (b) to investigate the cardinality estimates of a small subset of queries. The command above is for use case (a). The system automatically outputs a variety of aggregate information about the q-error including median, p95, max, and more. Additionally, the system outputs comparative information which shows the # of queries in which a given DBMS performs the best or is tied for the best.
For use case (b), you will want to set the RUST_LOG environment variable to info and use the --query-ids parameter. Setting RUST_LOG to info will show the results of the explain commands on all DBMSs, and --query-ids will let you run only specific queries to avoid cluttering the output.
RUST_LOG=info cargo run --release --bin optd-perfbench cardbench tpch --scale-factor 0.01 --query-ids 2
Supporting More Queries
Currently, we are missing support for a few queries in TPC-H, JOB, and JOB-light. An approximate list of supported queries can be found in the [workload].rs files (e.g., tpch.rs and job.rs). If --query-ids is omitted from the command, we use the list of supported queries defined in the [workload].rs file by default. Some of these queries are not supported by DataFusion, some by optd, and some fail because we run into an OOM error when trying to execute them on Postgres. Because of the last point, the set of supported queries may differ across machines. The list of queries in [workload].rs (at least the one in tpch.rs) is tested to work on the CI machine.
The definitive list of supported queries on your machine can be found by running dev_scripts/which_queries_work.sh, which simply runs the benchmarking system for each query individually. While this script takes a long time to complete on its first run, it has the nice side effect of warming up all your caches so that subsequent runs are fast. The script outputs a string to replace the WORKING_*QUERY_IDS variable in [workload].rs as well as another string to use as the --query-ids argument. If you are using which_queries_work.sh to figure out which queries work on your machine, you probably want to use --query-ids instead of setting WORKING_*QUERY_IDS.
If you add support for more queries, you will want to rerun dev_scripts/which_queries_work.sh. Since you are permanently adding support for more queries, you will want to update WORKING_*QUERY_IDS.
Adding More DBMSs
Currently, only Postgres is supported. Additional DBMSs can easily be added using the CardbenchRunnerDBMSHelper trait and, optionally, the TruecardGetter trait. CardbenchRunnerDBMSHelper must be implemented by every supported DBMS because it provides the functions for gathering estimated cardinalities. TruecardGetter only needs to be implemented by at least one DBMS: the true cardinality should be the same across all DBMSs, so we execute the queries for real on only a single DBMS to drastically reduce benchmarking runtime. TruecardGetter is currently implemented for Postgres, so it is unnecessary to implement it for any other DBMS unless one wishes to improve the runtime of benchmarking (e.g., by gathering true cardinalities with an OLAP DBMS for OLAP workloads). Keep in mind that true cardinalities are cached after the first run of a workload and can be shared across users (in the future, perhaps we'll even put the cached true cardinalities in the GitHub repository itself), so this optimization is not terribly important.
SQLPlannerTest
optd uses risinglightdb's SQL planner test library to ensure the optimizer works correctly and stably produces the expected plans. SQL planner test is a regression test framework: developers provide it with a yaml file containing the queries to be optimized and the information they want to collect, and the framework generates the test results and stores them in SQL files. When a developer submits a pull request, the reviewers should check whether any of these outputs changed unexpectedly.
The test cases can be found in optd-sqlplannertest/tests. Currently, we check whether optd can enumerate all join orders by using the explain:logical_join_orders,physical_plan task, and we check whether the query output is as expected by using the execute task.
Datafusion CLI
Developers can interact with optd using the Datafusion CLI. The CLI supports creating tables, populating data, and executing ANSI SQL queries.
cargo run --bin datafusion-optd-cli
We also have a scale 0.01 TPC-H dataset for testing. The test SQL can be executed with the Datafusion CLI.
cargo run --bin datafusion-optd-cli -- -f datafusion-optd-cli/tpch-sf0_01/test.sql
Miscellaneous
This is a note covering things that do not work well in the system right now.
Type System
Currently, we hard-code the decimal type to have precision (15, 2). Type inference should be done as part of the schema property inference.
Expression
optd supports exploring SQL expressions in the optimization process. However, this can be super inefficient, as optimizing a plan node (e.g., turning a join into a hash join) usually needs the full binding of the expression tree, which can lead to an exponential plan space.
Bindings
We do not have something like a binding iterator as in the Cascades paper. Before applying a rule, we will generate all bindings of a group, which might take a lot of memory. This should be fixed in the future.
Cycle Detection + DAG
Consider the case for join commute rule.
(Join A B) <- group 1
(Projection (Join B A) <expressions list>) <- group 2
(Projection (Projection (Join A B) <expressions list>) <expressions list>) <- group 1 may refer itself
After applying the rule twice, the memo table will have self-referential groups. Currently, we detect such self-references in the optimize-group task. There are probably better ways to handle this.
The same applies to DAGs / recursive CTEs: we have not tested whether the framework works with DAGs, but in theory it should support them. We just need to ensure that a node in a DAG does not get searched twice.
DAG
For DAGs, another challenge is recovering the reusable fragments from the optimizer output. The optimizer can give you a DAG output, but by iterating through the plan you cannot tell which parts can be reused or materialized. Therefore, we might need to produce some extra information alongside the plan node output, i.e., a graph representation with metadata for each node, instead of a plain RelNode. This would also help the process of inserting the physical collector plan nodes, which is currently a little bit hacky in the implementation.
Memo Table
Obviously, it is not efficient to simply store a mapping from RelNode to the expression id: computing a hash of an entire tree structure requires traversing an unbounded number of levels.
Partial Exploration
Each iteration will only get slower, because we have to invoke the optimize-group tasks before we can find a group to apply a rule to. We could probably keep the task stack across runs to make this faster.
Physical Property + Enforcer Rules
This is a major missing feature in the optimizer. We need it to support shuffling and sort optimizations.
Pruning
Currently, we have implemented the pruning condition as in the paper, but we have not actually enabled it.