Optimization: Go from general logical plans to dask-specific plans? #183

nils-braun opened this issue May 20, 2021 · 1 comment

nils-braun (Collaborator) commented May 20, 2021

This issue is both a feature proposal and a question.

Currently, the process from a SQL string to a computed dataframe is as follows:

  1. Use Calcite for SQL parsing and for creating the (not yet optimized) relational algebra
  2. Optimize with Calcite using rule-based optimizations, resulting in a plan of Logical* nodes. Those relational plans are quite generic and know nothing about dask, distributed processing, or the data
  3. Transform each relational algebra node one-by-one into dask API calls
  4. Let dask do another round of optimizations on the dask task graph and execute it
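
For orientation, here is a minimal sketch of what these four steps look like from the user's side, assuming a small pandas-backed table named "data"; Context.sql() internally covers steps 1-3, and dask itself performs step 4 on compute():

```python
import pandas as pd
import dask.dataframe as dd
from dask_sql import Context

c = Context()
df = dd.from_pandas(pd.DataFrame({"x": [1, 2, 3]}), npartitions=2)
c.create_table("data", df)

# Steps 1-3: Calcite parses the SQL, applies its rule-based optimizations
# to the Logical* plan, and each node is translated into dask API calls.
result = c.sql("SELECT SUM(x) AS total FROM data")

# Step 4: dask runs its own graph optimizations and executes the result.
print(result.compute())
```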

While this works quite well and is also used e.g. at blazingSQL, there are at least three problems with it:

  • Certain optimizations are only possible if you know that distributed processing is involved: basically, you can always decide whether to do something in parallel or sequentially, and you need to define when to shuffle (or whether shuffling is needed at all). An example SQL query would be SELECT ROW_NUMBER() OVER (...), 1 + SUM(x) OVER (...) FROM data. It is much more efficient to do the shuffling needed for the two windows only once and then just add the 1 to one of the columns in place (see the sketch after this list). Currently, however, there is no way to detect this, and the two window functions are calculated separately.
  • Calcite does not know about the data, which means there cannot be any cost-based optimizations (which typically lead to much better results)
  • We cannot properly implement predicate pushdown on data sources that support it (e.g. Hive+parquet, see "Use the correct hive partition type information (hacky solution)" gallamine/dask-sql#1)
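
To make the first bullet concrete, here is a hand-written sketch (plain dask, not dask-sql internals) of how the window example could share a single shuffle, assuming both windows partition by the same hypothetical key "k":

```python
import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({"k": [1, 1, 2, 2], "x": [10, 20, 30, 40]}), npartitions=2
)

# Shuffle once, so that each value of the partition key ends up in a
# single partition ...
shuffled = ddf.shuffle(on="k")

# ... then compute both window functions partition-locally on that one
# shuffled dataframe instead of shuffling twice.
def both_windows(part: pd.DataFrame) -> pd.DataFrame:
    part = part.copy()
    part["row_number"] = part.groupby("k").cumcount() + 1
    # "1 + SUM(x) OVER (...)": the addition is a cheap in-place column op
    part["one_plus_sum"] = 1 + part.groupby("k")["x"].transform("sum")
    return part

result = shuffled.map_partitions(both_windows)
```

For the predicate pushdown bullet, the natural target on the dask side would be something like the filters= argument of dask.dataframe.read_parquet, which can skip row groups or partitions entirely; the missing piece is a planner that knows this option exists.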

One possibility to solve these problems, and thereby maybe boost the speed, would be to bring distributed operations like "Shuffle" etc. into Calcite, let it decide when to shuffle, and then just copy its decision over to dask. The resulting optimized plan would then look similar to e.g. Spark's output. I was hoping that we could maybe "copy" the procedure from other Apache Calcite projects and just execute their optimized physical plan, but so far I have not been able to find a good project.
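
Purely as an illustration of what such a dask-aware plan could carry, here is a hypothetical sketch of physical plan nodes with explicit shuffle decisions; none of these classes exist in dask-sql or Calcite, and all the names are made up:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class PhysicalPlan:
    """Base class for (hypothetical) dask-aware physical plan nodes."""
    children: List["PhysicalPlan"] = field(default_factory=list)

@dataclass
class TableScan(PhysicalPlan):
    table: str = ""
    pushed_filters: List[str] = field(default_factory=list)  # predicate pushdown

@dataclass
class Shuffle(PhysicalPlan):
    keys: List[str] = field(default_factory=list)  # explicit shuffle decision

@dataclass
class WindowAgg(PhysicalPlan):
    functions: List[str] = field(default_factory=list)

# Both window functions hang off one Shuffle node: the planner has already
# decided to shuffle only once, and dask only needs to copy that decision.
plan = WindowAgg(
    children=[Shuffle(children=[TableScan(table="data")], keys=["k"])],
    functions=["ROW_NUMBER()", "1 + SUM(x)"],
)
```

The point of the explicit Shuffle node is that the decision is made during planning, where cost information and data source capabilities (such as pushed-down filters on the TableScan) could also live.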

nils-braun (Collaborator, Author) commented:

I am actively looking for people with Calcite (or any similar project) experience :-)
