From c2cfeb35cdd45b20ddf5d6325e6658cf434904a3 Mon Sep 17 00:00:00 2001 From: Eugen Nicolae Cojan Date: Fri, 19 Nov 2021 16:16:39 +0200 Subject: [PATCH] Extracted GoalViolationDetector into a base class that can be used for intra broker goals --- .../detector/BaseGoalViolationDetector.java | 228 ++++++++++++++++++ .../detector/GoalViolationDetector.java | 220 ++--------------- .../IntraBrokerGoalViolationDetector.java | 167 +++---------- 3 files changed, 273 insertions(+), 342 deletions(-) create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BaseGoalViolationDetector.java diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BaseGoalViolationDetector.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BaseGoalViolationDetector.java new file mode 100644 index 0000000000..5cd2a55374 --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BaseGoalViolationDetector.java @@ -0,0 +1,228 @@ +/* + * Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.detector; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.linkedin.cruisecontrol.detector.Anomaly; +import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl; +import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils; +import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions; +import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptionsGenerator; +import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionResponse; +import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus; +import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal; +import com.linkedin.kafka.cruisecontrol.common.Utils; +import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; +import com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig; +import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig; +import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException; +import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException; +import com.linkedin.kafka.cruisecontrol.model.ClusterModel; +import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo; +import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.regex.Pattern; + +import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.*; +import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.getAnomalyDetectionStatus; +import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG; + + +/** + * This class will be scheduled to run periodically to check if the given goals are violated or not. An alert will be + * triggered if one of the goals is not met. + */ +abstract class BaseGoalViolationDetector extends AbstractAnomalyDetector implements Runnable { + protected static final Logger LOG = LoggerFactory.getLogger(BaseGoalViolationDetector.class); + protected final List _detectionGoals; + protected ModelGeneration _lastCheckedModelGeneration; + protected final Pattern _excludedTopics; + protected final boolean _allowCapacityEstimation; + protected final boolean _excludeRecentlyDemotedBrokers; + protected final boolean _excludeRecentlyRemovedBrokers; + protected final Map _balancednessCostByGoal; + protected volatile double _balancednessScore; + protected volatile ProvisionResponse _provisionResponse; + protected volatile boolean _hasPartitionsWithRFGreaterThanNumRacks; + protected final OptimizationOptionsGenerator _optimizationOptionsGenerator; + protected final Timer _goalViolationDetectionTimer; + protected final Meter _automatedRightsizingMeter; + protected static final double BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS = -1.0; + protected final Provisioner _provisioner; + protected final Boolean _isProvisionerEnabled; + protected final boolean _populateReplicaInfo = false; + + BaseGoalViolationDetector(Queue anomalies, KafkaCruiseControl kafkaCruiseControl, MetricRegistry dropwizardMetricRegistry) { + super(anomalies, kafkaCruiseControl); + KafkaCruiseControlConfig config = _kafkaCruiseControl.config(); + // Notice that we use a separate set of Goal instances for anomaly detector to avoid interference. + _detectionGoals = config.getConfiguredInstances(AnomalyDetectorConfig.ANOMALY_DETECTION_GOALS_CONFIG, Goal.class); + _excludedTopics = Pattern.compile(config.getString(AnalyzerConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG)); + _allowCapacityEstimation = config.getBoolean(AnomalyDetectorConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG); + _excludeRecentlyDemotedBrokers = config.getBoolean(AnomalyDetectorConfig.SELF_HEALING_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG); + _excludeRecentlyRemovedBrokers = config.getBoolean(AnomalyDetectorConfig.SELF_HEALING_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG); + _balancednessCostByGoal = balancednessCostByGoal(_detectionGoals, + config.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_PRIORITY_WEIGHT_CONFIG), + config.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_CONFIG)); + _balancednessScore = MAX_BALANCEDNESS_SCORE; + _provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED); + _hasPartitionsWithRFGreaterThanNumRacks = false; + Map overrideConfigs = Map.of(KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG, config, + ADMIN_CLIENT_CONFIG, _kafkaCruiseControl.adminClient()); + _optimizationOptionsGenerator = config.getConfiguredInstance(AnalyzerConfig.OPTIMIZATION_OPTIONS_GENERATOR_CLASS_CONFIG, + OptimizationOptionsGenerator.class, + overrideConfigs); + _goalViolationDetectionTimer = dropwizardMetricRegistry.timer(MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, + "goal-violation-detection-timer")); + _automatedRightsizingMeter = dropwizardMetricRegistry.meter(MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, "automated-rightsizing-rate")); + _provisioner = kafkaCruiseControl.provisioner(); + _isProvisionerEnabled = config.getBoolean(AnomalyDetectorConfig.PROVISIONER_ENABLE_CONFIG); + } + + /** + * @return A metric to quantify how well the load distribution on a cluster satisfies the {@link #_detectionGoals}. + */ + public double balancednessScore() { + return _balancednessScore; + } + + /** + * @return Provision status of the cluster based on the latest goal violation check. + */ + public ProvisionStatus provisionStatus() { + return _provisionResponse.status(); + } + + /** + * @return {@code true} if the goal violation detector identified partitions with a replication factor (RF) greater than the number of + * racks that contain brokers that are eligible to host replicas (i.e. not excluded for replica moves), {@code false} otherwise. + */ + public boolean hasPartitionsWithRFGreaterThanNumRacks() { + return _hasPartitionsWithRFGreaterThanNumRacks; + } + + /** + * Retrieve the {@link AnomalyDetectionStatus anomaly detection status}, indicating whether the goal violation detector + * is ready to check for an anomaly. + * + *
    + *
  • Skips detection if cluster model generation has not changed since the last goal violation check.
  • + *
  • In case the cluster has offline replicas, this function skips goal violation check and calls + * {@link #setBalancednessWithOfflineReplicas}.
  • + *
  • See {@link AnomalyDetectionStatus} for details.
  • + *
+ * + * @return The {@link AnomalyDetectionStatus anomaly detection status}, indicating whether the anomaly detector is ready. + */ + protected AnomalyDetectionStatus getGoalViolationDetectionStatus() { + if (_kafkaCruiseControl.loadMonitor().clusterModelGeneration().equals(_lastCheckedModelGeneration)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping goal violation detection because the model generation hasn't changed. Current model generation {}", + _kafkaCruiseControl.loadMonitor().clusterModelGeneration()); + } + return AnomalyDetectionStatus.SKIP_MODEL_GENERATION_NOT_CHANGED; + } + + AnomalyDetectionStatus detectionStatus = getAnomalyDetectionStatus(_kafkaCruiseControl, true, true); + if (detectionStatus == AnomalyDetectionStatus.SKIP_HAS_OFFLINE_REPLICAS) { + setBalancednessWithOfflineReplicas(); + } else if (detectionStatus == AnomalyDetectionStatus.SKIP_EXECUTOR_NOT_READY) { + // An ongoing execution might indicate a cluster expansion/shrinking. Hence, the detector avoids reporting a stale provision status. + _provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED); + // An ongoing execution may modify the replication factor of partitions; hence, the detector avoids reporting potential false positives. + _hasPartitionsWithRFGreaterThanNumRacks = false; + } + + return detectionStatus; + } + + /** + * @param clusterModel The state of the cluster. + * @return {@code true} to skip goal violation detection due to offline replicas in the cluster model. + */ + protected boolean skipDueToOfflineReplicas(ClusterModel clusterModel) { + if (!clusterModel.deadBrokers().isEmpty()) { + LOG.info("Skipping goal violation detection due to dead brokers {}, which are reported by broker failure " + + "detector, and fixed if its self healing configuration is enabled.", clusterModel.deadBrokers()); + setBalancednessWithOfflineReplicas(); + return true; + } else if (!clusterModel.brokersWithBadDisks().isEmpty()) { + LOG.info("Skipping goal violation detection due to brokers with bad disks {}, which are reported by disk failure " + + "detector, and fixed if its self healing configuration is enabled.", clusterModel.brokersWithBadDisks()); + setBalancednessWithOfflineReplicas(); + return true; + } + + return false; + } + + protected void setBalancednessWithOfflineReplicas() { + _balancednessScore = BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS; + _provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED); + } + + protected void refreshBalancednessScore(Map> violatedGoalsByFixability) { + _balancednessScore = MAX_BALANCEDNESS_SCORE; + for (List violatedGoals : violatedGoalsByFixability.values()) { + violatedGoals.forEach(violatedGoal -> _balancednessScore -= _balancednessCostByGoal.get(violatedGoal)); + } + } + + protected Set excludedTopics(ClusterModel clusterModel) { + return Utils.getTopicNamesMatchedWithPattern(_excludedTopics, clusterModel::topics); + } + + protected boolean optimizeForGoal(ClusterModel clusterModel, + Goal goal, + GoalViolations goalViolations, + Set excludedBrokersForLeadership, + Set excludedBrokersForReplicaMove, + boolean checkPartitionsWithRFGreaterThanNumRacks) + throws KafkaCruiseControlException { + if (clusterModel.topics().isEmpty()) { + LOG.info("Skipping goal violation detection because the cluster model does not have any topic."); + return false; + } + + Map> initReplicaDistribution = clusterModel.getReplicaDistribution(); + Map initLeaderDistribution = clusterModel.getLeaderDistribution(); + try { + OptimizationOptions options = _optimizationOptionsGenerator.optimizationOptionsForGoalViolationDetection(clusterModel, + excludedTopics(clusterModel), + excludedBrokersForLeadership, + excludedBrokersForReplicaMove); + if (checkPartitionsWithRFGreaterThanNumRacks) { + _hasPartitionsWithRFGreaterThanNumRacks = clusterModel.maxReplicationFactor() > clusterModel.numAliveRacksAllowedReplicaMoves(options); + } + goal.optimize(clusterModel, Collections.emptySet(), options); + } catch (OptimizationFailureException ofe) { + // An OptimizationFailureException indicates (1) a hard goal violation that cannot be fixed typically due to + // lack of physical hardware (e.g. insufficient number of racks to satisfy rack awareness, insufficient number + // of brokers to satisfy Replica Capacity Goal, or insufficient number of resources to satisfy resource + // capacity goals), or (2) a failure to move offline replicas away from dead brokers/disks. + goalViolations.addViolation(goal.name(), false); + return true; + } + boolean hasDiff = AnalyzerUtils.hasDiff(initReplicaDistribution, initLeaderDistribution, clusterModel); + LOG.trace("{} generated {} proposals", goal.name(), hasDiff ? "some" : "no"); + if (hasDiff) { + // A goal violation that can be optimized by applying the generated proposals. + goalViolations.addViolation(goal.name(), true); + return true; + } else { + // The goal is already satisfied. + return false; + } + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java index 14472064fd..2ee5f9fb12 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java @@ -4,154 +4,40 @@ package com.linkedin.kafka.cruisecontrol.detector; -import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions; -import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionResponse; -import com.linkedin.kafka.cruisecontrol.common.Utils; import com.linkedin.cruisecontrol.detector.Anomaly; import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl; -import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptionsGenerator; +import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionResponse; import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus; -import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress; -import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; -import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils; import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal; -import com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig; +import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress; import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig; import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException; -import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException; import com.linkedin.kafka.cruisecontrol.executor.ExecutorState; import com.linkedin.kafka.cruisecontrol.model.ClusterModel; -import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo; -import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.regex.Pattern; -import org.apache.kafka.common.TopicPartition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.ADMIN_CLIENT_CONFIG; -import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.ANOMALY_DETECTOR_SENSOR; -import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.balancednessCostByGoal; -import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.MAX_BALANCEDNESS_SCORE; import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG; -import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.getAnomalyDetectionStatus; import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG; -import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG; /** * This class will be scheduled to run periodically to check if the given goals are violated or not. An alert will be * triggered if one of the goals is not met. */ -public class GoalViolationDetector extends AbstractAnomalyDetector implements Runnable { +public class GoalViolationDetector extends BaseGoalViolationDetector implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(GoalViolationDetector.class); - private final List _detectionGoals; - private ModelGeneration _lastCheckedModelGeneration; - private final Pattern _excludedTopics; - private final boolean _allowCapacityEstimation; - private final boolean _excludeRecentlyDemotedBrokers; - private final boolean _excludeRecentlyRemovedBrokers; - private final Map _balancednessCostByGoal; - private volatile double _balancednessScore; - private volatile ProvisionResponse _provisionResponse; - private volatile boolean _hasPartitionsWithRFGreaterThanNumRacks; - private final OptimizationOptionsGenerator _optimizationOptionsGenerator; - private final Timer _goalViolationDetectionTimer; - private final Meter _automatedRightsizingMeter; - protected static final double BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS = -1.0; - protected final Provisioner _provisioner; - protected final Boolean _isProvisionerEnabled; public GoalViolationDetector(Queue anomalies, KafkaCruiseControl kafkaCruiseControl, MetricRegistry dropwizardMetricRegistry) { - super(anomalies, kafkaCruiseControl); - KafkaCruiseControlConfig config = _kafkaCruiseControl.config(); - // Notice that we use a separate set of Goal instances for anomaly detector to avoid interference. - _detectionGoals = config.getConfiguredInstances(AnomalyDetectorConfig.ANOMALY_DETECTION_GOALS_CONFIG, Goal.class); - _excludedTopics = Pattern.compile(config.getString(AnalyzerConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG)); - _allowCapacityEstimation = config.getBoolean(AnomalyDetectorConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG); - _excludeRecentlyDemotedBrokers = config.getBoolean(AnomalyDetectorConfig.SELF_HEALING_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG); - _excludeRecentlyRemovedBrokers = config.getBoolean(AnomalyDetectorConfig.SELF_HEALING_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG); - _balancednessCostByGoal = balancednessCostByGoal(_detectionGoals, - config.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_PRIORITY_WEIGHT_CONFIG), - config.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_CONFIG)); - _balancednessScore = MAX_BALANCEDNESS_SCORE; - _provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED); - _hasPartitionsWithRFGreaterThanNumRacks = false; - Map overrideConfigs = Map.of(KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG, config, - ADMIN_CLIENT_CONFIG, _kafkaCruiseControl.adminClient()); - _optimizationOptionsGenerator = config.getConfiguredInstance(AnalyzerConfig.OPTIMIZATION_OPTIONS_GENERATOR_CLASS_CONFIG, - OptimizationOptionsGenerator.class, - overrideConfigs); - _goalViolationDetectionTimer = dropwizardMetricRegistry.timer(MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, - "goal-violation-detection-timer")); - _automatedRightsizingMeter = dropwizardMetricRegistry.meter(MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, "automated-rightsizing-rate")); - _provisioner = kafkaCruiseControl.provisioner(); - _isProvisionerEnabled = config.getBoolean(AnomalyDetectorConfig.PROVISIONER_ENABLE_CONFIG); - } - - /** - * @return A metric to quantify how well the load distribution on a cluster satisfies the {@link #_detectionGoals}. - */ - public double balancednessScore() { - return _balancednessScore; - } - - /** - * @return Provision status of the cluster based on the latest goal violation check. - */ - public ProvisionStatus provisionStatus() { - return _provisionResponse.status(); - } - - /** - * @return {@code true} if the goal violation detector identified partitions with a replication factor (RF) greater than the number of - * racks that contain brokers that are eligible to host replicas (i.e. not excluded for replica moves), {@code false} otherwise. - */ - public boolean hasPartitionsWithRFGreaterThanNumRacks() { - return _hasPartitionsWithRFGreaterThanNumRacks; - } - - /** - * Retrieve the {@link AnomalyDetectionStatus anomaly detection status}, indicating whether the goal violation detector - * is ready to check for an anomaly. - * - *
    - *
  • Skips detection if cluster model generation has not changed since the last goal violation check.
  • - *
  • In case the cluster has offline replicas, this function skips goal violation check and calls - * {@link #setBalancednessWithOfflineReplicas}.
  • - *
  • See {@link AnomalyDetectionStatus} for details.
  • - *
- * - * @return The {@link AnomalyDetectionStatus anomaly detection status}, indicating whether the anomaly detector is ready. - */ - protected AnomalyDetectionStatus getGoalViolationDetectionStatus() { - if (_kafkaCruiseControl.loadMonitor().clusterModelGeneration().equals(_lastCheckedModelGeneration)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping goal violation detection because the model generation hasn't changed. Current model generation {}", - _kafkaCruiseControl.loadMonitor().clusterModelGeneration()); - } - return AnomalyDetectionStatus.SKIP_MODEL_GENERATION_NOT_CHANGED; - } - - AnomalyDetectionStatus detectionStatus = getAnomalyDetectionStatus(_kafkaCruiseControl, true, true); - if (detectionStatus == AnomalyDetectionStatus.SKIP_HAS_OFFLINE_REPLICAS) { - setBalancednessWithOfflineReplicas(); - } else if (detectionStatus == AnomalyDetectionStatus.SKIP_EXECUTOR_NOT_READY) { - // An ongoing execution might indicate a cluster expansion/shrinking. Hence, the detector avoids reporting a stale provision status. - _provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED); - // An ongoing execution may modify the replication factor of partitions; hence, the detector avoids reporting potential false positives. - _hasPartitionsWithRFGreaterThanNumRacks = false; - } - - return detectionStatus; + super(anomalies, kafkaCruiseControl, dropwizardMetricRegistry); } @Override @@ -163,10 +49,11 @@ public void run() { AutoCloseable clusterModelSemaphore = null; try { Map parameterConfigOverrides = Map.of(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, _kafkaCruiseControl, - ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, _kafkaCruiseControl.timeMs()); + ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, _kafkaCruiseControl.timeMs()); GoalViolations goalViolations = _kafkaCruiseControl.config().getConfiguredInstance(AnomalyDetectorConfig.GOAL_VIOLATIONS_CLASS_CONFIG, - GoalViolations.class, - parameterConfigOverrides); + GoalViolations.class, + parameterConfigOverrides); + boolean newModelNeeded = true; ClusterModel clusterModel = null; @@ -177,10 +64,10 @@ public void run() { } Set excludedBrokersForLeadership = _excludeRecentlyDemotedBrokers ? executorState.recentlyDemotedBrokers() - : Collections.emptySet(); + : Collections.emptySet(); Set excludedBrokersForReplicaMove = _excludeRecentlyRemovedBrokers ? executorState.recentlyRemovedBrokers() - : Collections.emptySet(); + : Collections.emptySet(); ProvisionResponse provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED); boolean checkPartitionsWithRFGreaterThanNumRacks = true; @@ -198,8 +85,8 @@ public void run() { // Make cluster model null before generating a new cluster model so the current one can be GCed. clusterModel = null; clusterModel = _kafkaCruiseControl.clusterModel(goal.clusterModelCompletenessRequirements(), - _allowCapacityEstimation, - new OperationProgress()); + _allowCapacityEstimation, + new OperationProgress()); // If the clusterModel contains dead brokers or disks, goal violation detector will ignore any goal violations. // Detection and fix for dead brokers/disks is the responsibility of broker/disk failure detector. @@ -209,7 +96,7 @@ public void run() { _lastCheckedModelGeneration = clusterModel.generation(); } newModelNeeded = optimizeForGoal(clusterModel, goal, goalViolations, excludedBrokersForLeadership, excludedBrokersForReplicaMove, - checkPartitionsWithRFGreaterThanNumRacks); + checkPartitionsWithRFGreaterThanNumRacks); // CC will check for partitions with RF greater than number of eligible racks just once, because regardless of the goal, the cluster // will have the same (1) maximum replication factor and (2) rack count containing brokers that are eligible to host replicas. checkPartitionsWithRFGreaterThanNumRacks = false; @@ -254,81 +141,4 @@ public void run() { } } - /** - * @param clusterModel The state of the cluster. - * @return {@code true} to skip goal violation detection due to offline replicas in the cluster model. - */ - protected boolean skipDueToOfflineReplicas(ClusterModel clusterModel) { - if (!clusterModel.deadBrokers().isEmpty()) { - LOG.info("Skipping goal violation detection due to dead brokers {}, which are reported by broker failure " - + "detector, and fixed if its self healing configuration is enabled.", clusterModel.deadBrokers()); - setBalancednessWithOfflineReplicas(); - return true; - } else if (!clusterModel.brokersWithBadDisks().isEmpty()) { - LOG.info("Skipping goal violation detection due to brokers with bad disks {}, which are reported by disk failure " - + "detector, and fixed if its self healing configuration is enabled.", clusterModel.brokersWithBadDisks()); - setBalancednessWithOfflineReplicas(); - return true; - } - - return false; - } - - protected void setBalancednessWithOfflineReplicas() { - _balancednessScore = BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS; - _provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED); - } - - protected void refreshBalancednessScore(Map> violatedGoalsByFixability) { - _balancednessScore = MAX_BALANCEDNESS_SCORE; - for (List violatedGoals : violatedGoalsByFixability.values()) { - violatedGoals.forEach(violatedGoal -> _balancednessScore -= _balancednessCostByGoal.get(violatedGoal)); - } - } - - protected Set excludedTopics(ClusterModel clusterModel) { - return Utils.getTopicNamesMatchedWithPattern(_excludedTopics, clusterModel::topics); - } - - protected boolean optimizeForGoal(ClusterModel clusterModel, - Goal goal, - GoalViolations goalViolations, - Set excludedBrokersForLeadership, - Set excludedBrokersForReplicaMove, - boolean checkPartitionsWithRFGreaterThanNumRacks) - throws KafkaCruiseControlException { - if (clusterModel.topics().isEmpty()) { - LOG.info("Skipping goal violation detection because the cluster model does not have any topic."); - return false; - } - Map> initReplicaDistribution = clusterModel.getReplicaDistribution(); - Map initLeaderDistribution = clusterModel.getLeaderDistribution(); - try { - OptimizationOptions options = _optimizationOptionsGenerator.optimizationOptionsForGoalViolationDetection(clusterModel, - excludedTopics(clusterModel), - excludedBrokersForLeadership, - excludedBrokersForReplicaMove); - if (checkPartitionsWithRFGreaterThanNumRacks) { - _hasPartitionsWithRFGreaterThanNumRacks = clusterModel.maxReplicationFactor() > clusterModel.numAliveRacksAllowedReplicaMoves(options); - } - goal.optimize(clusterModel, Collections.emptySet(), options); - } catch (OptimizationFailureException ofe) { - // An OptimizationFailureException indicates (1) a hard goal violation that cannot be fixed typically due to - // lack of physical hardware (e.g. insufficient number of racks to satisfy rack awareness, insufficient number - // of brokers to satisfy Replica Capacity Goal, or insufficient number of resources to satisfy resource - // capacity goals), or (2) a failure to move offline replicas away from dead brokers/disks. - goalViolations.addViolation(goal.name(), false); - return true; - } - boolean hasDiff = AnalyzerUtils.hasDiff(initReplicaDistribution, initLeaderDistribution, clusterModel); - LOG.trace("{} generated {} proposals", goal.name(), hasDiff ? "some" : "no"); - if (hasDiff) { - // A goal violation that can be optimized by applying the generated proposals. - goalViolations.addViolation(goal.name(), true); - return true; - } else { - // The goal is already satisfied. - return false; - } - } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/IntraBrokerGoalViolationDetector.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/IntraBrokerGoalViolationDetector.java index 90b4bedc46..7870109e8b 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/IntraBrokerGoalViolationDetector.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/IntraBrokerGoalViolationDetector.java @@ -11,130 +11,49 @@ import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl; import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils; import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions; -import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptionsGenerator; import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionResponse; import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus; import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal; import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress; -import com.linkedin.kafka.cruisecontrol.common.Utils; import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; -import com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig; import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig; import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException; import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException; -import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal; import com.linkedin.kafka.cruisecontrol.executor.ExecutorState; import com.linkedin.kafka.cruisecontrol.model.ClusterModel; import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo; -import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.regex.Pattern; -import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.*; -import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.*; -import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG; +import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.ANOMALY_DETECTOR_SENSOR; +import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG; +import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG; /** * This class will be scheduled to run periodically to check if the given goals are violated or not. An alert will be * triggered if one of the goals is not met. */ -public class IntraBrokerGoalViolationDetector extends AbstractAnomalyDetector implements Runnable { +public class IntraBrokerGoalViolationDetector extends BaseGoalViolationDetector implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(IntraBrokerGoalViolationDetector.class); private final List _detectionGoals; - private ModelGeneration _lastCheckedModelGeneration; - private final Pattern _excludedTopics; - private final boolean _allowCapacityEstimation; - private final boolean _excludeRecentlyDemotedBrokers; - private final boolean _excludeRecentlyRemovedBrokers; - private final Map _balancednessCostByGoal; - private volatile double _balancednessScore; - private volatile ProvisionResponse _provisionResponse; - private final OptimizationOptionsGenerator _optimizationOptionsGenerator; private final Timer _goalViolationDetectionTimer; - protected static final double BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS = -1.0; - protected final Provisioner _provisioner; - protected final Boolean _isProvisionerEnabled; + private final boolean _populateReplicaInfo = true; public IntraBrokerGoalViolationDetector(Queue anomalies, KafkaCruiseControl kafkaCruiseControl, MetricRegistry dropwizardMetricRegistry) { - super(anomalies, kafkaCruiseControl); + super(anomalies, kafkaCruiseControl, dropwizardMetricRegistry); KafkaCruiseControlConfig config = _kafkaCruiseControl.config(); // Notice that we use a separate set of Goal instances for anomaly detector to avoid interference. _detectionGoals = config.getConfiguredInstances(AnomalyDetectorConfig.ANOMALY_DETECTION_INTRA_BROKER_GOALS_CONFIG, Goal.class); - _excludedTopics = Pattern.compile(config.getString(AnalyzerConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG)); - _allowCapacityEstimation = config.getBoolean(AnomalyDetectorConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG); - _excludeRecentlyDemotedBrokers = config.getBoolean(AnomalyDetectorConfig.SELF_HEALING_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG); - _excludeRecentlyRemovedBrokers = config.getBoolean(AnomalyDetectorConfig.SELF_HEALING_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG); - _balancednessCostByGoal = balancednessCostByGoal(_detectionGoals, - config.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_PRIORITY_WEIGHT_CONFIG), - config.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_CONFIG)); - _balancednessScore = MAX_BALANCEDNESS_SCORE; - _provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED); - Map overrideConfigs = new HashMap<>(2); - overrideConfigs.put(KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG, config); - overrideConfigs.put(ADMIN_CLIENT_CONFIG, _kafkaCruiseControl.adminClient()); - _optimizationOptionsGenerator = config.getConfiguredInstance(AnalyzerConfig.OPTIMIZATION_OPTIONS_GENERATOR_CLASS_CONFIG, - OptimizationOptionsGenerator.class, - overrideConfigs); _goalViolationDetectionTimer = dropwizardMetricRegistry.timer(MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, "intra-broker-goal-violation-detection-timer")); - _provisioner = kafkaCruiseControl.provisioner(); - _isProvisionerEnabled = config.getBoolean(AnomalyDetectorConfig.PROVISIONER_ENABLE_CONFIG); - } - - /** - * @return A metric to quantify how well the load distribution on a cluster satisfies the {@link #_detectionGoals}. - */ - public double balancednessScore() { - return _balancednessScore; - } - /** - * @return Provision status of the cluster based on the latest goal violation check. - */ - public ProvisionStatus provisionStatus() { - return _provisionResponse.status(); - } - - /** - * Retrieve the {@link AnomalyDetectionStatus anomaly detection status}, indicating whether the goal violation detector - * is ready to check for an anomaly. - * - *
    - *
  • Skips detection if cluster model generation has not changed since the last goal violation check.
  • - *
  • In case the cluster has offline replicas, this function skips goal violation check and calls - * {@link #setBalancednessWithOfflineReplicas}.
  • - *
  • See {@link AnomalyDetectionStatus} for details.
  • - *
- * - * @return The {@link AnomalyDetectionStatus anomaly detection status}, indicating whether the anomaly detector is ready. - */ - protected AnomalyDetectionStatus getGoalViolationDetectionStatus() { - if (_kafkaCruiseControl.loadMonitor().clusterModelGeneration().equals(_lastCheckedModelGeneration)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping goal violation detection because the model generation hasn't changed. Current model generation {}", - _kafkaCruiseControl.loadMonitor().clusterModelGeneration()); - } - return AnomalyDetectionStatus.SKIP_MODEL_GENERATION_NOT_CHANGED; - } - - AnomalyDetectionStatus detectionStatus = getAnomalyDetectionStatus(_kafkaCruiseControl, true, true); - if (detectionStatus == AnomalyDetectionStatus.SKIP_HAS_OFFLINE_REPLICAS) { - setBalancednessWithOfflineReplicas(); - } else if (detectionStatus == AnomalyDetectionStatus.SKIP_EXECUTOR_NOT_READY) { - // An ongoing execution might indicate a cluster expansion/shrinking. Hence, the detector avoids reporting a stale provision status. - _provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED); - } - - return detectionStatus; } @Override @@ -145,13 +64,13 @@ public void run() { AutoCloseable clusterModelSemaphore = null; try { - Map parameterConfigOverrides = new HashMap<>(2); - parameterConfigOverrides.put(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, _kafkaCruiseControl); - parameterConfigOverrides.put(ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, _kafkaCruiseControl.timeMs()); - IntraBrokerGoalViolations goalViolations = _kafkaCruiseControl.config(). - getConfiguredInstance(AnomalyDetectorConfig.INTRA_BROKER_GOAL_VIOLATIONS_CLASS_CONFIG, - IntraBrokerGoalViolations.class, - parameterConfigOverrides); + Map parameterConfigOverrides = Map.of(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, _kafkaCruiseControl, + ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, _kafkaCruiseControl.timeMs()); + IntraBrokerGoalViolations goalViolations = _kafkaCruiseControl.config().getConfiguredInstance( + AnomalyDetectorConfig.INTRA_BROKER_GOAL_VIOLATIONS_CLASS_CONFIG, + IntraBrokerGoalViolations.class, + parameterConfigOverrides); + boolean newModelNeeded = true; ClusterModel clusterModel = null; @@ -168,6 +87,7 @@ public void run() { : Collections.emptySet(); ProvisionResponse provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED); + boolean checkPartitionsWithRFGreaterThanNumRacks = true; final Timer.Context ctx = _goalViolationDetectionTimer.time(); try { for (Goal goal : _detectionGoals) { @@ -193,8 +113,11 @@ public void run() { } _lastCheckedModelGeneration = clusterModel.generation(); } - newModelNeeded = optimizeForGoal(clusterModel, goal, goalViolations, excludedBrokersForLeadership, excludedBrokersForReplicaMove); - + newModelNeeded = optimizeForGoal(clusterModel, goal, goalViolations, excludedBrokersForLeadership, excludedBrokersForReplicaMove, + checkPartitionsWithRFGreaterThanNumRacks); + // CC will check for partitions with RF greater than number of eligible racks just once, because regardless of the goal, the cluster + // will have the same (1) maximum replication factor and (2) rack count containing brokers that are eligible to host replicas. + checkPartitionsWithRFGreaterThanNumRacks = false; } else { LOG.warn("Skipping goal violation detection for {} because load completeness requirement is not met.", goal); } @@ -209,6 +132,7 @@ public void run() { ProvisionerState provisionerState = _provisioner.rightsize(_provisionResponse.recommendationByRecommender(), new RightsizeOptions()); if (provisionerState != null) { LOG.info("Provisioner state: {}.", provisionerState); + _automatedRightsizingMeter.mark(); } } Map> violatedGoalsByFixability = goalViolations.violatedGoalsByFixability(); @@ -235,52 +159,18 @@ public void run() { } } - /** - * @param clusterModel The state of the cluster. - * @return True to skip goal violation detection due to offline replicas in the cluster model. - */ - protected boolean skipDueToOfflineReplicas(ClusterModel clusterModel) { - if (!clusterModel.deadBrokers().isEmpty()) { - LOG.info("Skipping goal violation detection due to dead brokers {}, which are reported by broker failure " - + "detector, and fixed if its self healing configuration is enabled.", clusterModel.deadBrokers()); - setBalancednessWithOfflineReplicas(); - return true; - } else if (!clusterModel.brokersWithBadDisks().isEmpty()) { - LOG.info("Skipping goal violation detection due to brokers with bad disks {}, which are reported by disk failure " - + "detector, and fixed if its self healing configuration is enabled.", clusterModel.brokersWithBadDisks()); - setBalancednessWithOfflineReplicas(); - return true; - } - - return false; - } - - protected void setBalancednessWithOfflineReplicas() { - _balancednessScore = BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS; - _provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED); - } - - protected void refreshBalancednessScore(Map> violatedGoalsByFixability) { - _balancednessScore = MAX_BALANCEDNESS_SCORE; - for (List violatedGoals : violatedGoalsByFixability.values()) { - violatedGoals.forEach(violatedGoal -> _balancednessScore -= _balancednessCostByGoal.get(violatedGoal)); - } - } - - protected Set excludedTopics(ClusterModel clusterModel) { - return Utils.getTopicNamesMatchedWithPattern(_excludedTopics, clusterModel::topics); - } - protected boolean optimizeForGoal(ClusterModel clusterModel, Goal goal, IntraBrokerGoalViolations goalViolations, Set excludedBrokersForLeadership, - Set excludedBrokersForReplicaMove) + Set excludedBrokersForReplicaMove, + boolean checkPartitionsWithRFGreaterThanNumRacks) throws KafkaCruiseControlException { if (clusterModel.topics().isEmpty()) { LOG.info("Skipping goal violation detection because the cluster model does not have any topic."); return false; } + Map> initReplicaDistribution = clusterModel.getReplicaDistribution(); Map initLeaderDistribution = clusterModel.getLeaderDistribution(); try { @@ -288,7 +178,9 @@ protected boolean optimizeForGoal(ClusterModel clusterModel, excludedTopics(clusterModel), excludedBrokersForLeadership, excludedBrokersForReplicaMove); - + if (checkPartitionsWithRFGreaterThanNumRacks) { + _hasPartitionsWithRFGreaterThanNumRacks = clusterModel.maxReplicationFactor() > clusterModel.numAliveRacksAllowedReplicaMoves(options); + } goal.optimize(clusterModel, Collections.emptySet(), options); } catch (OptimizationFailureException ofe) { // An OptimizationFailureException indicates (1) a hard goal violation that cannot be fixed typically due to @@ -298,9 +190,9 @@ protected boolean optimizeForGoal(ClusterModel clusterModel, goalViolations.addViolation(goal.name(), false); return true; } - Set proposals = AnalyzerUtils.getDiff(initReplicaDistribution, initLeaderDistribution, clusterModel); - LOG.trace("{} generated {} proposals", goal.name(), proposals.size()); - if (!proposals.isEmpty()) { + boolean hasDiff = AnalyzerUtils.hasDiff(initReplicaDistribution, initLeaderDistribution, clusterModel); + LOG.trace("{} generated {} proposals", goal.name(), hasDiff ? "some" : "no"); + if (hasDiff) { // A goal violation that can be optimized by applying the generated proposals. goalViolations.addViolation(goal.name(), true); return true; @@ -309,4 +201,5 @@ protected boolean optimizeForGoal(ClusterModel clusterModel, return false; } } + }