-
Notifications
You must be signed in to change notification settings - Fork 135
[UNOMI-878] Add advanced scheduler service and task persistence capabilities #726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
sergehuber
wants to merge
3
commits into
master
Choose a base branch
from
UNOMI-878-new-scheduler
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…ilities - Introduced a new, robust scheduler service with extensive task management and cluster-aware features. - Added support for task persistence, recovery, metrics tracking, and execution history. - Implemented additional configuration options for scheduler properties such as `nodeId`, `purgeTaskEnabled`, and `lockTimeout`. - Enhanced task lifecycle management with new components including `TaskExecutionManager`, `TaskRecoveryManager`, and `TaskHistoryManager`. - Updated API and documentation to reflect changes.
…ed Developer Experience and Persistence Integration
diff --git c/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java i/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java index fee5f3a..550b4cc 100644 --- c/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java +++ i/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java @@ -26,6 +26,8 @@ import org.apache.unomi.api.Metadata; import org.apache.unomi.api.actions.ActionType; import org.apache.unomi.api.services.DefinitionsService; import org.apache.unomi.api.services.SchedulerService; +import org.apache.unomi.api.tasks.ScheduledTask; +import org.apache.unomi.api.tasks.TaskExecutor; import org.apache.unomi.groovy.actions.GroovyAction; import org.apache.unomi.groovy.actions.GroovyBundleResourceConnector; import org.apache.unomi.groovy.actions.ScriptMetadata; @@ -51,7 +53,6 @@ import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -84,6 +85,7 @@ public class GroovyActionsServiceImpl implements GroovyActionsService { private static final Logger LOGGER = LoggerFactory.getLogger(GroovyActionsServiceImpl.class.getName()); private static final String BASE_SCRIPT_NAME = "BaseScript"; + private static final String REFRESH_ACTIONS_TASK_TYPE = "refresh-groovy-actions"; private DefinitionsService definitionsService; private PersistenceService persistenceService; @@ -504,15 +506,32 @@ public class GroovyActionsServiceImpl implements GroovyActionsService { * Initializes periodic script refresh timer. */ private void initializeTimers() { - TimerTask task = new TimerTask() { + TaskExecutor refreshGroovyActionsTaskExecutor = new TaskExecutor() { @OverRide - public void run() { - refreshGroovyActions(); + public String getTaskType() { + return REFRESH_ACTIONS_TASK_TYPE; + } + + @OverRide + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { + try { + refreshGroovyActions(); + callback.complete(); + } catch (Exception e) { + LOGGER.error("Error while reassigning profile data", e); + callback.fail(e.getMessage()); + } } }; + + schedulerService.registerTaskExecutor(refreshGroovyActionsTaskExecutor); + if (this.refreshGroovyActionsTaskId != null) { schedulerService.cancelTask(this.refreshGroovyActionsTaskId); } - this.refreshGroovyActionsTaskId = schedulerService.createRecurringTask("refreshGroovyActions", config.services_groovy_actions_refresh_interval(), TimeUnit.MILLISECONDS, task, false).getItemId(); + this.refreshGroovyActionsTaskId = schedulerService.newTask(REFRESH_ACTIONS_TASK_TYPE) + .withPeriod(config.services_groovy_actions_refresh_interval(), TimeUnit.MILLISECONDS) + .nonPersistent() + .schedule().getItemId(); } } diff --git c/extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java i/extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java index f7e68b7..1141f4f 100644 --- c/extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java +++ i/extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java @@ -30,6 +30,7 @@ import org.apache.unomi.api.Item; import org.apache.unomi.api.services.SchedulerService; import org.apache.unomi.api.services.ScopeService; import org.apache.unomi.api.tasks.ScheduledTask; +import org.apache.unomi.api.tasks.TaskExecutor; import org.apache.unomi.persistence.spi.PersistenceService; import org.apache.unomi.schema.api.JsonSchemaWrapper; import org.apache.unomi.schema.api.SchemaService; @@ -43,7 +44,9 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class SchemaServiceImpl implements SchemaService { @@ -71,8 +74,10 @@ public class SchemaServiceImpl implements SchemaService { private PersistenceService persistenceService; private ScopeService scopeService; private JsonSchemaFactory jsonSchemaFactory; + private SchedulerService schedulerService; private String refreshJSONSchemasTaskId; + private static final String REFRESH_SCHEMAS_TASK_TYPE = "refresh-json-schemas"; @OverRide public boolean isValid(String data, String schemaId) { @@ -369,18 +374,31 @@ public class SchemaServiceImpl implements SchemaService { } private void initTimers() { - TimerTask task = new TimerTask() { + TaskExecutor refreshSchemasTaskExecutor = new TaskExecutor() { @OverRide - public void run() { - try { - refreshJSONSchemas(); - } catch (Exception e) { - LOGGER.error("Unexpected error while refreshing JSON Schemas", e); + public String getTaskType() { + return REFRESH_SCHEMAS_TASK_TYPE; } + + @OverRide + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { + try { + refreshJSONSchemas(); + callback.complete(); + } catch (Exception e) { + LOGGER.error("Error while refreshing json scehams", e); + callback.fail(e.getMessage()); + } } }; + + schedulerService.registerTaskExecutor(refreshSchemasTaskExecutor); + this.resetTimers(); - this.refreshJSONSchemasTaskId = schedulerService.createRecurringTask("refreshJSONSchemas", jsonSchemaRefreshInterval, TimeUnit.MILLISECONDS, task, false).getItemId(); + this.refreshJSONSchemasTaskId = schedulerService.newTask(REFRESH_SCHEMAS_TASK_TYPE) + .withPeriod(jsonSchemaRefreshInterval, TimeUnit.MILLISECONDS) + .nonPersistent() + .schedule().getItemId(); } private void resetTimers() { diff --git c/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java i/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java index 1333d4b..44c4748 100644 --- c/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java +++ i/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java @@ -26,6 +26,8 @@ import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.api.conditions.ConditionType; import org.apache.unomi.api.services.ClusterService; import org.apache.unomi.api.services.SchedulerService; +import org.apache.unomi.api.tasks.ScheduledTask; +import org.apache.unomi.api.tasks.TaskExecutor; import org.apache.unomi.lifecycle.BundleWatcher; import org.apache.unomi.persistence.spi.PersistenceService; import org.osgi.framework.BundleContext; @@ -61,10 +63,13 @@ public class ClusterServiceImpl implements ClusterService { private volatile boolean shutdownNow = false; private volatile List<ClusterNode> cachedClusterNodes = Collections.emptyList(); - private BundleWatcher bundleWatcher; + private static final String CLUSTER_NODE_STAT_UPDATE_TASK_TYPE = "cluster-node-statistics-update"; + private static final String CLUSTER_STALE_NODE_CLEANUP_TASK_TYPE = "cluster-stale-nodes-cleanup"; private String clusterNodeStatisticsUpdateTaskId; private String clusterStaleNodesCleanupTaskId; + private BundleWatcher bundleWatcher; + /** * Max time to wait for persistence service (in milliseconds) */ @@ -215,40 +220,59 @@ public class ClusterServiceImpl implements ClusterService { return; } - // Schedule regular updates of the node statistics - TimerTask statisticsTask = new TimerTask() { + TaskExecutor clusterNodeStatisticsUpdateTaskExecutor = new TaskExecutor() { @OverRide - public void run() { + public String getTaskType() { + return CLUSTER_NODE_STAT_UPDATE_TASK_TYPE; + } + + @OverRide + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { try { updateSystemStats(); - } catch (Throwable t) { - LOGGER.error("Error updating system statistics", t); + callback.complete(); + } catch (Exception e) { + LOGGER.error("Error while updating cluster node statistics", e); + callback.fail(e.getMessage()); } } }; - this.clusterNodeStatisticsUpdateTaskId = schedulerService.createRecurringTask("clusterNodeStatisticsUpdate", nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS, statisticsTask, false).getItemId(); - // Schedule cleanup of stale nodes - TimerTask cleanupTask = new TimerTask() { + TaskExecutor clusterStaleNodesCleanupTaskExecutor = new TaskExecutor() { @OverRide - public void run() { + public String getTaskType() { + return CLUSTER_STALE_NODE_CLEANUP_TASK_TYPE; + } + + @OverRide + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { try { cleanupStaleNodes(); - } catch (Throwable t) { - LOGGER.error("Error cleaning up stale nodes", t); + callback.complete(); + } catch (Exception e) { + LOGGER.error("Error while cleaning staled cluster nodes", e); + callback.fail(e.getMessage()); } } }; - this.clusterStaleNodesCleanupTaskId = schedulerService.createRecurringTask("clusterStaleNodesCleanup", 60000, TimeUnit.MILLISECONDS, cleanupTask, false).getItemId(); + + schedulerService.registerTaskExecutor(clusterNodeStatisticsUpdateTaskExecutor); + schedulerService.registerTaskExecutor(clusterStaleNodesCleanupTaskExecutor); + + this.resetTimers(); + this.clusterNodeStatisticsUpdateTaskId = schedulerService.newTask(CLUSTER_NODE_STAT_UPDATE_TASK_TYPE) + .withPeriod(nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS) + .nonPersistent() + .schedule().getItemId(); + this.clusterStaleNodesCleanupTaskId = schedulerService.newTask(CLUSTER_STALE_NODE_CLEANUP_TASK_TYPE) + .withPeriod(60000, TimeUnit.MILLISECONDS) + .nonPersistent() + .schedule().getItemId(); LOGGER.info("Cluster service scheduled tasks initialized"); } - public void destroy() { - LOGGER.info("Cluster service shutting down..."); - shutdownNow = true; - - // Cancel scheduled tasks + private void resetTimers() { if (schedulerService != null && clusterNodeStatisticsUpdateTaskId != null) { schedulerService.cancelTask(clusterNodeStatisticsUpdateTaskId); clusterStaleNodesCleanupTaskId = null; @@ -257,6 +281,13 @@ public class ClusterServiceImpl implements ClusterService { schedulerService.cancelTask(clusterStaleNodesCleanupTaskId); clusterStaleNodesCleanupTaskId = null; } + } + + public void destroy() { + LOGGER.info("Cluster service shutting down..."); + shutdownNow = true; + + this.resetTimers(); // Remove node from persistence service if (persistenceService != null) { diff --git c/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java i/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java index ff7babc..4515ff7 100644 --- c/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java +++ i/services/src/main/java/org/apache/unomi/services/impl/definitions/DefinitionsServiceImpl.java @@ -25,6 +25,8 @@ import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.api.conditions.ConditionType; import org.apache.unomi.api.services.DefinitionsService; import org.apache.unomi.api.services.SchedulerService; +import org.apache.unomi.api.tasks.ScheduledTask; +import org.apache.unomi.api.tasks.TaskExecutor; import org.apache.unomi.api.utils.ConditionBuilder; import org.apache.unomi.api.utils.ParserHelper; import org.apache.unomi.persistence.spi.CustomObjectMapper; @@ -60,6 +62,8 @@ public class DefinitionsServiceImpl implements DefinitionsService, SynchronousBu private ConditionBuilder conditionBuilder; private BundleContext bundleContext; + + private static final String RELOAD_TYPES_TASK_TYPE = "reload-types"; private String reloadTypesTaskId; public DefinitionsServiceImpl() { @@ -100,20 +104,39 @@ public class DefinitionsServiceImpl implements DefinitionsService, SynchronousBu } private void scheduleTypeReloads() { - TimerTask task = new TimerTask() { + TaskExecutor reloadTypesTaskExecutor = new TaskExecutor() { @OverRide - public void run() { - reloadTypes(false); + public String getTaskType() { + return RELOAD_TYPES_TASK_TYPE; + } + + @OverRide + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { + try { + reloadTypes(false); + callback.complete(); + } catch (Exception e) { + LOGGER.error("Error while reloading types", e); + callback.fail(e.getMessage()); + } } }; + + schedulerService.registerTaskExecutor(reloadTypesTaskExecutor); + this.resetTypeReloads(); - this.reloadTypesTaskId = schedulerService.createRecurringTask("reloadTypes", definitionsRefreshInterval, TimeUnit.MILLISECONDS, task, false).getItemId(); + this.reloadTypesTaskId = schedulerService.newTask(RELOAD_TYPES_TASK_TYPE) + .withPeriod(definitionsRefreshInterval, TimeUnit.MILLISECONDS) + .nonPersistent() + .schedule().getItemId(); + LOGGER.info("Scheduled task for condition type loading each 10s"); } private void resetTypeReloads() { if (this.reloadTypesTaskId != null) { schedulerService.cancelTask(this.reloadTypesTaskId); + this.reloadTypesTaskId = null; } } diff --git c/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java i/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java index 78ca739..9dfac14 100644 --- c/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java +++ i/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java @@ -29,6 +29,8 @@ import org.apache.unomi.api.services.DefinitionsService; import org.apache.unomi.api.services.ProfileService; import org.apache.unomi.api.services.SchedulerService; import org.apache.unomi.api.services.SegmentService; +import org.apache.unomi.api.tasks.ScheduledTask; +import org.apache.unomi.api.tasks.TaskExecutor; import org.apache.unomi.persistence.spi.CustomObjectMapper; import org.apache.unomi.persistence.spi.PersistenceService; import org.apache.unomi.persistence.spi.PropertyHelper; @@ -189,6 +191,8 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList private boolean forceRefreshOnSave = false; + private static final String PROPERTY_TYPE_LOAD_TASK_TYPE = "property-type-load"; + private static final String PROFILES_PURGE_TASK_TYPE = "profiles-purge"; private String propertyTypeLoadTaskId; private String purgeProfilesTaskId; @@ -301,14 +305,31 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList } private void schedulePropertyTypeLoad() { - TimerTask task = new TimerTask() { + TaskExecutor reloadPropertyTaskExecutor = new TaskExecutor() { @OverRide - public void run() { - reloadPropertyTypes(false); + public String getTaskType() { + return PROPERTY_TYPE_LOAD_TASK_TYPE; + } + + @OverRide + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { + try { + reloadPropertyTypes(false); + callback.complete(); + } catch (Exception e) { + LOGGER.error("Error while reloading property type", e); + callback.fail(e.getMessage()); + } } }; + + schedulerService.registerTaskExecutor(reloadPropertyTaskExecutor); + this.resetPropertyTypeLoadTask(); - this.propertyTypeLoadTaskId = schedulerService.createRecurringTask("propertyTypeLoad", propertiesRefreshInterval, TimeUnit.MILLISECONDS, task, false).getItemId(); + this.propertyTypeLoadTaskId = schedulerService.newTask(PROPERTY_TYPE_LOAD_TASK_TYPE) + .withPeriod(propertiesRefreshInterval, TimeUnit.MILLISECONDS) + .nonPersistent() + .schedule().getItemId(); LOGGER.info("Scheduled task for property type loading each {}ms", propertiesRefreshInterval); } @@ -406,7 +427,6 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList if (purgeProfileExistTime > 0) { LOGGER.info("Purge: Profile created since more than {} days, will be purged", purgeProfileExistTime); } - if (purgeSessionExistTime > 0) { LOGGER.info("Purge: Session items created since more than {} days, will be purged", purgeSessionExistTime); } @@ -414,9 +434,14 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList LOGGER.info("Purge: Event items created since more than {} days, will be purged", purgeEventExistTime); } - TimerTask task = new TimerTask() { + TaskExecutor purgeProfilesTaskExecutor = new TaskExecutor() { @OverRide - public void run() { + public String getTaskType() { + return PROFILES_PURGE_TASK_TYPE; + } + + @OverRide + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { try { long purgeStartTime = System.currentTimeMillis(); LOGGER.info("Purge: triggered"); @@ -428,14 +453,21 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList purgeSessionItems(purgeSessionExistTime); purgeEventItems(purgeEventExistTime); LOGGER.info("Purge: executed in {} ms", System.currentTimeMillis() - purgeStartTime); - } catch (Throwable t) { - LOGGER.error("Error while purging", t); + callback.complete(); + } catch (Exception e) { + LOGGER.error("Error while purging", e); + callback.fail(e.getMessage()); } } }; - this.resetProfilesPurgeTask(); - this.purgeProfilesTaskId = schedulerService.createRecurringTask("profilesPurge", purgeProfileInterval, TimeUnit.DAYS, task, false).getItemId(); + schedulerService.registerTaskExecutor(purgeProfilesTaskExecutor); + + this.resetProfilesPurgeTask(); + this.purgeProfilesTaskId = schedulerService.newTask(PROFILES_PURGE_TASK_TYPE) + .withPeriod(purgeProfileInterval, TimeUnit.DAYS) + .nonPersistent() + .schedule().getItemId(); LOGGER.info("Purge: purge scheduled with an interval of {} days", purgeProfileInterval); } else { LOGGER.info("Purge: No purge scheduled"); diff --git c/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java i/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java index 71cc75e..7389883 100644 --- c/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java +++ i/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java @@ -28,6 +28,8 @@ import org.apache.unomi.api.query.Query; import org.apache.unomi.api.rules.Rule; import org.apache.unomi.api.rules.RuleStatistics; import org.apache.unomi.api.services.*; +import org.apache.unomi.api.tasks.ScheduledTask; +import org.apache.unomi.api.tasks.TaskExecutor; import org.apache.unomi.persistence.spi.CustomObjectMapper; import org.apache.unomi.persistence.spi.PersistenceService; import org.apache.unomi.persistence.spi.config.ConfigurationUpdateHelper; @@ -65,14 +67,16 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn private Integer rulesRefreshInterval = 1000; private Integer rulesStatisticsRefreshInterval = 10000; + private static final String REFRESH_RULES_TASK_TYPE = "refresh-rules"; + private static final String REFRESH_RULE_STATS_TASK_TYPE = "refresh-rule-stats"; + private String refreshRulesTaskId; + private String syncRuleStatisticsTaskId; private final List<RuleListenerService> ruleListeners = new CopyOnWriteArrayList<RuleListenerService>(); private Map<String, Set<Rule>> rulesByEventType = new HashMap<>(); private Boolean optimizedRulesActivated = true; - private String refreshRulesTaskId; - private String syncRuleStatisticsTaskId; public void setBundleContext(BundleContext bundleContext) { this.bundleContext = bundleContext; @@ -492,26 +496,53 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn } private void initializeTimers() { - this.resetTimers(); - TimerTask task = new TimerTask() { + TaskExecutor refreshRulesTaskExecutor = new TaskExecutor() { @OverRide - public void run() { - refreshRules(); + public String getTaskType() { + return REFRESH_RULES_TASK_TYPE; } - }; - this.refreshRulesTaskId = schedulerService.createRecurringTask("refreshRules", rulesRefreshInterval, TimeUnit.MILLISECONDS, task, false).getItemId(); - TimerTask statisticsTask = new TimerTask() { @OverRide - public void run() { - try { - syncRuleStatistics(); - } catch (Throwable t) { - LOGGER.error("Error synching rule statistics between memory and persistence back-end", t); - } + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { + try { + refreshRules(); + callback.complete(); + } catch (Exception e) { + LOGGER.error("Error while refreshing rules", e); + callback.fail(e.getMessage()); + } } }; - this.syncRuleStatisticsTaskId = schedulerService.createRecurringTask("syncRuleStatistics", rulesStatisticsRefreshInterval, TimeUnit.MILLISECONDS, statisticsTask, false).getItemId(); + TaskExecutor refreshRuleStatsTaskExecutor = new TaskExecutor() { + @OverRide + public String getTaskType() { + return REFRESH_RULE_STATS_TASK_TYPE; + } + + @OverRide + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { + try { + syncRuleStatistics(); + callback.complete(); + } catch (Exception e) { + LOGGER.error("Error while syncing rule statistics", e); + callback.fail(e.getMessage()); + } + } + }; + + schedulerService.registerTaskExecutor(refreshRulesTaskExecutor); + schedulerService.registerTaskExecutor(refreshRuleStatsTaskExecutor); + + this.resetTimers(); + this.refreshRulesTaskId = schedulerService.newTask(REFRESH_RULES_TASK_TYPE) + .withPeriod(rulesRefreshInterval, TimeUnit.MILLISECONDS) + .nonPersistent() + .schedule().getItemId(); + this.refreshRulesTaskId = schedulerService.newTask(REFRESH_RULE_STATS_TASK_TYPE) + .withPeriod(rulesStatisticsRefreshInterval, TimeUnit.MILLISECONDS) + .nonPersistent() + .schedule().getItemId(); } private void resetTimers() { diff --git c/services/src/main/java/org/apache/unomi/services/impl/scope/ScopeServiceImpl.java i/services/src/main/java/org/apache/unomi/services/impl/scope/ScopeServiceImpl.java index 179392a..acd4e4b 100644 --- c/services/src/main/java/org/apache/unomi/services/impl/scope/ScopeServiceImpl.java +++ i/services/src/main/java/org/apache/unomi/services/impl/scope/ScopeServiceImpl.java @@ -20,11 +20,14 @@ import org.apache.unomi.api.Item; import org.apache.unomi.api.Scope; import org.apache.unomi.api.services.SchedulerService; import org.apache.unomi.api.services.ScopeService; +import org.apache.unomi.api.tasks.ScheduledTask; +import org.apache.unomi.api.tasks.TaskExecutor; import org.apache.unomi.persistence.spi.PersistenceService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -32,14 +35,14 @@ import java.util.stream.Collectors; public class ScopeServiceImpl implements ScopeService { + private static final Logger LOGGER = LoggerFactory.getLogger(ScopeServiceImpl.class.getName()); + private PersistenceService persistenceService; - private SchedulerService schedulerService; - private Integer scopesRefreshInterval = 1000; - private ConcurrentMap<String, Scope> scopes = new ConcurrentHashMap<>(); + private static final String REFRESH_SCOPES_TASK_TYPE = "refresh-scopes"; private String refreshScopesTaskId; public void setPersistenceService(PersistenceService persistenceService) { @@ -83,14 +86,31 @@ public class ScopeServiceImpl implements ScopeService { } private void initializeTimers() { - TimerTask task = new TimerTask() { + TaskExecutor refreshScopesTaskExecutor = new TaskExecutor() { @OverRide - public void run() { - refreshScopes(); + public String getTaskType() { + return REFRESH_SCOPES_TASK_TYPE; + } + + @OverRide + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { + try { + refreshScopes(); + callback.complete(); + } catch (Exception e) { + LOGGER.error("Error while refreshing scopes", e); + callback.fail(e.getMessage()); + } } }; + + schedulerService.registerTaskExecutor(refreshScopesTaskExecutor); + this.resetTimers(); - this.refreshScopesTaskId = schedulerService.createRecurringTask("refreshScopes", scopesRefreshInterval, TimeUnit.MILLISECONDS, task, false).getItemId(); + this.refreshScopesTaskId = schedulerService.newTask(REFRESH_SCOPES_TASK_TYPE) + .withPeriod(scopesRefreshInterval, TimeUnit.MILLISECONDS) + .nonPersistent() + .schedule().getItemId(); } private void resetTimers() { diff --git c/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java i/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java index 453203b..c841248 100644 --- c/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java +++ i/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java @@ -32,6 +32,8 @@ import org.apache.unomi.api.services.EventService; import org.apache.unomi.api.services.RulesService; import org.apache.unomi.api.services.SchedulerService; import org.apache.unomi.api.services.SegmentService; +import org.apache.unomi.api.tasks.ScheduledTask; +import org.apache.unomi.api.tasks.TaskExecutor; import org.apache.unomi.api.utils.ConditionBuilder; import org.apache.unomi.api.utils.ParserHelper; import org.apache.unomi.persistence.spi.CustomObjectMapper; @@ -83,6 +85,9 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe private int maximumIdsQueryCount = 5000; private boolean pastEventsDisablePartitions = false; private int dailyDateExprEvaluationHourUtc = 5; + + private static final String RECALCULATE_PAST_EVENT_CONDITIONS_TASK_TYPE = "recalculate-past-event-conditions"; + private static final String REFRESH_SEGMENT_AND_SCORING_DEFINITIONS_TASK_TYPE = "refresh-segment-and-scoring-definitions"; private String recalculatePastEventConditionsTaskId; private String refreshSegmentAndScoringDefinitionsTaskId; @@ -1199,38 +1204,63 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe } private void initializeTimer() { - this.resetTimers(); - TimerTask task = new TimerTask() { + TaskExecutor recalculatePastEventConditionsTaskExecutor = new TaskExecutor() { @OverRide - public void run() { + public String getTaskType() { + return RECALCULATE_PAST_EVENT_CONDITIONS_TASK_TYPE; + } + + @OverRide + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { try { long currentTimeMillis = System.currentTimeMillis(); LOGGER.info("running scheduled task to recalculate segments and scoring that contains date relative conditions"); recalculatePastEventConditions(); LOGGER.info("finished recalculate segments and scoring that contains date relative conditions in {}ms. ", System.currentTimeMillis() - currentTimeMillis); - } catch (Throwable t) { - LOGGER.error("Error while updating profiles for segments and scoring that contains date relative conditions", t); + callback.complete(); + } catch (Exception e) { + LOGGER.error("Error while updating profiles for segments and scoring that contains date relative conditions", e); + callback.fail(e.getMessage()); } } }; + TaskExecutor refreshSegmentAndScoringDefinitionsTaskExecutor = new TaskExecutor() { + @OverRide + public String getTaskType() { + return REFRESH_SEGMENT_AND_SCORING_DEFINITIONS_TASK_TYPE; + } + + @OverRide + public void execute(ScheduledTask task, TaskExecutor.TaskStatusCallback callback) { + try { + allSegments = getAllSegmentDefinitions(); + allScoring = getAllScoringDefinitions(); + callback.complete(); + } catch (Exception e) { + LOGGER.error("Error while loading segments and scoring definitions from persistence back-end", e); + callback.fail(e.getMessage()); + } + } + }; + + schedulerService.registerTaskExecutor(recalculatePastEventConditionsTaskExecutor); + schedulerService.registerTaskExecutor(refreshSegmentAndScoringDefinitionsTaskExecutor); + + this.resetTimers(); + long initialDelay = SchedulerServiceImpl.getTimeDiffInSeconds(dailyDateExprEvaluationHourUtc, ZonedDateTime.now(ZoneOffset.UTC)); long period = TimeUnit.DAYS.toSeconds(taskExecutionPeriod); LOGGER.info("daily recalculation job for segments and scoring that contains date relative conditions will run at fixed rate, " + "initialDelay={}, taskExecutionPeriod={} in seconds", initialDelay, period); - this.recalculatePastEventConditionsTaskId = schedulerService.createRecurringTask("recalculatePastEventConditions", period, TimeUnit.SECONDS, task, false).getItemId(); - - task = new TimerTask() { - @OverRide - public void run() { - try { - allSegments = getAllSegmentDefinitions(); - allScoring = getAllScoringDefinitions(); - } catch (Throwable t) { - LOGGER.error("Error while loading segments and scoring definitions from persistence back-end", t); - } - } - }; - this.refreshSegmentAndScoringDefinitionsTaskId = schedulerService.createRecurringTask("refreshSegmentAndScoringDefinitions", segmentRefreshInterval, TimeUnit.MILLISECONDS, task, false).getItemId(); + this.recalculatePastEventConditionsTaskId = schedulerService.newTask(RECALCULATE_PAST_EVENT_CONDITIONS_TASK_TYPE) + .withInitialDelay(initialDelay, TimeUnit.SECONDS) + .withPeriod(period, TimeUnit.SECONDS) + .nonPersistent() + .schedule().getItemId(); + this.refreshSegmentAndScoringDefinitionsTaskId = schedulerService.newTask(REFRESH_SEGMENT_AND_SCORING_DEFINITIONS_TASK_TYPE) + .withPeriod(segmentRefreshInterval, TimeUnit.MILLISECONDS) + .nonPersistent() + .schedule().getItemId(); } private void resetTimers() {
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
nodeId,purgeTaskEnabled, andlockTimeout.TaskExecutionManager,TaskRecoveryManager, andTaskHistoryManager.