[SPARK-38591][SQL] Add flatMapSortedGroups to KeyValueGroupedDataset #35899

Closed
wants to merge 6 commits into from

EnricoMi
Contributor

What changes were proposed in this pull request?

This adds a sorted version of ds.groupByKey(…).flatMapGroups(…).

Why are the changes needed?

The existing method flatMapGroups provides an iterator of rows for each group key. If user code requires those rows in a particular order, the iterator has to be materialized and sorted first, which defeats the purpose of an iterator. For groups that do not fit into the memory of a single executor, this approach does not work.
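
For reference, the workaround described above might look roughly like this with the existing API. It is a minimal sketch, assuming a local SparkSession; the `Value` case class, its fields, and the `sortedWithinGroups` helper are hypothetical names used only for illustration. The call to `toSeq` is what buffers an entire group in executor memory before it can be sorted.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type, used only for illustration.
case class Value(id: Int, seq: Int, payload: String)

object SortInsideGroup {
  // Sorts every group in memory: toSeq buffers the whole group before sorting,
  // so this only works while each group fits into executor memory.
  def sortedWithinGroups(ds: Dataset[Value]): Dataset[Value] = {
    import ds.sparkSession.implicits._
    ds.groupByKey((v: Value) => v.id)
      .flatMapGroups((_: Int, rows: Iterator[Value]) => rows.toSeq.sortBy(_.seq))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("sort-inside-group")
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(Value(1, 2, "b"), Value(2, 1, "x"), Value(1, 1, "a")).toDS()
    sortedWithinGroups(ds).collect().foreach(println)
    spark.stop()
  }
}
```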

org.apache.spark.sql.KeyValueGroupedDataset:

Internally, the implementation will spill to disk if any given group is too large to fit into
memory. However, users must take care to avoid materializing the whole iterator for a group
(for example, by calling toList) unless they are sure that this is possible given the memory
constraints of their cluster.

The implementation of KeyValueGroupedDataset.flatMapGroups already sorts each partition according to the group key. By additionally sorting by some data columns, the iterator can be guaranteed to provide some order.
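
The idea can be illustrated without Spark. The following plain-Scala sketch is not the PR's implementation: it sorts one "partition" by (group key, sort column) and then walks it as consecutive runs of equal keys, so each group's iterator yields rows that are already ordered and no group is buffered separately. `Event`, `sortedGroups`, and the in-memory `sortBy` (standing in for Spark's partition sort, which can spill) are assumptions made for illustration.

```scala
object SecondarySortSketch {
  // Hypothetical record: `key` is the group key, `seq` the in-group sort column.
  final case class Event(key: String, seq: Int, value: Double)

  // Sort the partition by (key, seq), then expose each run of equal keys as an iterator.
  def sortedGroups(partition: Seq[Event]): Iterator[(String, Iterator[Event])] = {
    // In-memory stand-in for a partition-level sort.
    val sorted = partition.sortBy(e => (e.key, e.seq)).iterator.buffered

    new Iterator[(String, Iterator[Event])] {
      private var current: Iterator[Event] = Iterator.empty

      def hasNext: Boolean = {
        // Skip whatever the caller left unread in the previous group.
        while (current.hasNext) current.next()
        sorted.hasNext
      }

      def next(): (String, Iterator[Event]) = {
        if (!hasNext) throw new NoSuchElementException("no more groups")
        val key = sorted.head.key
        // The group iterator ends as soon as the next row belongs to another key.
        current = new Iterator[Event] {
          def hasNext: Boolean = sorted.hasNext && sorted.head.key == key
          def next(): Event = sorted.next()
        }
        (key, current)
      }
    }
  }
}
```

Because the rows arrive pre-sorted, user code can consume each group as a stream instead of calling `toList` on it.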

Does this PR introduce any user-facing change?

This adds KeyValueGroupedDataset.flatMapSortedGroups.

How was this patch tested?

The change is covered by two tests: DatasetSuite."groupBy function, flatMapSorted by func" and DatasetSuite."groupBy function, flatMapSorted by expr".

@EnricoMi EnricoMi changed the title Add flatMapSortedGroups to KeyValueGroupedDataset [SPARK-38591][SQL] Add flatMapSortedGroups to KeyValueGroupedDataset Mar 17, 2022
@AmplabJenkins

Can one of the admins verify this patch?

@EnricoMi
Contributor Author

*
* @since 3.4.0
*/
def flatMapSortedGroups[S: Encoder, U : Encoder]
Member

I think it's too much to add an API that only allows sorting the data part, especially since we can already do this by sorting the iterator, right? The only problem this API solves is the case where a group is too big to fit in memory on the executor.

Another problem: what if we want to sort in reverse order, or by only a couple of columns?

Contributor Author

With V => S you can pick any columns of V you like:

case class Value(id: Int, seq: Int, timestamp: Long, value: Double)  // field types are illustrative
val ds: Dataset[Value]
ds.groupByKey(v => v.id).flatMapSortedGroups(v => (v.seq, v.timestamp)) { (_, iter) => iter }

The sort order can be added to the function so it can easily be given by the user:
(s: V => S, direction: SortDirection = Ascending)

The Column-variant of flatMapSortedGroups below also allows for any number of columns and sort direction.
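
How a direction parameter could combine with a key extractor `V => S` can be sketched in plain Scala. This is only an illustration of the suggestion above, not code from the PR: the `SortDirection` trait here is a hypothetical stand-in rather than Spark's own class, and `sortGroup` is an assumed helper name.

```scala
object SortDirectionSketch {
  // Hypothetical stand-ins for a sort direction; not Spark's own classes.
  sealed trait SortDirection
  case object Ascending extends SortDirection
  case object Descending extends SortDirection

  // Orders the values of one group by the key that `s` extracts, in the given direction.
  def sortGroup[V, S: Ordering](values: Seq[V])(
      s: V => S,
      direction: SortDirection = Ascending): Seq[V] = {
    val ascending: Ordering[V] = Ordering.by(s)
    val ordering = direction match {
      case Ascending  => ascending
      case Descending => ascending.reverse
    }
    values.sorted(ordering)
  }
}
```

Returning a tuple from `s`, such as `v => (v.seq, v.timestamp)`, sorts by several fields at once, since Scala provides an implicit `Ordering` for tuples of ordered elements.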

@EnricoMi EnricoMi force-pushed the branch-sorted-groups branch from 65392ee to 3c8165d Compare May 3, 2022 12:39
@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Aug 12, 2022
@github-actions github-actions bot closed this Aug 13, 2022