[SPARK-38591][SQL] Add flatMapSortedGroups to KeyValueGroupedDataset #35899
Can one of the admins verify this patch?
*
* @since 3.4.0
*/
def flatMapSortedGroups[S: Encoder, U : Encoder]
I think it's too much to add an API that only allows sorting the data part, especially since we can already do this by sorting the iterator, right? The only problem this API solves is the case where a group is too big to fit in memory on the executor.
Another problem: what if we want to sort in reverse order, or by only a couple of columns?
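For reference, the in-memory workaround mentioned in this comment might look roughly like the sketch below. The Value record, its field types, and the sortedGroup helper are hypothetical names chosen for illustration; only the (K, Iterator[V]) => TraversableOnce[U] shape is taken from flatMapGroups. Note that the whole group is materialized before sorting, which is exactly the case that breaks for very large groups.

```scala
// Hypothetical record type for illustration; field types are assumptions.
final case class Value(id: Int, seq: Int, timestamp: Long, value: Double)

object InMemorySortSketch {
  // Has the shape of a function accepted by flatMapGroups:
  // (K, Iterator[V]) => TraversableOnce[U].
  // toVector pulls the entire group into memory before sorting, so this only
  // works when every group fits into the memory of one executor.
  def sortedGroup(key: Int, rows: Iterator[Value]): Iterator[Value] =
    rows.toVector.sortBy(v => (v.seq, v.timestamp)).iterator

  // With a Dataset[Value] named ds (an assumption), this would be used as:
  //   ds.groupByKey(_.id).flatMapGroups(InMemorySortSketch.sortedGroup)
}
```

Sorting by another set of columns or in reverse order only changes the key function or the Ordering passed to sortBy.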
With V => S you can pick any columns of V you like:
case class Value(id, seq, timestamp, value)
val ds: Dataset[Value]
ds.groupBy(v => v.id).flatMapSortedGroups(v => (v.seq, v.timestamp)) { (_, iter) => iter }
The sort order can be added to the function so it can easily be given by the user:
(s: V => S, direction: SortDirection = Ascending)
The Column-variant of flatMapSortedGroups below also allows for any number of columns and sort direction.
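To make the key-function-plus-direction idea concrete, here is a minimal plain-Scala sketch that does not depend on Spark. The Direction type and the sortedBy helper are hypothetical stand-ins for illustration; they are not the SortDirection class or the API proposed in this PR.

```scala
object SortDirectionSketch {
  // Hypothetical stand-in for a sort direction; not Spark's own class.
  sealed trait Direction
  case object Asc extends Direction
  case object Desc extends Direction

  // Orders values by the key extracted with `s`. The key can be a tuple of
  // any fields of V, and the direction flips the Ordering of that key.
  def sortedBy[V, S](values: Seq[V], direction: Direction = Asc)(s: V => S)(
      implicit ord: Ordering[S]): Seq[V] = {
    val keyOrd = direction match {
      case Asc  => ord
      case Desc => ord.reverse
    }
    values.sortBy(s)(keyOrd)
  }
}
```

A tuple key such as r => (r._2, r._3) sorts by both fields, and passing Desc reverses the order of the whole key.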
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
This adds a sorted version of ds.groupByKey(…).flatMapGroups(…).
Why are the changes needed?
The existing method flatMapGroups provides an iterator of rows for each group key. If user code requires those rows in a particular order, that iterator has to be sorted first, which defeats the purpose of an iterator. For groups that do not fit into the memory of one executor, this approach does not work.
org.apache.spark.sql.KeyValueGroupedDataset:
The implementation of KeyValueGroupedDataset.flatMapGroups already sorts each partition according to the group key. By additionally sorting by some data columns, the iterator can be guaranteed to provide the rows of each group in a defined order.
Does this PR introduce any user-facing change?
This adds KeyValueGroupedDataset.flatMapSortedGroups.
How was this patch tested?
There are two tests: DatasetSuite."groupBy function, flatMapSorted by func" and DatasetSuite."groupBy function, flatMapSorted by expr".
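The idea described under "Why are the changes needed?" (sort each partition by group key plus some data columns, then stream each group in order) can be illustrated with a small plain-Scala model. This is only a sketch under assumptions: sortedGroups and its parameters are hypothetical names, a Seq stands in for a partition, and the sort here happens in memory, whereas a real partition sort could spill to disk. It is not the implementation from this PR.

```scala
object SecondarySortSketch {
  // Sorts one "partition" by (group key, sort key) and then walks it once,
  // handing out one iterator per group. No group is materialized separately;
  // each group iterator reads straight from the sorted partition.
  def sortedGroups[K, S, V](partition: Seq[V])(key: V => K, sortKey: V => S)(
      implicit kOrd: Ordering[K], sOrd: Ordering[S]): Iterator[(K, Iterator[V])] = {
    val rows = partition.sortBy(v => (key(v), sortKey(v))).iterator.buffered

    new Iterator[(K, Iterator[V])] {
      private var lastKey: Option[K] = None

      // Drops rows of the previous group that the caller did not consume.
      private def skipRest(): Unit = lastKey.foreach { k =>
        while (rows.hasNext && kOrd.equiv(key(rows.head), k)) rows.next()
      }

      def hasNext: Boolean = { skipRest(); rows.hasNext }

      def next(): (K, Iterator[V]) = {
        skipRest()
        val k = key(rows.head)
        lastKey = Some(k)
        val group = new Iterator[V] {
          def hasNext: Boolean = rows.hasNext && kOrd.equiv(key(rows.head), k)
          def next(): V = {
            if (!hasNext) throw new NoSuchElementException("group exhausted")
            rows.next()
          }
        }
        (k, group)
      }
    }
  }
}
```

Because the groups share one underlying iterator, each group has to be consumed before moving on to the next one; rows left unread are skipped.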