|
| 1 | +# SPDX-License-Identifier: Apache-2.0 |
| 2 | +import os |
| 3 | +import csv |
| 4 | +from importlib import util |
| 5 | +from enum import Enum |
| 6 | +from gaudi_topology import GaudiTopology |
| 7 | + |
| 8 | +REQUIRED_COLUMNS = ["model_id", "input_length", "output_length", "world_size", "data_type", "num_allocated_cpu"] |
| 9 | + |
| 10 | + |
| 11 | +class BindingPolicy(Enum): |
| 12 | + Evenly_on_NUMAs = "evenly" |
| 13 | + NUMAs_with_cards = "close2cards" |
| 14 | + |
| 15 | + |
| 16 | +class CPU_Binding: |
| 17 | + |
| 18 | + def __init__(self, csv_path: str = "cpu_binding_gnr.csv", use_hyperthread: bool = False): |
| 19 | + self.libnuma_found = util.find_spec("numa") is not None |
| 20 | + self.psutil_found = util.find_spec("psutil") is not None |
| 21 | + if self.libnuma_found and self.psutil_found: |
| 22 | + import psutil |
| 23 | + from numa import info |
| 24 | + # Get system Info |
| 25 | + self.cpu_count = psutil.cpu_count(logical=False) |
| 26 | + self.cpus_allow_list = psutil.Process().cpu_affinity() |
| 27 | + #print("cpu allow list:",self.cpus_allow_list) |
| 28 | + self.numa_size = info.get_num_configured_nodes() |
| 29 | + self.cpu_count_per_numa = self.cpu_count // self.numa_size |
| 30 | + |
| 31 | + # Get CSV info |
| 32 | + with open(csv_path, newline="") as f: |
| 33 | + rows = list(csv.DictReader(f)) |
| 34 | + if not rows or any(col not in rows[0] for col in REQUIRED_COLUMNS): |
| 35 | + found = list(rows[0].keys()) if rows else "EMPTY CSV" |
| 36 | + raise ValueError(f"CSV missing required headers {REQUIRED_COLUMNS}. Found: {found}") |
| 37 | + model = os.environ.get("MODEL") |
| 38 | + if not model: |
| 39 | + raise RuntimeError("Set environment variable MODEL to a model_id in the CSV.") |
| 40 | + input_tok = os.environ.get("INPUT_TOK") |
| 41 | + output_tok = os.environ.get("OUTPUT_TOK") |
| 42 | + con_req = os.environ.get("CONCURRENT_REQ") |
| 43 | + num_allocated_cpu = os.environ.get("NUM_CPUS") |
| 44 | + print(num_allocated_cpu) |
| 45 | + |
| 46 | + row = self.pick_row_by_parameters(rows, model, input_tok, output_tok, con_req) |
| 47 | + print(row["num_allocated_cpu"]) |
| 48 | + |
| 49 | + self.world_size = self.parse_int(row["world_size"], "world_size") |
| 50 | + binding_policy_index = self.parse_int(row["binding_policy"], "binding_policy") |
| 51 | + self.binding_policy = list(BindingPolicy)[binding_policy_index] |
| 52 | + |
| 53 | + if num_allocated_cpu: |
| 54 | + self.num_allocated_cpu = int(num_allocated_cpu) |
| 55 | + elif row["num_allocated_cpu"] == 'NA': |
| 56 | + raise RuntimeError("Invalid NUM_CPU value. Set environment variable NUM_CPUS instead .") |
| 57 | + else: |
| 58 | + self.num_allocated_cpu = self.parse_int(row["num_allocated_cpu"], "num_allocated_cpu") |
| 59 | + |
| 60 | + # CPU |
| 61 | + # check allow node_to_cpus list |
| 62 | + self.node_to_cpus = [] |
| 63 | + for i in range(self.numa_size): |
| 64 | + from numa import info |
| 65 | + filtered_node_to_cpus = self.filter_one_cpu_per_core(info.node_to_cpus(i)) |
| 66 | + node_intersect = [cpu for cpu in filtered_node_to_cpus if cpu in self.cpus_allow_list] |
| 67 | + if bool(node_intersect): |
| 68 | + self.node_to_cpus.append(list(node_intersect)) |
| 69 | + self.node_to_idle_cpus = self.node_to_cpus.copy() |
| 70 | + #self.node_to_idle_cpus_ht = [] #self.node_to_cpus |
| 71 | + for i in range(self.numa_size): |
| 72 | + if use_hyperthread is False: |
| 73 | + self.node_to_idle_cpus[i] = self.node_to_cpus[i][:self.cpu_count_per_numa] |
| 74 | + else: |
| 75 | + self.node_to_idle_cpus[i] = self.node_to_cpus[i][self.cpu_count_per_numa:] |
| 76 | + # Gaudi |
| 77 | + topo = GaudiTopology() |
| 78 | + self.cards = topo.get_cards() |
| 79 | + if self.cards is not None: |
| 80 | + self.gaudi_numa_list = [] |
| 81 | + # Assume to use cards from 0 to 7 |
| 82 | + for card in self.cards[:self.world_size]: |
| 83 | + if card['numa_node'] not in self.gaudi_numa_list: |
| 84 | + self.gaudi_numa_list.append(card['numa_node']) |
| 85 | + print(f"Card {card['card_id']} ({card['model']}):") |
| 86 | + print(f" Bus ID : {card['bus_id']}") |
| 87 | + print(f" NUMA Node : {card['numa_node']}") |
| 88 | + print(f" Local CPUs : {card['local_cpulist']}") |
| 89 | + |
| 90 | + def parse_int(self, v: str, name: str) -> int: |
| 91 | + try: |
| 92 | + return int(v) |
| 93 | + except Exception as err: |
| 94 | + raise ValueError(f"Invalid integer for {name!r}: {v!r}") from err |
| 95 | + |
| 96 | + def pick_row_by_parameters(self, rows: list[dict], model: str, input_tok: str, output_tok: str, |
| 97 | + con_req: str) -> dict: |
| 98 | + matches = [ |
| 99 | + r for r in rows if r.get("model_id", "").strip() == model if r.get("input_length", "").strip() == input_tok |
| 100 | + if r.get("output_length", "").strip() == output_tok |
| 101 | + ] |
| 102 | + if not matches: |
| 103 | + # fallback: match only by model_id |
| 104 | + matches = [r for r in rows if r.get('model_id', '') == model] |
| 105 | + print(f"Warning: using fallback entry for model '{model}' without exact input/output token match") |
| 106 | + if not matches: |
| 107 | + available = ", ".join(sorted({r.get('model_id', '') for r in rows})) |
| 108 | + raise ValueError(f"MODEL '{model}', input_length '{input_tok}', output_length '{output_tok}' " |
| 109 | + f"not found in CSV. Available: {available}") |
| 110 | + return matches[0] |
| 111 | + |
| 112 | + def filter_one_cpu_per_core(self, cpus): |
| 113 | + """ |
| 114 | + Given a list of CPU IDs (possibly with HT pairs), |
| 115 | + return a filtered list with only one logical CPU per physical core. |
| 116 | + """ |
| 117 | + seen_cores = set() |
| 118 | + filtered = [] |
| 119 | + for cpu in sorted(cpus): |
| 120 | + core_path = f"/sys/devices/system/cpu/cpu{cpu}/topology/core_id" |
| 121 | + try: |
| 122 | + with open(core_path) as f: |
| 123 | + core_id = int(f.read().strip()) |
| 124 | + except FileNotFoundError: |
| 125 | + continue |
| 126 | + if core_id not in seen_cores: |
| 127 | + seen_cores.add(core_id) |
| 128 | + filtered.append(cpu) |
| 129 | + return filtered |
| 130 | + |
| 131 | + def get_cpus_id_binding_based_on_numa_nodes(self, rank: int) -> str: |
| 132 | + """Return CPUs id binding based on NUMA nodes. |
| 133 | + """ |
| 134 | + rank_to_cpus = '' |
| 135 | + if not self.libnuma_found or not self.psutil_found: |
| 136 | + print("Auto thread-binding is not supported due to " |
| 137 | + "the lack of package numa and psutil," |
| 138 | + "fallback to no thread-binding. To get better performance," |
| 139 | + "please try to manually bind threads.") |
| 140 | + return rank_to_cpus |
| 141 | + |
| 142 | + if self.binding_policy is BindingPolicy.Evenly_on_NUMAs or self.cards is None: |
| 143 | + #divider = min(self.world_size, len(self.node_to_cpus)) |
| 144 | + self.allocated_cpu_per_numa = self.num_allocated_cpu // len(self.node_to_cpus) |
| 145 | + node_id = rank |
| 146 | + elif self.binding_policy is BindingPolicy.NUMAs_with_cards: |
| 147 | + self.allocated_cpu_per_numa = self.num_allocated_cpu // len(self.gaudi_numa_list) |
| 148 | + node_id = int(self.cards[rank]['numa_node']) |
| 149 | + |
| 150 | + print("binding numa node_id %d allocated_cpu_per_numa %d", node_id, self.allocated_cpu_per_numa) |
| 151 | + # Option 1. Bind to the last N cpu cores |
| 152 | + start = self.cpu_count_per_numa - self.allocated_cpu_per_numa |
| 153 | + rank_to_cpus_list = self.node_to_cpus[node_id][start:self.cpu_count_per_numa] |
| 154 | + # Option 2. Bind to the first N cpu cores |
| 155 | + #rank_to_cpus_list = self.node_to_cpus[node_id][:self.allocated_cpu_per_numa] |
| 156 | + |
| 157 | + rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list) |
| 158 | + print("rank %d auto thread-binding list: %s", rank, rank_to_cpus) |
| 159 | + self.node_to_idle_cpus[node_id] = [ |
| 160 | + cpu for cpu in self.node_to_idle_cpus[node_id] if cpu not in rank_to_cpus_list |
| 161 | + ] |
| 162 | + return rank_to_cpus |
| 163 | + |
| 164 | + |
| 165 | +if __name__ == "__main__": |
| 166 | + libnuma_found = util.find_spec("numa") is not None |
| 167 | + if libnuma_found: |
| 168 | + from numa import info |
| 169 | + numa_size = info.get_num_configured_nodes() |
| 170 | + else: |
| 171 | + numa_size = 1 |
| 172 | + world_size = numa_size |
| 173 | + cpu_binder = CPU_Binding(use_hyperthread=False) |
| 174 | + if cpu_binder.binding_policy is BindingPolicy.Evenly_on_NUMAs or cpu_binder.cards is None: |
| 175 | + max_needed_numa_size = len(cpu_binder.node_to_cpus) |
| 176 | + elif cpu_binder.binding_policy is BindingPolicy.NUMAs_with_cards: |
| 177 | + max_needed_numa_size = min(cpu_binder.world_size, len(cpu_binder.node_to_cpus)) |
| 178 | + for i in range(max_needed_numa_size): |
| 179 | + rank_to_cpus = cpu_binder.get_cpus_id_binding_based_on_numa_nodes(i) |
| 180 | + print(rank_to_cpus) |
| 181 | + |
| 182 | + rank_to_idle_cpus = ','.join(str(x) for row in cpu_binder.node_to_idle_cpus for x in row) |
| 183 | + print(rank_to_idle_cpus) |
| 184 | + for r in cpu_binder.node_to_idle_cpus: |
| 185 | + print(len(r)) |
0 commit comments