Skip to content

[Improvement-16979] Unify PriorityDelayQueue with AbstractDelayEventBus #17155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from

Conversation

reele
Copy link
Contributor

@reele reele commented Apr 30, 2025

Purpose of the pull request

implement #16979

Brief change log

  1. Refactored DelayEntry.java,PriorityDelayQueue,PriorityAndDelayBasedTaskEntry,TimeBasedTaskExecutionRunnableComparableEntry with TaskDispatchDelayEntryEvent,TaskDispatchPriorityEntryEvent,AbstractTaskDispatchEntryEvent,TaskDispatchEntryEventBus which inherited from AbstractDelayEventBus,AbstractDelayEvent

  2. Fix PriorityAndDelayBasedTaskEntry's compare issue, old object compare task priority after compare time in millisecond, cause the task priority only affect in same millisecond, now it will compare task priority first, then the create time.

Verify this pull request

the test codes were refactored.

Pull Request Notice

Pull Request Notice

If your pull request contains incompatible change, you should also add it to docs/docs/en/guide/upgrade/incompatible.md

this.data = checkNotNull(data, "data is null");
}

public int compareTo(@NotNull AbstractTaskDispatchEntryEvent<V> other) {

Check failure

Code scanning / CodeQL

Overloaded compareTo Error

The parameter of compareTo should have type 'Delayed' when implementing 'Comparable'.
@Override
public int hashCode() {
return super.hashCode();
public int compareTo(@NotNull AbstractTaskDispatchEntryEvent<V> other) {

Check failure

Code scanning / CodeQL

Overloaded compareTo Error

The parameter of compareTo should have type 'Delayed' when implementing 'Comparable'.
}

@Override
public int compareTo(@NotNull AbstractTaskDispatchEntryEvent<V> other) {

Check failure

Code scanning / CodeQL

Overloaded compareTo Error

The parameter of compareTo should have type 'Delayed' when implementing 'Comparable'.
Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove GlobalTaskDispatchWaitingQueue and GlobalTaskDispatchWaitingQueueLooper, this queue is used to handle the delay time, but bring some extra problems, e.g. there are existing many thread to deal with the task lifecycle in master, we need to do extra concurrency control, e.g. one task is in dispatching, and then receive a kill event.

Since TaskDispatchLifecycleEvent is already an DelayEvent so we can directly set the delay time to it, then we can removed TimeBasedTaskExecutionRunnableComparableEntry and WorkerGroupTaskDispatcher. We directly use ITaskExecutorClient to dispatch task in dispatchEventAction, if dispatch failed then regenerate a TaskDispatchLifecycleEvent with retry delay time.

Comment on lines +38 to +40
public int compareTo(@NotNull AbstractTaskDispatchEntryEvent<V> other) {
return super.compareTo(other);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to compare the data if super.compareTo(other) == 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, i'll handle it

@ruanwenjun ruanwenjun added this to the 3.3.1 milestone Apr 30, 2025
@ruanwenjun ruanwenjun added the improvement make more easy to user or prompt friendly label Apr 30, 2025
@ruanwenjun
Copy link
Member

ruanwenjun commented Apr 30, 2025

Good job, after this PR merged, the master code will be clearer.
Base on this, we can provide more EventBus to expose the workflow instance/task instance lifecycle event to exposed to outside.

@reele
Copy link
Contributor Author

reele commented Apr 30, 2025

Can we remove GlobalTaskDispatchWaitingQueue and GlobalTaskDispatchWaitingQueueLooper, this queue is used to handle the delay time, but bring some extra problems, e.g. there are existing many thread to deal with the task lifecycle in master, we need to do extra concurrency control, e.g. one task is in dispatching, and then receive a kill event.

Since TaskDispatchLifecycleEvent is already an DelayEvent so we can directly set the delay time to it, then we can removed TimeBasedTaskExecutionRunnableComparableEntry and WorkerGroupTaskDispatcher. We directly use ITaskExecutorClient to dispatch task in dispatchEventAction, if dispatch failed then regenerate a TaskDispatchLifecycleEvent with retry delay time.

i'll have a try.

@ruanwenjun
Copy link
Member

@reele Sorry, I miss something, we need to sort the tasks by priority under one worker group, so WorkerGroupTaskDispatcher cannot be removed, but GlobalTaskDispatchWaitingQueue and GlobalTaskDispatchWaitingQueueLooper can be removed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backend improvement make more easy to user or prompt friendly test
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants