From b83e70b2996b027d1be2686c7257ca9828a72619 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 9 Dec 2025 17:52:06 +0100 Subject: [PATCH 01/21] wip: refactoring --- .../hadoop/StackableTopologyProvider.java | 818 ++++++++---------- .../tech/stackable/hadoop/TopologyCache.java | 88 ++ .../tech/stackable/hadoop/TopologyLabel.java | 116 +++ .../tech/stackable/hadoop/TopologyUtils.java | 21 + 4 files changed, 576 insertions(+), 467 deletions(-) create mode 100644 src/main/java/tech/stackable/hadoop/TopologyCache.java create mode 100644 src/main/java/tech/stackable/hadoop/TopologyLabel.java diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java index 4d1e917..b6f7877 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -1,7 +1,5 @@ package tech.stackable.hadoop; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; import io.fabric8.kubernetes.api.model.*; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; @@ -9,8 +7,6 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.hadoop.net.DNSToSwitchMapping; import org.slf4j.Logger; @@ -18,330 +14,257 @@ /** * An implementation of the org.apache.hadoop.net.DNSToSwitchMapping that is used to create a - * topology out of datanodes. + * topology out of dataNodes. * *
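(Reviewer context, taken from the README rather than asserted by this patch: the class is activated on the NameNode through the standard switch-mapping hook. A minimal sketch using the Hadoop Configuration API; the hdfs-site.xml form is equivalent:)

    import org.apache.hadoop.conf.Configuration;

    // Illustration only: register the provider as the NameNode's DNS-to-switch mapping.
    // Equivalent hdfs-site.xml property:
    //   net.topology.node.switch.mapping.impl = tech.stackable.hadoop.StackableTopologyProvider
    Configuration conf = new Configuration();
    conf.set(
        "net.topology.node.switch.mapping.impl",
        "tech.stackable.hadoop.StackableTopologyProvider");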

This class is intended to be run as part of the NameNode process and will be used by the - * namenode to retrieve topology strings for datanodes. + * nameNode to retrieve topology strings for dataNodes. */ public class StackableTopologyProvider implements DNSToSwitchMapping { + private final Logger LOG = LoggerFactory.getLogger(StackableTopologyProvider.class); - public static final String VARNAME_LABELS = "TOPOLOGY_LABELS"; + // Environment variable names public static final String VARNAME_CACHE_EXPIRATION = "TOPOLOGY_CACHE_EXPIRATION_SECONDS"; - public static final String VARNAME_MAXLEVELS = "TOPOLOGY_MAX_LEVELS"; + + // Default values public static final String DEFAULT_RACK = "/defaultRack"; - private static final int MAX_LEVELS_DEFAULT = 2; private static final int CACHE_EXPIRY_DEFAULT_SECONDS = 5 * 60; - private final Logger LOG = LoggerFactory.getLogger(StackableTopologyProvider.class); + private final KubernetesClient client; - private final Cache topologyKeyCache = - Caffeine.newBuilder().expireAfterWrite(getCacheExpiration(), TimeUnit.SECONDS).build(); - private final Cache nodeKeyCache = - Caffeine.newBuilder() - .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) - .build(); - - private final Cache listenerKeyCache = - Caffeine.newBuilder() - .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) - .build(); - private final Cache podKeyCache = - Caffeine.newBuilder() - .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) - .build(); - // The list of labels that this provider uses to generate the topology information for any given - // datanode private final List labels; + // Caching layers + private final TopologyCache cache; + public StackableTopologyProvider() { this.client = new KubernetesClientBuilder().build(); + this.cache = new TopologyCache(getCacheExpiration(), CACHE_EXPIRY_DEFAULT_SECONDS); + this.labels = TopologyLabel.initializeTopologyLabels(); - // Read the labels to be used to build a topology from environment variables. Labels are - // configured in the EnvVar "TOPOLOGY_LABELS". They should be specified in the form - // "[node|pod]:" and separated by ";". So a valid configuration that reads topology - // information from the labels "kubernetes.io/zone" and "kubernetes.io/rack" on the k8s node - // that is running a datanode pod would look like this: - // "node:kubernetes.io/zone;node:kubernetes.io/rack" By default, there is an upper limit of 2 on - // the number of labels that are processed, because this is what Hadoop traditionally allows - - // this can be overridden via setting the EnvVar "MAX_TOPOLOGY_LEVELS". - String topologyConfig = System.getenv(VARNAME_LABELS); - if (topologyConfig != null && !topologyConfig.isEmpty()) { - String[] labelConfigs = topologyConfig.split(";"); - if (labelConfigs.length > getMaxLabels()) { - LOG.error( - "Found [{}] topology labels configured, but maximum allowed number is [{}]: " - + "please check your config or raise the number of allowed labels.", - labelConfigs.length, - getMaxLabels()); - throw new RuntimeException(); - } - // Create TopologyLabels from config strings - this.labels = - Arrays.stream(labelConfigs).map(TopologyLabel::new).collect(Collectors.toList()); - - // Check if any labelConfigs were invalid - if (this.labels.stream().anyMatch(label -> label.labelType == LabelType.Undefined)) { - LOG.error( - "Topologylabel contained invalid configuration for at least one label: " - + "double check your config! 
Labels should be specified in the " - + "format '[pod|node]:;...'"); - throw new RuntimeException(); - } + logInitializationStatus(); + } - } else { - LOG.error( - "Missing env var [{}] this is required for rack awareness to work.", VARNAME_LABELS); - throw new RuntimeException(); - } - if (this.labels.isEmpty()) { - LOG.info( - "No topology config found, defaulting value for all datanodes to [{}]", DEFAULT_RACK); - } else { - LOG.info( - "Topology config yields labels [{}]", - this.labels.stream().map(label -> label.name).collect(Collectors.toList())); - } + @Override + public void reloadCachedMappings() { + // TODO: According to the upstream comment we should rebuild all cache entries after + // invalidating them + // this may mean trying to resolve ip addresses that do not exist any more and things like that + // though and + // require some more thought, so we will for now just invalidate the cache. + this.cache.invalidateAll(); } - /*** - * Checks if a value for the maximum number of topology levels to allow has been configured in - * the environment variable specified in VARNAME_MAXLEVELS, - * returns the value of MAX_LEVELS_DEFAULT as default if nothing is set. - * - * @return The maximum number of topology levels to allow. - */ - private int getMaxLabels() { - String maxLevelsConfig = System.getenv(VARNAME_MAXLEVELS); - if (maxLevelsConfig != null && !maxLevelsConfig.isEmpty()) { - try { - int maxLevelsFromEnvVar = Integer.parseInt(maxLevelsConfig); - LOG.info( - "Found [{}] env var, changing allowed number of topology levels to [{}]", - VARNAME_MAXLEVELS, - maxLevelsFromEnvVar); - return maxLevelsFromEnvVar; - } catch (NumberFormatException e) { - LOG.warn( - "Unable to parse value of [{}]/[{}] as integer, using default value [{}]", - VARNAME_MAXLEVELS, - maxLevelsConfig, - MAX_LEVELS_DEFAULT); - } - } - return MAX_LEVELS_DEFAULT; + @Override + public void reloadCachedMappings(List names) { + // TODO: See comment above, the same applies here + cache.invalidateTopologyKeys(names); } - /*** - * Checks if a value for the cache expiration time has been configured in - * the environment variable specified in VARNAME_CACHE_EXPIRATION, - * returns the value of CACHE_EXPIRY_DEFAULT_SECONDS as default if nothing is set. - * - * @return The cache expiration time to use for the rack id cache. 
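(For reference: both environment-variable readers share the parse-or-default behaviour that this patch centralises in TopologyUtils.parseIntFromEnv further down. A minimal sketch of that pattern; parseOrDefault is a made-up name and the values are illustrative:)

    // TOPOLOGY_CACHE_EXPIRATION_SECONDS=600 -> 600
    // TOPOLOGY_CACHE_EXPIRATION_SECONDS=abc -> warn, fall back to 300 (5 * 60)
    // unset or empty                        -> 300
    static int parseOrDefault(String raw, int fallback) {
      if (raw == null || raw.isEmpty()) {
        return fallback;
      }
      try {
        return Integer.parseInt(raw);
      } catch (NumberFormatException e) {
        return fallback; // mirrors the warn-and-default branch in parseIntFromEnv
      }
    }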
- */ private int getCacheExpiration() { - String cacheExpirationConfigSeconds = System.getenv(VARNAME_CACHE_EXPIRATION); - if (cacheExpirationConfigSeconds != null && !cacheExpirationConfigSeconds.isEmpty()) { - try { - int cacheExpirationFromEnvVar = Integer.parseInt(cacheExpirationConfigSeconds); - LOG.info( - "Found [{}] env var, changing cache time for topology entries to [{}]", - VARNAME_CACHE_EXPIRATION, - cacheExpirationFromEnvVar); - return cacheExpirationFromEnvVar; - } catch (NumberFormatException e) { - LOG.warn( - "Unable to parse value of [{}]/[{}] as integer, using default value [{}]", - VARNAME_CACHE_EXPIRATION, - cacheExpirationConfigSeconds, - CACHE_EXPIRY_DEFAULT_SECONDS); - } - } - return CACHE_EXPIRY_DEFAULT_SECONDS; + return TopologyUtils.parseIntFromEnv( + VARNAME_CACHE_EXPIRATION, CACHE_EXPIRY_DEFAULT_SECONDS, "cache expiration seconds"); } - /*** - * - * @param datanode the datanode whose IP mis to be resolved - * @param podLabels the pod labels used in the resolution - * @param nodeLabels the node labels used in the resolution - * - * @return the label looked up from the IP address - */ - private String getLabel( - String datanode, - Map> podLabels, - Map> nodeLabels) { - // The internal structures used by this mapper are all based on IP addresses. Depending on - // configuration and network setup it may (probably will) be possible that the namenode uses - // hostnames to resolve a datanode to a topology zone. To allow this, we resolve every input to - // an ip address below and use the ip to lookup labels. - // TODO: this might break with the listener operator, as `pod.status.podips` won't contain - // external addresses - // tracking this in https://github.com/stackabletech/hdfs-topology-provider/issues/2 - InetAddress address; - try { - address = InetAddress.getByName(datanode); - LOG.debug("Resolved [{}] to [{}]", datanode, address.getHostAddress()); - datanode = address.getHostAddress(); - } catch (UnknownHostException e) { - LOG.warn( - "failed to resolve address for [{}] - this should not happen, " - + "defaulting this node to [{}]", - datanode, - DEFAULT_RACK); - return DEFAULT_RACK; - } - StringBuilder resultBuilder = new StringBuilder(); - for (TopologyLabel label : this.labels) { - if (label.labelType == LabelType.Node) { - LOG.debug( - "Looking for node label [{}] of type [{}] in [{}]/[{}]", - label.name, - label.labelType, - nodeLabels.keySet(), - nodeLabels.values()); - resultBuilder - .append("/") - .append( - nodeLabels - .getOrDefault(datanode, new HashMap<>()) - .getOrDefault(label.name, "NotFound")); - } else if (label.labelType == LabelType.Pod) { - LOG.debug( - "Looking for pod label [{}] of type [{}] in [{}]/[{}]", - label.name, - label.labelType, - podLabels.keySet(), - podLabels.values()); - resultBuilder - .append("/") - .append( - podLabels - .getOrDefault(datanode, new HashMap<>()) - .getOrDefault(label.name, "NotFound")); - } + private void logInitializationStatus() { + if (labels.isEmpty()) { + LOG.info("No topology configuration - will use default rack: {}", DEFAULT_RACK); + } else { + List labelNames = + labels.stream().map(TopologyLabel::getName).collect(Collectors.toList()); + LOG.info("Initialized with topology labels: {}", labelNames); } - String result = resultBuilder.toString(); - LOG.debug("Returning label [{}]", result); - return result; } @Override public List resolve(List names) { - LOG.info("Resolving for listeners/client-pods [{}]", names.toString()); + LOG.info("Resolving topology for: {}", names); - if 
(this.labels.isEmpty()) { - LOG.info( - "No topology labels defined, returning [{}] for hdfs nodes: [{}]", DEFAULT_RACK, names); - return names.stream().map(name -> DEFAULT_RACK).collect(Collectors.toList()); + if (labels.isEmpty()) { + return createDefaultRackList(names); } - // We need to check if we have cached values for all datanodes contained in this request. + // Try to serve from cache first + List cachedValues = tryResolveFromCache(names); + if (cachedValues != null) { + LOG.info("Returning cached topology: {}", cachedValues); + return cachedValues; + } + + // Cache miss - perform full resolution + return performFullResolution(names); + } + + private List createDefaultRackList(List names) { + LOG.info( + "No topology labels defined, returning [{}] for hdfs nodes: [{}]", DEFAULT_RACK, names); + return names.stream().map(name -> DEFAULT_RACK).collect(Collectors.toList()); + } + + private List tryResolveFromCache(List names) { + // We need to check if we have cached values for all dataNodes contained in this request. // Unless we can answer everything from the cache we have to talk to k8s anyway and can just // recalculate everything - List cachedValues = - names.stream().map(this.topologyKeyCache::getIfPresent).collect(Collectors.toList()); - LOG.debug("Cached topologyKeyCache values [{}]", cachedValues); - - if (cachedValues.contains(null)) { - // We cannot completely serve this request from the cache, since we need to talk to k8s anyway - // we'll simply refresh everything. - LOG.debug( - "Cache doesn't contain values for all requested pods: new values will be built for all nodes."); - } else { - // use same log level as the non-cached return statement - LOG.info("Answering from cached topology keys: [{}]", cachedValues); - return cachedValues; + List cached = names.stream().map(cache::getTopology).collect(Collectors.toList()); + LOG.debug("Cached topologyKeyCache values [{}]", cached); + + return cached.contains(null) ? 
null : cached; + } + + // ============================================================================ + // RESOLUTION WORKFLOW + // ============================================================================ + + private List performFullResolution(List names) { + LOG.debug("Performing full topology resolution for: {}", names); + + // Step 1: Gather all dataNodes + List dataNodes = fetchDataNodes(); + + // Step 2: Resolve listeners to actual datanode IPs + List resolvedNames = resolveListeners(names); + + // Step 3: Build label lookup maps + Map> podLabels = buildPodLabelMap(dataNodes); + Map> nodeLabels = buildNodeLabelMap(dataNodes); + + // Step 4: Build node-to-datanode map for O(1) colocated lookups + Map nodeToDatanodeIp = buildNodeToDatanodeMap(dataNodes); + + // Step 5: Resolve client pods to co-located dataNodes + List datanodeIps = + resolveClientPodsToDataNodes(resolvedNames, podLabels, nodeToDatanodeIp); + + // Step 6: Build topology strings and cache results + return buildAndCacheTopology(names, datanodeIps, podLabels, nodeLabels); + } + + private List buildAndCacheTopology( + List originalNames, + List datanodeIps, + Map> podLabels, + Map> nodeLabels) { + List result = new ArrayList<>(); + for (int i = 0; i < datanodeIps.size(); i++) { + String datanodeIp = datanodeIps.get(i); + String originalName = originalNames.get(i); + + String topology = buildTopologyString(datanodeIp, podLabels, nodeLabels); + result.add(topology); + + // Cache both the resolved IP and original name + cache.putTopology(datanodeIp, topology); + cache.putTopology(originalName, topology); } - // The datanodes will be the cache keys. - List datanodes = + LOG.info("Built topology: {}", result); + return result; + } + + // ============================================================================ + // DATANODE FETCHING + // ============================================================================ + + private List fetchDataNodes() { + List dataNodes = client .pods() .withLabel("app.kubernetes.io/component", "datanode") .withLabel("app.kubernetes.io/name", "hdfs") .list() .getItems(); + LOG.debug( - "Retrieved datanodes: [{}]", - datanodes.stream() - .map(datanode -> datanode.getMetadata().getName()) + "Retrieved dataNodes: [{}]", + dataNodes.stream() + .map(dataNode -> dataNode.getMetadata().getName()) .collect(Collectors.toList())); + return dataNodes; + } - List namesToDataNodeNames = dataNodesResolvedFromListenerOrOriginal(names); - LOG.debug("Now resolving: [{}]", namesToDataNodeNames); + // ============================================================================ + // LISTENER RESOLUTION + // ============================================================================ - // Build internal state that is later used to look up information. Basically this transposes pod - // and node lists into hashmaps where pod-IPs can be used to look up labels for the pods and - // nodes. This is not terribly memory efficient because it effectively duplicates a lot of data - // in memory. But since we cache lookups, this should hopefully only be done every once in a - // while and is not kept in memory for extended amounts of time. 
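(To make the shape of these lookup maps concrete, a toy walk-through with hypothetical IPs and label values; the real maps are keyed by pod IP exactly like this:)

    // Assuming TOPOLOGY_LABELS="node:topology.kubernetes.io/zone;pod:app.kubernetes.io/role-group"
    Map<String, Map<String, String>> nodeLabels =
        Map.of("10.244.1.7", Map.of("topology.kubernetes.io/zone", "zone-a"));
    Map<String, Map<String, String>> podLabels =
        Map.of("10.244.1.7", Map.of("app.kubernetes.io/role-group", "default"));
    // buildTopologyString("10.244.1.7", podLabels, nodeLabels) yields "/zone-a/default";
    // a label missing from either map becomes the literal segment "NotFound".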
- List result = new ArrayList<>(); + private List resolveListeners(List names) { + refreshListenerCacheIfNeeded(names); + + return names.stream().map(this::resolveListenerToDatanode).collect(Collectors.toList()); + } + + private void refreshListenerCacheIfNeeded(List names) { + List missingNames = + names.stream().filter(name -> cache.getListener(name) == null).collect(Collectors.toList()); - Map> nodeLabels = getNodeLabels(datanodes); - LOG.debug("Resolved node labels map [{}]/[{}]", nodeLabels.keySet(), nodeLabels.values()); + if (missingNames.isEmpty()) { + LOG.debug("Listener cache contains all required entries"); + return; + } - Map> podLabels = getPodLabels(datanodes); - LOG.debug("Resolved pod labels map [{}]/[{}]", podLabels.keySet(), podLabels.values()); + // Listeners are typically few, so fetch all + // (Individual listener fetches would require knowing the namespace) + LOG.debug("Fetching all listeners to populate cache"); + GenericKubernetesResourceList listeners = fetchListeners(); - List podsResolvedToDataNodes = - resolveDataNodesFromCallingPods(namesToDataNodeNames, podLabels, datanodes); + for (GenericKubernetesResource listener : listeners.getItems()) { + cacheListenerByNameAndAddresses(listener); + } + } - // Iterate over all nodes to resolve and return the topology zones - for (int i = 0; i < podsResolvedToDataNodes.size(); i++) { - String builtLabel = getLabel(podsResolvedToDataNodes.get(i), podLabels, nodeLabels); - result.add(builtLabel); + private void cacheListenerByNameAndAddresses(GenericKubernetesResource listener) { + String name = listener.getMetadata().getName(); + cache.putListener(name, listener); - // Cache the value for potential use in a later request - this.topologyKeyCache.put(podsResolvedToDataNodes.get(i), builtLabel); - // also cache the original name, in case that has resolved to a dataNode (so that - // the resolution step can be omitted next time this pod is encountered) - this.topologyKeyCache.put(names.get(i), builtLabel); + // Also cache by ingress addresses for quick lookup + for (String address : TopologyUtils.getIngressAddresses(listener)) { + cache.putListener(address, listener); } - LOG.info("Returning resolved labels [{}]", result); - return result; } /** - * If the names include listeners then these must be resolved against the dataNode IPs and used - * subsequently. + * We don't know if the name refers to a listener (it could be any client pod) but we check to see + * if it can be resolved to a dataNode just in case. 
* - * @param names the collection of names to resolve - * @return a collection of either the name (for non-listener) or the dataNode IP to which this - * listener resolves + * @param name the name of the calling pod which should be resolved to a dataNode IP if it is a + * listener + * @return either the name (for non-listener) or the dataNode IP to which this listener resolves */ - private List dataNodesResolvedFromListenerOrOriginal(List names) { - List cachedListeners = - names.stream().map(this.listenerKeyCache::getIfPresent).collect(Collectors.toList()); - if (cachedListeners.contains(null)) { - LOG.debug( - "Refreshing listener cache as not all of [{}] present in [{}]", - names, - this.listenerKeyCache.asMap().keySet()); - - GenericKubernetesResourceList listeners = getListeners(); - - for (GenericKubernetesResource listener : listeners.getItems()) { - this.listenerKeyCache.put(listener.getMetadata().getName(), listener); - // also add the IPs - for (String ingressAddress : TopologyUtils.getIngressAddresses(listener)) { - this.listenerKeyCache.put(ingressAddress, listener); - } + private String resolveListenerToDatanode(String name) { + GenericKubernetesResource listener = cache.getListener(name); + if (listener == null) { + LOG.debug("Not a listener: {}", name); + return name; + } + List ingressAddresses = TopologyUtils.getIngressAddresses(listener); + for (String ingressAddress : ingressAddresses) { + LOG.debug("Address [{}]", ingressAddress); + if (name.equalsIgnoreCase(ingressAddress)) { + return resolveListenerEndpoint(listener); } - } else { - LOG.debug("Listener cache contains [{}]", names); } - ConcurrentMap listeners = this.listenerKeyCache.asMap(); + LOG.info("Not a listener, returning [{}]", name); + return name; + } - List listenerToDataNodeNames = new ArrayList<>(); + private String resolveListenerEndpoint(GenericKubernetesResource listener) { + String listenerName = listener.getMetadata().getName(); + Endpoints endpoint = client.endpoints().withName(listenerName).get(); + LOG.debug("Matched ingressAddress [{}]", listenerName); - for (String name : names) { - String resolvedName = resolveDataNodesFromListeners(name, listeners); - listenerToDataNodeNames.add(resolvedName); + if (endpoint.getSubsets().isEmpty()) { + LOG.warn("Endpoint {} has no subsets - pod may be restarting", listenerName); + return listenerName; } - return listenerToDataNodeNames; + + EndpointAddress address = endpoint.getSubsets().get(0).getAddresses().get(0); + LOG.info( + "Resolved listener {} to IP {} on node {}", + listenerName, + address.getIp(), + address.getNodeName()); + + return address.getIp(); } - private GenericKubernetesResourceList getListeners() { + private GenericKubernetesResourceList fetchListeners() { ResourceDefinitionContext listenerCrd = new ResourceDefinitionContext.Builder() .withGroup("listeners.stackable.tech") @@ -353,168 +276,174 @@ private GenericKubernetesResourceList getListeners() { return client.genericKubernetesResources(listenerCrd).list(); } - /** - * We don't know if the name refers to a listener (it could be any client pod) but we check to see - * if it can be resolved to a dataNode just in case. 
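(A defensive note on the endpoint lookup above: endpoints().withName(...).get() returns null when no Endpoints object exists yet, and the address list of a subset can be empty while a pod restarts. A fully guarded variant could look like this sketch; it uses the same fabric8 calls as the patch and is illustrative only:)

    private String resolveListenerEndpointSafely(String listenerName) {
      Endpoints endpoint = client.endpoints().withName(listenerName).get();
      if (endpoint == null
          || endpoint.getSubsets() == null
          || endpoint.getSubsets().isEmpty()
          || endpoint.getSubsets().get(0).getAddresses() == null
          || endpoint.getSubsets().get(0).getAddresses().isEmpty()) {
        // Fall back to the listener name, as the patch already does for empty subsets.
        return listenerName;
      }
      return endpoint.getSubsets().get(0).getAddresses().get(0).getIp();
    }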
- * - * @param name the name of the calling pod which should be resolved to a dataNode IP if it is a - * listener - * @param listeners the current listener collection - * @return either the name (for non-listener) or the dataNode IP to which this listener resolves - */ - private String resolveDataNodesFromListeners( - String name, ConcurrentMap listeners) { - LOG.debug("Attempting to resolve [{}]", name); - for (GenericKubernetesResource listener : listeners.values()) { - List ingressAddresses = TopologyUtils.getIngressAddresses(listener); - for (String ingressAddress : ingressAddresses) { - LOG.debug("Address [{}]", ingressAddress); - if (name.equalsIgnoreCase(ingressAddress)) { - LOG.debug("Matched ingressAddress [{}]/[{}]", name, listener.getMetadata().getName()); - - Endpoints ep = client.endpoints().withName(listener.getMetadata().getName()).get(); - // TODO: Assuming single address per datanode endpoint. - // When does/can an endpoint support multiple data nodes? On restart the address list will - // be empty for a moment or two. - if (ep.getSubsets().size() < 1) { - LOG.warn( - "Endpoint [{}] address not detected, pod maybe restarting...", - ep.getMetadata().getName()); - } else { - EndpointAddress address = ep.getSubsets().get(0).getAddresses().get(0); - LOG.info( - "Endpoint [{}], IP [{}], node [{}]", - ep.getMetadata().getName(), - address.getIp(), - address.getNodeName()); - return address.getIp(); - } - } - } + // ============================================================================ + // CLIENT POD RESOLUTION + // ============================================================================ + + private List resolveClientPodsToDataNodes( + List names, + Map> podLabels, + Map nodeToDatanodeIp) { + + refreshPodCacheIfNeeded(names); + + return names.stream() + .map(name -> resolveToDatanodeOrKeep(name, podLabels, nodeToDatanodeIp)) + .collect(Collectors.toList()); + } + + private void refreshPodCacheIfNeeded(List names) { + if (cache.hasAllPods(names)) { + LOG.debug("Pod cache contains all required entries"); + return; + } + + // Note: We fetch all pods here because: + // 1. Client pods (Spark, etc.) are queried by IP, not name + // 2. K8s doesn't support "get pod by IP" - we must list and filter + LOG.debug("Refreshing pod cache for client pod resolution"); + for (Pod pod : client.pods().list().getItems()) { + cachePodByNameAndIps(pod); } - LOG.info("Not a listener, returning [{}]", name); - return name; } - /** - * The list of names may be datanodes (as is the case when the topology is initialised) or - * non-datanodes, when the data is being written by a client tool, spark executors etc. In this - * case we want to identify the datanodes that are running on the same node as this client. The - * names may also be pod IPs or pod names. 
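(Concretely, with hypothetical values: a Spark executor pod at 10.244.2.9 scheduled on node "worker-1", sharing that node with a datanode pod at 10.244.2.4, should report the datanode's topology. A sketch of the lookup this resolution performs; the refactor replaces the nested loops below with exactly this node-name keyed map:)

    Map<String, String> nodeToDatanodeIp = Map.of("worker-1", "10.244.2.4");
    String clientNode = "worker-1"; // node running the client pod, e.g. a Spark executor
    String datanodeIp = nodeToDatanodeIp.get(clientNode); // -> "10.244.2.4"
    // If no datanode shares the node, the original name is kept and later resolves
    // to "NotFound" topology segments.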
- * - * @param names list of client pods to resolve to datanodes - * @param podLabels map of podIPs and labels - * @param dns list of datanode names which will be used to match nodenames - * @return a list of pods resolved to co-located datanodes where possible - */ - private List resolveDataNodesFromCallingPods( - List names, Map> podLabels, List dns) { - List dataNodes = new ArrayList<>(); - - // if any of the names do not exist in the pod cache then refresh it - List cachedPods = - names.stream().map(this.podKeyCache::getIfPresent).collect(Collectors.toList()); - if (cachedPods.contains(null)) { - LOG.info( - "Refreshing pod cache as not all of [{}] present in [{}]", - names, - this.podKeyCache.asMap().keySet()); - for (Pod pod : client.pods().list().getItems()) { - this.podKeyCache.put(pod.getMetadata().getName(), pod); - // also add an entry for each IP - for (PodIP ip : pod.getStatus().getPodIPs()) { - this.podKeyCache.put(ip.getIp(), pod); - } - } - } else { - LOG.info("Pod cache contains [{}]", names); + private void cachePodByNameAndIps(Pod pod) { + String podName = pod.getMetadata().getName(); + cache.putPod(podName, pod); + + // Cache by all IPs - this is crucial for IP-based lookups + for (PodIP ip : pod.getStatus().getPodIPs()) { + cache.putPod(ip.getIp(), pod); + } + } + + private String resolveToDatanodeOrKeep( + String name, + Map> podLabels, + Map nodeToDatanodeIp) { + + String ipAddress = resolveToIpAddress(name); + + // If it's already a datanode, return its IP + if (podLabels.containsKey(ipAddress)) { + LOG.info("Name is a datanode: {}", ipAddress); + return ipAddress; } - ConcurrentMap pods = this.podKeyCache.asMap(); - - for (String name : names) { - // if we don't find a dataNode running on the same node as a non-dataNode pod, then - // we'll keep the original name to allow it to be resolved to NotFound in the calling routine. 
- String replacementDataNodeIp = name; - InetAddress address; - try { - // make sure we are looking up using the IP address - address = InetAddress.getByName(name); - String podIp = address.getHostAddress(); - if (podLabels.containsKey(podIp)) { - replacementDataNodeIp = podIp; - LOG.info("Added as found in the datanode map [{}]", podIp); - } else { - // we've received a call from a non-datanode pod - for (Pod pod : pods.values()) { - if (pod.getStatus().getPodIPs().contains(new PodIP(podIp))) { - String nodeName = pod.getSpec().getNodeName(); - for (Pod dn : dns) { - if (dn.getSpec().getNodeName().equals(nodeName)) { - LOG.debug( - "NodeName [{}] matches with [{}]?", dn.getSpec().getNodeName(), nodeName); - replacementDataNodeIp = dn.getStatus().getPodIP(); - break; - } - } - } - } - } - } catch (UnknownHostException e) { - LOG.warn("Error encountered while resolving host [{}]", e.getLocalizedMessage()); + + // Try to find co-located datanode + Pod clientPod = cache.getPod(ipAddress); + if (clientPod != null) { + String datanodeIp = findColocatedDatanode(clientPod, nodeToDatanodeIp); + if (datanodeIp != null) { + return datanodeIp; } - dataNodes.add(replacementDataNodeIp); } - LOG.info("Replacing names [{}] with IPs [{}]", names, dataNodes); - return dataNodes; + + // Keep original if we can't resolve + return ipAddress; + } + + private String findColocatedDatanode(Pod clientPod, Map nodeToDatanodeIp) { + return nodeToDatanodeIp.get(clientPod.getSpec().getNodeName()); + } + + private String resolveToIpAddress(String hostname) { + try { + InetAddress address = InetAddress.getByName(hostname); + String ip = address.getHostAddress(); + LOG.debug("Resolved {} to {}", hostname, ip); + return ip; + } catch (UnknownHostException e) { + LOG.warn("Failed to resolve address: {} - defaulting to {}", hostname, DEFAULT_RACK); + return hostname; + } } /** - * Given a list of datanodes, return a HashMap that maps pod ips onto Pod labels. The returned Map - * may contain more entries than the list that is given to this function, as an entry will be - * generated for every ip a pod has. + * Build a map from Kubernetes node name to datanode IP. This enables O(1) lookup when finding + * co-located dataNodes for client pods. * - * @param datanodes List of all retrieved pods. - * @return Map of ip addresses to all labels the pod that "owns" that ip has attached to itself + *

Note: If multiple dataNodes run on the same node, the last one wins. This is acceptable + * because all dataNodes on the same node have the same topology. */ - private Map> getPodLabels(List datanodes) { - Map> result = new HashMap<>(); - // Iterate over all pods and then all ips for every pod and add these to the mapping - for (Pod pod : datanodes) { - for (PodIP podIp : pod.getStatus().getPodIPs()) { - result.put(podIp.getIp(), pod.getMetadata().getLabels()); + private Map buildNodeToDatanodeMap(List dataNodes) { + Map nodeToDatanode = new HashMap<>(); + + for (Pod dataNode : dataNodes) { + String nodeName = dataNode.getSpec().getNodeName(); + String dataNodeIp = dataNode.getStatus().getPodIP(); + + if (nodeName != null && dataNodeIp != null) { + nodeToDatanode.put(nodeName, dataNodeIp); } } + + LOG.debug("Built node-to-datanode map with {} entries", nodeToDatanode.size()); + return nodeToDatanode; + } + + // ============================================================================ + // TOPOLOGY STRING BUILDING + // ============================================================================ + + private String buildTopologyString( + String ipAddress, + Map> podLabels, + Map> nodeLabels) { + StringBuilder topology = new StringBuilder(); + + for (TopologyLabel label : labels) { + String labelValue = extractLabelValue(ipAddress, label, podLabels, nodeLabels); + topology.append("/").append(labelValue); + } + + String result = topology.toString(); + LOG.debug("Returning label [{}]", result); return result; } + private String extractLabelValue( + String ipAddress, + TopologyLabel label, + Map> podLabels, + Map> nodeLabels) { + + Map> labelSource = label.isNodeLabel() ? nodeLabels : podLabels; + + String labelValue = + labelSource + .getOrDefault(ipAddress, Collections.emptyMap()) + .getOrDefault(label.getName(), "NotFound"); + + LOG.debug("Label {}.{} = {}", label.getType(), label.getName(), labelValue); + return labelValue; + } + + // ============================================================================ + // LABEL MAPS + // ============================================================================ + /** - * Given a list of datanodes this function will resolve which datanodes run on which node as well - * as all the ips assigned to a datanodes. It will then return a mapping of every ip address to - * the labels that are attached to the "physical" node running the datanodes that this ip belongs + * Given a list of dataNodes this function will resolve which dataNodes run on which node as well + * as all the ips assigned to a dataNodes. It will then return a mapping of every ip address to + * the labels that are attached to the "physical" node running the dataNodes that this ip belongs * to. 
* - * @param datanodes List of all in-scope datanodes (datanode pods in this namespace) + * @param dataNodes List of all in-scope dataNodes (datanode pods in this namespace) * @return Map of ip addresses to labels of the node running the pod that the ip address belongs * to */ - private Map> getNodeLabels(List datanodes) { + private Map> buildNodeLabelMap(List dataNodes) { Map> result = new HashMap<>(); - for (Pod pod : datanodes) { - // either retrieve the node from the internal cache or fetch it by name + for (Pod pod : dataNodes) { String nodeName = pod.getSpec().getNodeName(); - // nodeName may be null while pod is being provisioned so force a re-try + if (nodeName == null) { - LOG.warn( - "Pod [{}] not yet assigned to a k8s node, forcing re-try", pod.getMetadata().getName()); + LOG.warn("Pod [{}] not yet assigned to node, retrying", pod.getMetadata().getName()); return result; } - Node node = this.nodeKeyCache.getIfPresent(nodeName); - if (node == null) { - LOG.debug("Node not yet cached, fetching by name [{}]", nodeName); - node = client.nodes().withName(nodeName).get(); - this.nodeKeyCache.put(nodeName, node); - } + Node node = getOrFetchNode(nodeName); Map nodeLabels = node.getMetadata().getLabels(); LOG.debug("Labels for node [{}]:[{}]....", nodeName, nodeLabels); @@ -526,78 +455,33 @@ private Map> getNodeLabels(List datanodes) { return result; } - @Override - public void reloadCachedMappings() { - // TODO: According to the upstream comment we should rebuild all cache entries after - // invalidating them - // this may mean trying to resolve ip addresses that do not exist any more and things like that - // though and - // require some more thought, so we will for now just invalidate the cache. - this.topologyKeyCache.invalidateAll(); - } - - @Override - public void reloadCachedMappings(List names) { - // TODO: See comment above, the same applies here - for (String name : names) { - this.topologyKeyCache.invalidate(name); + private Node getOrFetchNode(String nodeName) { + Node node = cache.getNode(nodeName); + if (node == null) { + LOG.debug("Fetching node: {}", nodeName); + node = client.nodes().withName(nodeName).get(); + cache.putNode(nodeName, node); } + return node; } - private enum LabelType { - Node, - Pod, - Undefined - } + /** + * Given a list of dataNodes, return a HashMap that maps pod ips onto Pod labels. The returned Map + * may contain more entries than the list that is given to this function, as an entry will be + * generated for every ip a pod has. + * + * @param dataNodes List of all retrieved pods. + * @return Map of ip addresses to all labels the pod that "owns" that ip has attached to itself + */ + private Map> buildPodLabelMap(List dataNodes) { + Map> result = new HashMap<>(); + for (Pod pod : dataNodes) { + Map podLabels = pod.getMetadata().getLabels(); - private class TopologyLabel { - private final LabelType labelType; - private String name = null; - - /** - * Create a new TopologyLabel from its string representation - * - * @param value A string in the form of "[node|pod]:" that is deserialized into a - * TopologyLabel. Invalid and empty strings are resolved into the type unspecified. 
- */ - private TopologyLabel(String value) { - // If this is null the env var was not set, we will return 'undefined' for this level - if (value == null || value.isEmpty()) { - this.labelType = LabelType.Undefined; - return; - } - String[] split = value.toLowerCase(Locale.ROOT).split(":", 2); - - // This should only fail if no : was present in the string - if (split.length != 2) { - this.labelType = LabelType.Undefined; - LOG.warn( - "Ignoring topology label [{}] - label definitions need to be in the form " - + "of \"[node|pod]:\"", - value); - return; - } - // Length has to be two, proceed with "normal" case - String type = split[0]; - this.name = split[1]; - - // Parse type of object labels should be retrieved from - switch (type) { - case "node": - this.labelType = LabelType.Node; - break; - - case "pod": - this.labelType = LabelType.Pod; - break; - - default: - LOG.warn( - "Encountered unsupported label type [{}] - this label definition will be ignored, " - + "supported types are [\"node\", \"pod\"]", - type); - this.labelType = LabelType.Undefined; + for (PodIP podIp : pod.getStatus().getPodIPs()) { + result.put(podIp.getIp(), podLabels); } } + return result; } } diff --git a/src/main/java/tech/stackable/hadoop/TopologyCache.java b/src/main/java/tech/stackable/hadoop/TopologyCache.java new file mode 100644 index 0000000..3944fd4 --- /dev/null +++ b/src/main/java/tech/stackable/hadoop/TopologyCache.java @@ -0,0 +1,88 @@ +package tech.stackable.hadoop; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import io.fabric8.kubernetes.api.model.GenericKubernetesResource; +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.Pod; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +/** Manages all caching layers for the topology provider. 
*/
+public class TopologyCache {
+  private final Cache<String, String> topology;
+  private final Cache<String, Node> nodes;
+  private final Cache<String, GenericKubernetesResource> listeners;
+  private final Cache<String, Pod> pods;
+
+  TopologyCache(int expirationSeconds, int defaultExpirationSeconds) {
+    this.topology =
+        Caffeine.newBuilder().expireAfterWrite(expirationSeconds, TimeUnit.SECONDS).build();
+
+    this.nodes =
+        Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
+
+    this.listeners =
+        Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
+
+    this.pods =
+        Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
+  }
+
+  String getTopology(String key) {
+    return topology.getIfPresent(key);
+  }
+
+  void putTopology(String key, String value) {
+    topology.put(key, value);
+  }
+
+  void invalidateAll() {
+    topology.invalidateAll();
+  }
+
+  void invalidateTopologyKeys(List<String> keys) {
+    keys.forEach(topology::invalidate);
+  }
+
+  Node getNode(String name) {
+    return nodes.getIfPresent(name);
+  }
+
+  void putNode(String name, Node node) {
+    nodes.put(name, node);
+  }
+
+  GenericKubernetesResource getListener(String name) {
+    return listeners.getIfPresent(name);
+  }
+
+  ConcurrentMap<String, GenericKubernetesResource> getListenerMap() {
+    return listeners.asMap();
+  }
+
+  void putListener(String name, GenericKubernetesResource listener) {
+    listeners.put(name, listener);
+  }
+
+  boolean hasAllListeners(List<String> names) {
+    return names.stream().noneMatch(name -> listeners.getIfPresent(name) == null);
+  }
+
+  Pod getPod(String name) {
+    return pods.getIfPresent(name);
+  }
+
+  ConcurrentMap<String, Pod> getPodMap() {
+    return pods.asMap();
+  }
+
+  void putPod(String name, Pod pod) {
+    pods.put(name, pod);
+  }
+
+  boolean hasAllPods(List<String> names) {
+    return names.stream().noneMatch(name -> pods.getIfPresent(name) == null);
+  }
+}
diff --git a/src/main/java/tech/stackable/hadoop/TopologyLabel.java b/src/main/java/tech/stackable/hadoop/TopologyLabel.java
new file mode 100644
index 0000000..ba91b79
--- /dev/null
+++ b/src/main/java/tech/stackable/hadoop/TopologyLabel.java
@@ -0,0 +1,116 @@
+package tech.stackable.hadoop;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopologyLabel {
+  private static final Logger LOG = LoggerFactory.getLogger(TopologyLabel.class);
+  public static final String VARNAME_LABELS = "TOPOLOGY_LABELS";
+  public static final String VARNAME_MAXLEVELS = "TOPOLOGY_MAX_LEVELS";
+  private static final int MAX_LEVELS_DEFAULT = 2;
+
+  public enum Type {
+    NODE,
+    POD,
+    UNDEFINED
+  }
+
+  private final Type type;
+  private final String name;
+
+  TopologyLabel(String config) {
+    if (config == null || config.isEmpty()) {
+      this.type = Type.UNDEFINED;
+      this.name = null;
+      return;
+    }
+
+    String[] parts = config.toLowerCase(Locale.ROOT).split(":", 2);
+
+    if (parts.length != 2) {
+      LOG.warn("Invalid topology label format '{}' - expected '[node|pod]:<labelname>'", config);
+      this.type = Type.UNDEFINED;
+      this.name = null;
+      return;
+    }
+
+    this.name = parts[1];
+
+    switch (parts[0]) {
+      case "node":
+        this.type = Type.NODE;
+        break;
+      case "pod":
+        this.type = Type.POD;
+        break;
+      default:
+        LOG.warn("Unsupported label type '{}' - must be 'node' or 'pod'", parts[0]);
+        this.type = Type.UNDEFINED;
+    }
+  }
+
+  boolean isNodeLabel() {
+    return type == Type.NODE;
+  }
+
+  boolean isUndefined() {
+    return type == Type.UNDEFINED;
+  }
+
+  String getName() {
+    return name;
+  }
+
+  Type getType() {
+    return type;
+  }
+
+  public static List<TopologyLabel> initializeTopologyLabels() {
+    // Read the labels to be used to build a topology from environment variables. Labels are
+    // configured in the EnvVar "TOPOLOGY_LABELS". They should be specified in the form
+    // "[node|pod]:<labelname>" and separated by ";". So a valid configuration that reads topology
+    // information from the labels "kubernetes.io/zone" and "kubernetes.io/rack" on the k8s node
+    // that is running a datanode pod would look like this:
+    // "node:kubernetes.io/zone;node:kubernetes.io/rack" By default, there is an upper limit of 2 on
+    // the number of labels that are processed, because this is what Hadoop traditionally allows -
+    // this can be overridden via setting the EnvVar "TOPOLOGY_MAX_LEVELS".
+    String topologyConfig = System.getenv(VARNAME_LABELS);
+
+    if (topologyConfig == null || topologyConfig.isEmpty()) {
+      LOG.error(
+          "Missing env var [{}] - this is required for rack awareness to work.", VARNAME_LABELS);
+      throw new RuntimeException("TOPOLOGY_LABELS environment variable not set");
+    }
+
+    String[] labelConfigs = topologyConfig.split(";");
+
+    if (labelConfigs.length > getMaxLabels()) {
+      LOG.error(
+          "Found [{}] topology labels configured, but maximum allowed number is [{}]: "
+              + "please check your config or raise the number of allowed labels.",
+          labelConfigs.length,
+          getMaxLabels());
+      throw new RuntimeException("Too many topology labels configured");
+    }
+    // Create TopologyLabels from config strings
+    List<TopologyLabel> labels =
+        Arrays.stream(labelConfigs).map(TopologyLabel::new).collect(Collectors.toList());
+
+    if (labels.stream().anyMatch(TopologyLabel::isUndefined)) {
+      LOG.error(
+          "Invalid topology label configuration - labels must be in format '[pod|node]:<labelname>'");
+      throw new RuntimeException("Invalid topology label configuration");
+    }
+
+    return labels;
+  }
+
+  private static int getMaxLabels() {
+    return TopologyUtils.parseIntFromEnv(
+        VARNAME_MAXLEVELS, MAX_LEVELS_DEFAULT, "maximum topology levels");
+  }
+}
diff --git a/src/main/java/tech/stackable/hadoop/TopologyUtils.java b/src/main/java/tech/stackable/hadoop/TopologyUtils.java
index 686c4f2..be7cb51 100644
--- a/src/main/java/tech/stackable/hadoop/TopologyUtils.java
+++ b/src/main/java/tech/stackable/hadoop/TopologyUtils.java
@@ -4,8 +4,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TopologyUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(TopologyUtils.class);
+
   private static final String ADDRESS = "address";
   private static final String STATUS = "status";
   private static final String INGRESS_ADDRESSES = "ingressAddresses";
@@ -21,4 +25,21 @@ public static List<String> getIngressAddresses(GenericKubernetesResource listene
         .map(ingress -> (String) ingress.get(ADDRESS))
         .collect(Collectors.toList());
   }
+
+  public static int parseIntFromEnv(String varName, int defaultValue, String description) {
+    String value = System.getenv(varName);
+    if (value == null || value.isEmpty()) {
+      return defaultValue;
+    }
+
+    try {
+      int parsed = Integer.parseInt(value);
+      LOG.info("Set {} to {} from environment variable {}", description, parsed, varName);
+      return parsed;
+    } catch (NumberFormatException e) {
+      LOG.warn(
+          "Invalid integer value '{}' for {} - using default: {}", value, varName, defaultValue);
+      return defaultValue;
+    }
+  }
 }
From a3cf6e8a3309aa60cda0ce5910d644277a48f39b Mon Sep 17 00:00:00 2001
From: Andrew Kenworthy
Date: Wed, 10 Dec 2025
16:37:03 +0100 Subject: [PATCH 02/21] refer to class in readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f277d20..4c0b399 100644 --- a/README.md +++ b/README.md @@ -701,7 +701,7 @@ In Kubernetes, the most commonly used mechanism for topology awareness are label The most prevalent example for this is the node label [topology.kubernetes.io/zone](https://kubernetes.io/docs/reference/labels-annotations-taints/#topologykubernetesiozone) which often refers to availability zones in cloud providers or similar things. The purpose of this tool is to feed information from Kubernetes into the HDFS rack awareness functionality. -In order to do this, it implements the Hadoop interface `org.apache.hadoop.net.DNSToSwitchMapping` which then allows this tool to be configured on the NameNode via the parameter `net.topology.node.switch.mapping.impl`. +In order to do this, `tech.stackable.hadoop.StackableTopologyProvider` implements the Hadoop interface `org.apache.hadoop.net.DNSToSwitchMapping` which then allows this tool to be configured on the NameNode via the parameter `net.topology.node.switch.mapping.impl`. The topology provider watches all HDFS pods deployed by Stackable and Kubernetes nodes and keeps an in memory cache of the current state of these objects. From this state store the tool can then calculate rack IDs for nodes that HDFS asks for without needing to talk to the api-server and incurring an extra network round-trip. From 201717b357a2202bd8a0f52defe7b7af6e835bc8 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 10 Dec 2025 17:08:44 +0100 Subject: [PATCH 03/21] improved listener resolution --- .../hadoop/StackableTopologyProvider.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java index b6f7877..0885796 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -233,15 +233,8 @@ private String resolveListenerToDatanode(String name) { LOG.debug("Not a listener: {}", name); return name; } - List ingressAddresses = TopologyUtils.getIngressAddresses(listener); - for (String ingressAddress : ingressAddresses) { - LOG.debug("Address [{}]", ingressAddress); - if (name.equalsIgnoreCase(ingressAddress)) { - return resolveListenerEndpoint(listener); - } - } - LOG.info("Not a listener, returning [{}]", name); - return name; + // We found a listener, so we can resolve it directly + return resolveListenerEndpoint(listener); } private String resolveListenerEndpoint(GenericKubernetesResource listener) { @@ -344,7 +337,19 @@ private String resolveToDatanodeOrKeep( } private String findColocatedDatanode(Pod clientPod, Map nodeToDatanodeIp) { - return nodeToDatanodeIp.get(clientPod.getSpec().getNodeName()); + String clientNodeName = clientPod.getSpec().getNodeName(); + + if (clientNodeName == null) { + LOG.warn("Client pod {} not yet assigned to node", clientPod.getMetadata().getName()); + return null; + } + + String datanodeIp = nodeToDatanodeIp.get(clientNodeName); + if (datanodeIp == null) { + LOG.debug("No datanode found on node {}", clientNodeName); + } + + return datanodeIp; } private String resolveToIpAddress(String hostname) { From a55e6bed2cd4b772a7ed2239e10417f9e26f4849 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 11 Dec 2025 13:31:42 +0100 
Subject: [PATCH 04/21] added logging --- .../java/tech/stackable/hadoop/StackableTopologyProvider.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java index 0885796..ecc3e71 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -72,6 +72,7 @@ private void logInitializationStatus() { labels.stream().map(TopologyLabel::getName).collect(Collectors.toList()); LOG.info("Initialized with topology labels: {}", labelNames); } + LOG.debug("Client namespaces {} and config {}", client.namespaces(), client.configMaps()); } @Override @@ -302,6 +303,7 @@ private void refreshPodCacheIfNeeded(List names) { private void cachePodByNameAndIps(Pod pod) { String podName = pod.getMetadata().getName(); + LOG.debug("Refreshing pod cache: adding {}", podName); cache.putPod(podName, pod); // Cache by all IPs - this is crucial for IP-based lookups From 563edb156351e90163daeda663e1e33917ae9f50 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 11 Dec 2025 15:19:56 +0100 Subject: [PATCH 05/21] more logging changes --- .../tech/stackable/hadoop/StackableTopologyProvider.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java index ecc3e71..89962c7 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -36,6 +36,7 @@ public class StackableTopologyProvider implements DNSToSwitchMapping { private final TopologyCache cache; public StackableTopologyProvider() { + // By default, the client will operate within the current namespace this.client = new KubernetesClientBuilder().build(); this.cache = new TopologyCache(getCacheExpiration(), CACHE_EXPIRY_DEFAULT_SECONDS); this.labels = TopologyLabel.initializeTopologyLabels(); @@ -72,7 +73,10 @@ private void logInitializationStatus() { labels.stream().map(TopologyLabel::getName).collect(Collectors.toList()); LOG.info("Initialized with topology labels: {}", labelNames); } - LOG.debug("Client namespaces {} and config {}", client.namespaces(), client.configMaps()); + LOG.debug( + "Client namespaces {} and configuration {}", + client.getNamespace(), + client.getConfiguration()); } @Override From e96fbd7830e1f8f87bc3e95777ef12a3be70d90e Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 11 Dec 2025 15:55:01 +0100 Subject: [PATCH 06/21] code cleanup --- .../hadoop/StackableTopologyProvider.java | 25 ++++++++----------- .../tech/stackable/hadoop/TopologyCache.java | 15 +---------- .../tech/stackable/hadoop/TopologyLabel.java | 8 +++--- .../tech/stackable/hadoop/TopologyUtils.java | 1 + 4 files changed, 17 insertions(+), 32 deletions(-) diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java index 89962c7..a83550e 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -16,8 +16,8 @@ * An implementation of the org.apache.hadoop.net.DNSToSwitchMapping that is used to create a * topology out of dataNodes. * - *

This class is intended to be run as part of the NameNode process and will be used by the - * nameNode to retrieve topology strings for dataNodes. + *

This class is intended to be run as part of the NameNode process (in the same namespace) and + * will be used by the nameNode to retrieve topology strings for dataNodes. */ public class StackableTopologyProvider implements DNSToSwitchMapping { private final Logger LOG = LoggerFactory.getLogger(StackableTopologyProvider.class); @@ -47,11 +47,10 @@ public StackableTopologyProvider() { @Override public void reloadCachedMappings() { // TODO: According to the upstream comment we should rebuild all cache entries after - // invalidating them - // this may mean trying to resolve ip addresses that do not exist any more and things like that - // though and - // require some more thought, so we will for now just invalidate the cache. - this.cache.invalidateAll(); + // invalidating them. This may mean trying to resolve ip addresses that do not exist + // any more and things like that though and require some more thought, so we will for + // now just invalidate the cache. + this.cache.invalidateAllTopologyKeys(); } @Override @@ -73,10 +72,7 @@ private void logInitializationStatus() { labels.stream().map(TopologyLabel::getName).collect(Collectors.toList()); LOG.info("Initialized with topology labels: {}", labelNames); } - LOG.debug( - "Client namespaces {} and configuration {}", - client.getNamespace(), - client.getConfiguration()); + LOG.debug("Client namespace {}", client.getNamespace()); } @Override @@ -106,8 +102,7 @@ private List createDefaultRackList(List names) { private List tryResolveFromCache(List names) { // We need to check if we have cached values for all dataNodes contained in this request. - // Unless we can answer everything from the cache we have to talk to k8s anyway and can just - // recalculate everything + // Unless we can answer everything from the cache we will perform a full resolution. List cached = names.stream().map(cache::getTopology).collect(Collectors.toList()); LOG.debug("Cached topologyKeyCache values [{}]", cached); @@ -263,10 +258,10 @@ private String resolveListenerEndpoint(GenericKubernetesResource listener) { } private GenericKubernetesResourceList fetchListeners() { + // no version is specified here as we are not always going to be on v1alpha1 ResourceDefinitionContext listenerCrd = new ResourceDefinitionContext.Builder() .withGroup("listeners.stackable.tech") - .withVersion("v1alpha1") .withPlural("listeners") .withNamespaced(true) .build(); @@ -488,8 +483,10 @@ private Map> buildPodLabelMap(List dataNodes) { Map> result = new HashMap<>(); for (Pod pod : dataNodes) { Map podLabels = pod.getMetadata().getLabels(); + LOG.debug("Labels for pod [{}]:[{}]....", pod.getMetadata().getName(), podLabels); for (PodIP podIp : pod.getStatus().getPodIPs()) { + LOG.debug("...assigned to pod IP [{}]", podIp.getIp()); result.put(podIp.getIp(), podLabels); } } diff --git a/src/main/java/tech/stackable/hadoop/TopologyCache.java b/src/main/java/tech/stackable/hadoop/TopologyCache.java index 3944fd4..15c520a 100644 --- a/src/main/java/tech/stackable/hadoop/TopologyCache.java +++ b/src/main/java/tech/stackable/hadoop/TopologyCache.java @@ -6,7 +6,6 @@ import io.fabric8.kubernetes.api.model.Node; import io.fabric8.kubernetes.api.model.Pod; import java.util.List; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; /** Manages all caching layers for the topology provider. 
*/
@@ -38,7 +37,7 @@ void putTopology(String key, String value) {
     topology.put(key, value);
   }
 
-  void invalidateAll() {
+  void invalidateAllTopologyKeys() {
     topology.invalidateAll();
   }
 
@@ -58,26 +57,14 @@ GenericKubernetesResource getListener(String name) {
     return listeners.getIfPresent(name);
   }
 
-  ConcurrentMap<String, GenericKubernetesResource> getListenerMap() {
-    return listeners.asMap();
-  }
-
   void putListener(String name, GenericKubernetesResource listener) {
     listeners.put(name, listener);
   }
 
-  boolean hasAllListeners(List<String> names) {
-    return names.stream().noneMatch(name -> listeners.getIfPresent(name) == null);
-  }
-
   Pod getPod(String name) {
     return pods.getIfPresent(name);
   }
 
-  ConcurrentMap<String, Pod> getPodMap() {
-    return pods.asMap();
-  }
-
   void putPod(String name, Pod pod) {
     pods.put(name, pod);
   }
diff --git a/src/main/java/tech/stackable/hadoop/TopologyLabel.java b/src/main/java/tech/stackable/hadoop/TopologyLabel.java
index ba91b79..5ba2252 100644
--- a/src/main/java/tech/stackable/hadoop/TopologyLabel.java
+++ b/src/main/java/tech/stackable/hadoop/TopologyLabel.java
@@ -10,7 +10,7 @@ public class TopologyLabel {
   private static final Logger LOG = LoggerFactory.getLogger(TopologyLabel.class);
   public static final String VARNAME_LABELS = "TOPOLOGY_LABELS";
-  public static final String VARNAME_MAXLEVELS = "TOPOLOGY_MAX_LEVELS";
+  public static final String VARNAME_MAX_LEVELS = "TOPOLOGY_MAX_LEVELS";
   private static final int MAX_LEVELS_DEFAULT = 2;
 
   public enum Type {
@@ -32,7 +32,7 @@ public enum Type {
     String[] parts = config.toLowerCase(Locale.ROOT).split(":", 2);
 
     if (parts.length != 2) {
-      LOG.warn("Invalid topology label format '{}' - expected '[node|pod]:<labelname>'", config);
+      LOG.warn("Invalid topology label format '{}' - expected '[node|pod]:

Note: If multiple dataNodes run on the same node, the last one wins. This is acceptable * because all dataNodes on the same node have the same topology. @@ -421,11 +447,17 @@ private Map buildNodeToDatanodeMap(List dataNodes) { String dataNodeIp = dataNode.getStatus().getPodIP(); if (nodeName != null && dataNodeIp != null) { + LOG.debug("Assigned to node-name [{}/{}]", nodeName, dataNodeIp); nodeToDatanode.put(nodeName, dataNodeIp); + Node node = getOrFetchNode(nodeName); + for (NodeAddress nodeAddress : node.getStatus().getAddresses()) { + LOG.debug("Assigned to node-address [{}/{}]", nodeAddress.getAddress(), dataNodeIp); + nodeToDatanode.put(nodeAddress.getAddress(), dataNodeIp); + } } } - LOG.debug("Built node-to-datanode map with {} entries", nodeToDatanode.size()); + LOG.debug("Built node-to-datanode map {}", nodeToDatanode); return nodeToDatanode; } @@ -474,7 +506,7 @@ private String extractLabelValue( * Given a list of dataNodes this function will resolve which dataNodes run on which node as well * as all the ips assigned to a dataNodes. It will then return a mapping of every ip address to * the labels that are attached to the "physical" node running the dataNodes that this ip belongs - * to. + * to. It will also do this for the node addresses as calling pods may masquerade as node IPs. * * @param dataNodes List of all in-scope dataNodes (datanode pods in this namespace) * @return Map of ip addresses to labels of the node running the pod that the ip address belongs @@ -482,11 +514,11 @@ private String extractLabelValue( */ private Map> buildNodeLabelMap(List dataNodes) { Map> result = new HashMap<>(); - for (Pod pod : dataNodes) { - String nodeName = pod.getSpec().getNodeName(); + for (Pod dataNode : dataNodes) { + String nodeName = dataNode.getSpec().getNodeName(); if (nodeName == null) { - LOG.warn("Pod [{}] not yet assigned to node, retrying", pod.getMetadata().getName()); + LOG.warn("Pod [{}] not yet assigned to node, retrying", dataNode.getMetadata().getName()); return result; } @@ -494,7 +526,12 @@ private Map> buildNodeLabelMap(List dataNodes) Map nodeLabels = node.getMetadata().getLabels(); LOG.debug("Labels for node [{}]:[{}]....", nodeName, nodeLabels); - for (PodIP podIp : pod.getStatus().getPodIPs()) { + for (NodeAddress nodeAddress : node.getStatus().getAddresses()) { + LOG.debug("...assigned to node address [{}]", nodeAddress.getAddress()); + result.put(nodeAddress.getAddress(), nodeLabels); + } + + for (PodIP podIp : dataNode.getStatus().getPodIPs()) { LOG.debug("...assigned to IP [{}]", podIp.getIp()); result.put(podIp.getIp(), nodeLabels); } diff --git a/test/topology-provider/stack/01-install-krb5-kdc.yaml b/test/topology-provider/stack/01-install-krb5-kdc.yaml index d1447cf..b88b7f6 100644 --- a/test/topology-provider/stack/01-install-krb5-kdc.yaml +++ b/test/topology-provider/stack/01-install-krb5-kdc.yaml @@ -31,7 +31,7 @@ spec: spec: initContainers: - name: init - image: oci.stackable.tech/sdp/krb5:1.18.2-stackable0.0.0-dev + image: oci.stackable.tech/sdp/krb5:1.21.1-stackable0.0.0-dev args: - sh - -euo @@ -52,7 +52,7 @@ spec: name: data containers: - name: kdc - image: oci.stackable.tech/sdp/krb5:1.18.2-stackable0.0.0-dev + image: oci.stackable.tech/sdp/krb5:1.21.1-stackable0.0.0-dev args: - krb5kdc - -n @@ -65,7 +65,7 @@ spec: - mountPath: /var/kerberos/krb5kdc name: data - name: kadmind - image: oci.stackable.tech/sdp/krb5:1.18.2-stackable0.0.0-dev + image: oci.stackable.tech/sdp/krb5:1.21.1-stackable0.0.0-dev args: - kadmind - -nofork @@ -78,7 +78,7 @@ 
spec: - mountPath: /var/kerberos/krb5kdc name: data - name: client - image: oci.stackable.tech/sdp/krb5:1.18.2-stackable0.0.0-dev + image: oci.stackable.tech/sdp/krb5:1.21.1-stackable0.0.0-dev tty: true stdin: true env: diff --git a/test/topology-provider/stack/03-hdfs.yaml b/test/topology-provider/stack/03-hdfs.yaml index b4dcaeb..3214105 100644 --- a/test/topology-provider/stack/03-hdfs.yaml +++ b/test/topology-provider/stack/03-hdfs.yaml @@ -5,44 +5,46 @@ metadata: name: simple-zk spec: image: - productVersion: 3.8.3 + productVersion: 3.9.4 + pullPolicy: IfNotPresent servers: roleGroups: default: replicas: 1 --- -apiVersion: zookeeper.stackable.tech/v1alpha1 -kind: ZookeeperZnode -metadata: - name: simple-hdfs-znode -spec: - clusterRef: - name: simple-zk ---- apiVersion: hdfs.stackable.tech/v1alpha1 kind: HdfsCluster metadata: name: simple-hdfs spec: image: - productVersion: 3.4.0 - custom: hdfs # updated by tilt + productVersion: 3.4.2 + custom: oci.stackable.tech/sandbox/andrew/hadoop:3.4.2-stackable0.0.0-topprov pullPolicy: IfNotPresent clusterConfig: dfsReplication: 1 - zookeeperConfigMapName: simple-hdfs-znode + zookeeperConfigMapName: simple-zk rackAwareness: - - labelType: node - labelName: kubernetes.io/hostname - - labelType: pod - labelName: app.kubernetes.io/role-group + - nodeLabel: kubernetes.io/hostname + - podLabel: app.kubernetes.io/role-group authentication: tlsSecretClass: tls kerberos: secretClass: kerberos-default nameNodes: config: - listenerClass: external-stable # We want to access the Web UI + listenerClass: external-stable + logging: + enableVectorAgent: false + containers: + hdfs: + console: + level: DEBUG + file: + level: DEBUG + loggers: + ROOT: + level: DEBUG configOverrides: &configOverrides core-site.xml: hadoop.user.group.static.mapping.overrides: "dr.who=;nn=;nm=;jn=;testuser=supergroup;" @@ -50,6 +52,8 @@ spec: default: replicas: 2 dataNodes: + config: + listenerClass: external-stable configOverrides: *configOverrides roleGroups: default: diff --git a/test/topology-provider/stack/07-spark.yaml b/test/topology-provider/stack/04-spark.yaml similarity index 95% rename from test/topology-provider/stack/07-spark.yaml rename to test/topology-provider/stack/04-spark.yaml index 0cb979e..b2be7b7 100644 --- a/test/topology-provider/stack/07-spark.yaml +++ b/test/topology-provider/stack/04-spark.yaml @@ -5,14 +5,14 @@ metadata: name: spark-teragen spec: sparkImage: - custom: docker.stackable.tech/stackable/spark-k8s-with-teragen:3.5.0-stackable0.0.0-dev - productVersion: 3.5.0 + custom: oci.stackable.tech/sandbox/spark-k8s:3.5.6-stackable0.0.0-terasort + productVersion: 3.5.6 pullPolicy: IfNotPresent mode: cluster mainApplicationFile: local:///tmp/spark-terasort-1.2-SNAPSHOT.jar mainClass: com.github.ehiggs.spark.terasort.TeraGen args: - - "10M" + - "100M" - "hdfs://simple-hdfs/user/stackable/teragen_output" sparkConf: "spark.driver.extraClassPath": "/etc/hadoop/conf/:/stackable/spark/extra-jars/*" diff --git a/test/topology-provider/stack/05-access-hdfs.yaml b/test/topology-provider/stack/05-access-hdfs.yaml new file mode 100644 index 0000000..c2b57c5 --- /dev/null +++ b/test/topology-provider/stack/05-access-hdfs.yaml @@ -0,0 +1,63 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: access-hdfs +spec: + template: + spec: + containers: + - name: access-hdfs + image: oci.stackable.tech/sandbox/andrew/hadoop:3.4.2-stackable0.0.0-topprov + imagePullPolicy: IfNotPresent + env: + - name: HADOOP_CONF_DIR + value: /stackable/conf/hdfs + - name: KRB5_CONFIG + 
value: /stackable/kerberos/krb5.conf + - name: HADOOP_OPTS + value: -Djava.security.krb5.conf=/stackable/kerberos/krb5.conf + command: + - /bin/bash + - -c + - | + set -ex + klist -k /stackable/kerberos/keytab + kinit -kt /stackable/kerberos/keytab testuser/access-hdfs.default.svc.cluster.local + klist + + #bin/hdfs dfs -mkdir /stackable + #bin/hdfs dfs -chown -R access-hive /stackable + #bin/hdfs dfs -mkdir /access-hive + #bin/hdfs dfs -chown -R access-hive /access-hive + bin/hdfs dfs -ls / + #bin/hdfs dfs -ls -d / + + sleep infinity + volumeMounts: + - name: hdfs-config + mountPath: /stackable/conf/hdfs + - name: kerberos + mountPath: /stackable/kerberos + volumes: + - name: hdfs-config + configMap: + name: simple-hdfs + - name: kerberos + ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: kerberos-default + secrets.stackable.tech/scope: service=access-hdfs + secrets.stackable.tech/kerberos.service.names: testuser + spec: + storageClassName: secrets.stackable.tech + accessModes: + - ReadWriteOnce + resources: + requests: + storage: "1" + securityContext: + fsGroup: 1000 + restartPolicy: OnFailure \ No newline at end of file diff --git a/test/topology-provider/teragen/Dockerfile b/test/topology-provider/teragen/Dockerfile index ee44bae..e054981 100644 --- a/test/topology-provider/teragen/Dockerfile +++ b/test/topology-provider/teragen/Dockerfile @@ -1,9 +1,9 @@ # # cd test/stack/teragen -# docker build . -t docker.stackable.tech/stackable/spark-k8s-with-teragen:3.5.0-stackable0.0.0-dev +# docker build . -t oci.stackable.tech/sandbox/spark-k8s:3.5.6-stackable0.0.0-terasort # -FROM oci.stackable.tech/sdp/spark-k8s:3.5.1-stackable0.0.0-dev +FROM oci.stackable.tech/sdp/spark-k8s:3.5.6-stackable0.0.0-dev # this .jar is compiled from the code here: https://github.com/ehiggs/spark-terasort -COPY --chown=stackable:stackable ./spark-terasort-1.2-SNAPSHOT.jar /tmp/ \ No newline at end of file +COPY --chown=stackable:stackable ./spark-terasort-1.2-SNAPSHOT.jar /tmp/ From 0cb5a2d5cbbd00f9a19c72cb73ee73c26c11fcf7 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 16 Dec 2025 11:15:33 +0100 Subject: [PATCH 16/21] linting --- test/topology-provider/stack/05-access-hdfs.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/topology-provider/stack/05-access-hdfs.yaml b/test/topology-provider/stack/05-access-hdfs.yaml index c2b57c5..853a343 100644 --- a/test/topology-provider/stack/05-access-hdfs.yaml +++ b/test/topology-provider/stack/05-access-hdfs.yaml @@ -32,7 +32,7 @@ spec: #bin/hdfs dfs -chown -R access-hive /access-hive bin/hdfs dfs -ls / #bin/hdfs dfs -ls -d / - + sleep infinity volumeMounts: - name: hdfs-config @@ -60,4 +60,4 @@ spec: storage: "1" securityContext: fsGroup: 1000 - restartPolicy: OnFailure \ No newline at end of file + restartPolicy: OnFailure From 745dc18a2fcfa4ae765dca7aa31c19399bc3e11d Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 16 Dec 2025 17:20:34 +0100 Subject: [PATCH 17/21] re-work flow to allow short-circuiting --- .../hadoop/StackableTopologyProvider.java | 111 ++++++++---------- 1 file changed, 47 insertions(+), 64 deletions(-) diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java index 1ab619b..dc681b6 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -127,45 +127,51 @@ private List 
performFullResolution(List<String> names) {
     // Build node-to-datanode map for O(1) colocated lookups
     Map<String, String> nodeToDatanodeIp = buildNodeToDatanodeMap(dataNodes);
+    List<String> topologies = new ArrayList<>();
 
     // Resolve masqueraded IPs to nodes: do this before inspecting possible listener-
     // or other pod-IPs as we don't want to mistakenly treat a masqueraded IP as a
-    // cache-miss
-    List<String> resolvedNames = tryResolveNodes(names, nodeToDatanodeIp);
-
-    // Resolve dataNode listeners to datanode IPs
-    resolvedNames = tryResolveListeners(resolvedNames);
+    // cache-miss. Examine each name in a loop, so we have the chance to short-circuit.
+    for (String name : names) {
+      String datanodeIp = tryNodeOrListenerOrPod(name, nodeToDatanodeIp, podLabels);
+      // Build topology strings and cache results
+      String topology = buildAndCacheTopology(name, datanodeIp, podLabels, nodeLabels);
+      topologies.add(topology);
+    }
 
-    // Step : Resolve client pods to co-located dataNodes
-    List<String> datanodeIps =
-        tryResolveClientPodsToDataNodes(resolvedNames, podLabels, nodeToDatanodeIp);
+    return topologies;
+  }
 
-    // Step : Build topology strings and cache results
-    // IP-masquerading can mean that the advertised IP is either a client Pod's node IP,
-    // or an IP that is not easily associated with a node (e.g. if the veth interface is
-    // used).
-    return buildAndCacheTopology(names, datanodeIps, podLabels, nodeLabels);
+  private String tryNodeOrListenerOrPod(
+      String name,
+      Map<String, String> nodeToDatanodeIp,
+      Map<String, Map<String, String>> podLabels) {
+    String dataNodeIp = nodeToDatanodeIp.get(name);
+    if (dataNodeIp != null) {
+      return dataNodeIp;
+    } else {
+      String resolvedListener = tryResolveListener(name);
+      if (resolvedListener != null) {
+        return resolvedListener;
+      } else {
+        return tryResolveClientPodToDataNode(name, podLabels, nodeToDatanodeIp);
+      }
+    }
   }
 
-  private List<String> buildAndCacheTopology(
-      List<String> originalNames,
-      List<String> datanodeIps,
+  private String buildAndCacheTopology(
+      String originalName,
+      String datanodeIp,
       Map<String, Map<String, String>> podLabels,
       Map<String, Map<String, String>> nodeLabels) {
-    List<String> result = new ArrayList<>();
-    for (int i = 0; i < datanodeIps.size(); i++) {
-      String datanodeIp = datanodeIps.get(i);
-      String originalName = originalNames.get(i);
-      String topology = buildTopologyString(datanodeIp, podLabels, nodeLabels);
-      result.add(topology);
+    String topology = buildTopologyString(datanodeIp, podLabels, nodeLabels);
 
-      // Cache both the resolved IP and original name
-      cache.putTopology(datanodeIp, topology);
-      cache.putTopology(originalName, topology);
-    }
+    // Cache both the resolved IP and original name
+    cache.putTopology(datanodeIp, topology);
+    cache.putTopology(originalName, topology);
 
-    LOG.info("Built topology: {}", result);
-    return result;
+    LOG.info("Built topology: {}", topology);
+    return topology;
   }
 
   // ============================================================================
@@ -189,25 +195,6 @@ private List<Pod> fetchDataNodes() {
     return dataNodes;
   }
 
-  // ============================================================================
-  // NODE RESOLUTION
-  // ============================================================================
-
-  private List<String> tryResolveNodes(List<String> names, Map<String, String> nodeToDatanodeIp) {
-    List<String> result = new ArrayList<>();
-
-    for (String name : names) {
-      String dataNodeIp = nodeToDatanodeIp.get(name);
-      if (dataNodeIp == null) {
-        result.add(name);
-      } else {
-        LOG.debug("Returning dataNode {} for {}", name, dataNodeIp);
-        result.add(dataNodeIp);
-      }
-    }
-    return result;
-  }
-
   // ============================================================================
   //
LISTENER RESOLUTION
  // ============================================================================
@@ -246,20 +233,18 @@ private String getListenerVersion() {
     }
   }
 
-  private List<String> tryResolveListeners(List<String> names) {
-    refreshListenerCacheIfNeeded(names);
+  private String tryResolveListener(String name) {
+    refreshListenerCacheIfNeeded(name);
 
-    return names.stream().map(this::tryResolveListenerToDatanode).collect(Collectors.toList());
+    return tryResolveListenerToDatanode(name);
   }
 
-  private void refreshListenerCacheIfNeeded(List<String> names) {
-    List<String> missingNames =
-        names.stream().filter(name -> cache.getListener(name) == null).collect(Collectors.toList());
-
-    if (missingNames.isEmpty()) {
+  private void refreshListenerCacheIfNeeded(String name) {
+    if (cache.getListener(name) != null) {
       LOG.debug("Listener cache contains all required entries");
       return;
     }
+
     // Listeners are typically few, so fetch all
     LOG.debug("Fetching all listeners to populate cache");
 
     if (listenerVersion == null) {
@@ -339,21 +324,19 @@ private GenericKubernetesResourceList fetchListeners(String listenerVersion) {
   // CLIENT POD RESOLUTION
   // ============================================================================
 
-  private List<String> tryResolveClientPodsToDataNodes(
-      List<String> names,
+  private String tryResolveClientPodToDataNode(
+      String name,
       Map<String, Map<String, String>> podLabels,
       Map<String, String> nodeToDatanodeIp) {
 
-    refreshPodCacheIfNeeded(names);
+    refreshPodCacheIfNeeded(name);
 
-    return names.stream()
-        .map(name -> resolveToDatanodeOrKeep(name, podLabels, nodeToDatanodeIp))
-        .collect(Collectors.toList());
+    return resolveToDatanodeOrKeep(name, podLabels, nodeToDatanodeIp);
   }
 
-  private void refreshPodCacheIfNeeded(List<String> names) {
-    if (cache.hasAllPods(names)) {
-      LOG.debug("Pod cache contains all required entries");
+  private void refreshPodCacheIfNeeded(String name) {
+    if (cache.getPod(name) != null) {
+      LOG.debug("Pod cache contains entry");
       return;
     }
 

From 47a23cbf3aae930d9f97f2e3bdd4e6c6072c3ef8 Mon Sep 17 00:00:00 2001
From: Andrew Kenworthy
Date: Tue, 16 Dec 2025 17:43:47 +0100
Subject: [PATCH 18/21] minor correction

---
 .../java/tech/stackable/hadoop/StackableTopologyProvider.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java
index dc681b6..6323a40 100644
--- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java
+++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java
@@ -282,7 +282,7 @@ private String tryResolveListenerToDatanode(String name) {
     GenericKubernetesResource listener = cache.getListener(name);
     if (listener == null) {
       LOG.debug("Not a listener: {}", name);
-      return name;
+      return null;
     }
     // We found a listener, so we can resolve it directly
     return resolveListenerEndpoint(listener);

From 8e28aed7d80faacc0c1ff4fd42067187f6ab39ef Mon Sep 17 00:00:00 2001
From: Andrew Kenworthy
Date: Wed, 17 Dec 2025 12:37:44 +0100
Subject: [PATCH 19/21] add pod informer

---
 .../hadoop/StackableTopologyProvider.java    | 67 +++++++++++++++++++
 .../tech/stackable/hadoop/TopologyCache.java |  4 ++
 2 files changed, 71 insertions(+)

diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java
index 6323a40..7d95cb3 100644
--- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java
+++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java
@@ -5,9 +5,13 @@
 import
io.fabric8.kubernetes.client.KubernetesClientBuilder;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.slf4j.Logger;
@@ -29,11 +33,13 @@ public class StackableTopologyProvider implements DNSToSwitchMapping {
   // Default values
   public static final String DEFAULT_RACK = "/defaultRack";
   private static final int CACHE_EXPIRY_DEFAULT_SECONDS = 5 * 60;
+  private static final int INFORMER_POLL_SECONDS = 30;
 
   // Cache on first usage (not on start-up to avoid attempts before listeners are available)
   private String listenerVersion;
 
   private final KubernetesClient client;
   private final List<TopologyLabel> labels;
+  private final SharedInformerFactory sharedInformerFactory;
 
   // Caching layers
   private final TopologyCache cache;
@@ -43,6 +49,8 @@ public StackableTopologyProvider() {
     this.client = new KubernetesClientBuilder().build();
     this.cache = new TopologyCache(getCacheExpiration(), CACHE_EXPIRY_DEFAULT_SECONDS);
     this.labels = TopologyLabel.initializeTopologyLabels();
+    this.sharedInformerFactory = client.informers();
+    startPodInformer();
 
     logInitializationStatus();
   }
@@ -149,6 +157,10 @@ private String tryNodeOrListenerOrPod(
     if (dataNodeIp != null) {
       return dataNodeIp;
     } else {
+      // If a simple dataNode lookup does not work, we have to decide whether we
+      // want to have the overhead of fetching the listeners, or of fetching all
+      // pods in the namespace. Opt for listeners first, as there are typically
+      // fewer of them.
String resolvedListener = tryResolveListener(name);
       if (resolvedListener != null) {
         return resolvedListener;
       } else {
@@ -553,4 +565,59 @@ private Map<String, Map<String, String>> buildPodLabelMap(List<Pod> dataNodes) {
     }
     return result;
   }
+
+  // ============================================================================
+  // INFORMERS
+  // ============================================================================
+
+  private void startPodInformer() {
+    client
+        .pods()
+        .inNamespace(client.getNamespace())
+        .inform(
+            new ResourceEventHandler<>() {
+              @Override
+              public void onAdd(Pod pod) {
+                cache.putPod(pod.getMetadata().getName(), pod);
+                for (PodIP ip : pod.getStatus().getPodIPs()) {
+                  cache.putPod(ip.getIp(), pod);
+                }
+                LOG.info("Pod {} added", pod.getMetadata().getName());
+              }
+
+              @Override
+              public void onUpdate(Pod oldPod, Pod newPod) {
+                cache.putPod(oldPod.getMetadata().getName(), newPod);
+                for (PodIP ip : oldPod.getStatus().getPodIPs()) {
+                  cache.putPod(ip.getIp(), newPod);
+                }
+                LOG.info("Pod {} updated", oldPod.getMetadata().getName());
+              }
+
+              @Override
+              public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
+                cache.deletePod(pod.getMetadata().getName());
+                for (PodIP ip : pod.getStatus().getPodIPs()) {
+                  cache.deletePod(ip.getIp());
+                }
+                LOG.info("Pod {} deleted", pod.getMetadata().getName());
+              }
+            },
+            INFORMER_POLL_SECONDS * 1000L);
+
+    Future<Void> future = sharedInformerFactory.startAllRegisteredInformers();
+
+    try {
+      // this will block until complete
+      LOG.debug("Waiting for informer registration to complete...");
+      future.get();
+    } catch (InterruptedException e) {
+      LOG.error("Pod Informer initialization was interrupted", e);
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      LOG.error("Pod Informer initialization encountered an exception", e);
+      throw new RuntimeException(e);
+    }
+    LOG.info("Pod Informer initialized.");
+  }
 }
diff --git a/src/main/java/tech/stackable/hadoop/TopologyCache.java b/src/main/java/tech/stackable/hadoop/TopologyCache.java
index 15c520a..7c4857a 100644
--- a/src/main/java/tech/stackable/hadoop/TopologyCache.java
+++ b/src/main/java/tech/stackable/hadoop/TopologyCache.java
@@ -69,6 +69,10 @@ void putPod(String name, Pod pod) {
     pods.put(name, pod);
   }
 
+  void deletePod(String name) {
+    pods.invalidate(name);
+  }
+
   boolean hasAllPods(List<String> names) {
     return names.stream().noneMatch(name -> pods.getIfPresent(name) == null);
   }

From 2b13b9f89de428b785aa2256f8d1bea7a8e5c814 Mon Sep 17 00:00:00 2001
From: Andrew Kenworthy
Date: Wed, 17 Dec 2025 14:50:04 +0100
Subject: [PATCH 20/21] cleaned up docs, lowered logging level for informer

---
 .../hadoop/StackableTopologyProvider.java        |  6 +++---
 test/topology-provider/stack/03-hdfs.yaml        |  4 +++-
 test/topology-provider/stack/04-spark.yaml       |  2 +-
 test/topology-provider/stack/05-access-hdfs.yaml | 13 ++++++-------
 test/topology-provider/stack/README.md           | 17 +++++++++++++++++
 5 files changed, 30 insertions(+), 12 deletions(-)
 create mode 100644 test/topology-provider/stack/README.md

diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java
index 7d95cb3..033c319 100644
--- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java
+++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java
@@ -582,7 +582,7 @@ public void onAdd(Pod pod) {
                 for (PodIP ip : pod.getStatus().getPodIPs()) {
                   cache.putPod(ip.getIp(), pod);
                 }
-                LOG.info("Pod {} added", pod.getMetadata().getName());
+                LOG.debug("Pod {} added",
pod.getMetadata().getName());
               }
 
               @Override
               public void onUpdate(Pod oldPod, Pod newPod) {
                 cache.putPod(oldPod.getMetadata().getName(), newPod);
                 for (PodIP ip : oldPod.getStatus().getPodIPs()) {
                   cache.putPod(ip.getIp(), newPod);
                 }
-                LOG.info("Pod {} updated", oldPod.getMetadata().getName());
+                LOG.trace("Pod {} updated", oldPod.getMetadata().getName());
               }
 
               @Override
               public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
                 cache.deletePod(pod.getMetadata().getName());
                 for (PodIP ip : pod.getStatus().getPodIPs()) {
                   cache.deletePod(ip.getIp());
                 }
-                LOG.info("Pod {} deleted", pod.getMetadata().getName());
+                LOG.debug("Pod {} deleted", pod.getMetadata().getName());
               }
             },
             INFORMER_POLL_SECONDS * 1000L);
diff --git a/test/topology-provider/stack/03-hdfs.yaml b/test/topology-provider/stack/03-hdfs.yaml
index 3214105..0d98522 100644
--- a/test/topology-provider/stack/03-hdfs.yaml
+++ b/test/topology-provider/stack/03-hdfs.yaml
@@ -19,7 +19,7 @@ metadata:
 spec:
   image:
     productVersion: 3.4.2
-    custom: oci.stackable.tech/sandbox/andrew/hadoop:3.4.2-stackable0.0.0-topprov
+    custom: oci.stackable.tech/sandbox/hadoop:3.4.2-stackable0.0.0-dev
     pullPolicy: IfNotPresent
   clusterConfig:
     dfsReplication: 1
@@ -48,6 +48,8 @@ spec:
     configOverrides: &configOverrides
       core-site.xml:
         hadoop.user.group.static.mapping.overrides: "dr.who=;nn=;nm=;jn=;testuser=supergroup;"
+    envOverrides:
+      HADOOP_NAMENODE_OPTS: -agentlib:jdwp=transport=dt_socket,address=*:5005,server=y,suspend=n
     roleGroups:
       default:
         replicas: 2
diff --git a/test/topology-provider/stack/04-spark.yaml b/test/topology-provider/stack/04-spark.yaml
index b2be7b7..bfbf724 100644
--- a/test/topology-provider/stack/04-spark.yaml
+++ b/test/topology-provider/stack/04-spark.yaml
@@ -12,7 +12,7 @@ spec:
   mainApplicationFile: local:///tmp/spark-terasort-1.2-SNAPSHOT.jar
   mainClass: com.github.ehiggs.spark.terasort.TeraGen
   args:
-    - "100M"
+    - "10M"
     - "hdfs://simple-hdfs/user/stackable/teragen_output"
   sparkConf:
     "spark.driver.extraClassPath": "/etc/hadoop/conf/:/stackable/spark/extra-jars/*"
diff --git a/test/topology-provider/stack/05-access-hdfs.yaml b/test/topology-provider/stack/05-access-hdfs.yaml
index 853a343..2c0c69f 100644
--- a/test/topology-provider/stack/05-access-hdfs.yaml
+++ b/test/topology-provider/stack/05-access-hdfs.yaml
@@ -8,7 +8,7 @@ spec:
     spec:
       containers:
         - name: access-hdfs
-          image: oci.stackable.tech/sandbox/andrew/hadoop:3.4.2-stackable0.0.0-topprov
+          image: oci.stackable.tech/sandbox/hadoop:3.4.2-stackable0.0.0-dev
          imagePullPolicy: IfNotPresent
           env:
             - name: HADOOP_CONF_DIR
@@ -26,14 +26,13 @@ spec:
               kinit -kt /stackable/kerberos/keytab testuser/access-hdfs.default.svc.cluster.local
               klist
 
-              #bin/hdfs dfs -mkdir /stackable
-              #bin/hdfs dfs -chown -R access-hive /stackable
-              #bin/hdfs dfs -mkdir /access-hive
-              #bin/hdfs dfs -chown -R access-hive /access-hive
+
+              bin/hdfs dfs -mkdir /access-hdfs
+              bin/hdfs dfs -chown -R access-hdfs /access-hdfs
               bin/hdfs dfs -ls /
-              #bin/hdfs dfs -ls -d /
+              bin/hdfs dfsadmin -printTopology
+              echo "Hello HDFS" | bin/hdfs dfs -put - /access-hdfs/file.txt
 
-              sleep infinity
           volumeMounts:
diff --git a/test/topology-provider/stack/README.md b/test/topology-provider/stack/README.md
new file mode 100644
index 0000000..c85cd9c
--- /dev/null
+++ b/test/topology-provider/stack/README.md
@@ -0,0 +1,17 @@
+# Testing
+
+## Install operators
+
+```bash
+stackablectl op in secret commons listener zookeeper hdfs spark-k8s
+```
+
+## Run scripts in the `default` namespace
+
+```bash
+kubectl apply -f
./hdfs-utils/test/topology-provider/stack/01-install-krb5-kdc.yaml +kubectl apply -f ./hdfs-utils/test/topology-provider/stack/02-create-kerberos-secretclass.yaml +kubectl apply -f ./hdfs-utils/test/topology-provider/stack/03-hdfs.yaml +kubectl apply -f ./hdfs-utils/test/topology-provider/stack/04-spark.yaml +kubectl apply -f ./hdfs-utils/test/topology-provider/stack/05-access-hdfs.yaml +``` From 17cd4caf5f3e9ee538df85b9768fcedced41ed0b Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 17 Dec 2025 18:42:56 +0100 Subject: [PATCH 21/21] changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e31a6ad..197d00a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Changed + +- Refactored topology-provider to improve cache usage and readability ([#60]). + +[#60]: https://github.com/stackabletech/hdfs-utils/pull/60 + ## [0.4.2] - 2025-09-30 ### Added