# SPDX-License-Identifier: Apache-2.0
import csv
import os
from enum import Enum
from importlib import util
from typing import List

from gaudi_topology import GaudiTopology

# Column headers every row of the binding CSV must provide; "binding_policy"
# is included because it is read when a row is selected.
REQUIRED_COLUMNS = [
    "model_id", "input_length", "output_length", "world_size",
    "data_type", "num_allocated_cpu", "binding_policy",
]
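
# A hypothetical example of the expected CSV layout (illustrative values only;
# the model_id below is the one mentioned in the MODEL error message):
#
#   model_id,input_length,output_length,world_size,data_type,num_allocated_cpu,binding_policy
#   meta-llama/Llama-3.1-8B-Instruct,1024,1024,1,bf16,32,0
#
# binding_policy is an index into BindingPolicy (0 = evenly, 1 = close2cards).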

class BindingPolicy(Enum):
    # Index 0: spread the allocated CPUs evenly over the visible NUMA nodes.
    Evenly_on_NUMAs = "evenly"
    # Index 1: bind each rank to the NUMA node local to its Gaudi card.
    NUMAs_with_cards = "close2cards"


class CPU_Binding:

    def __init__(self,
                 csv_path: str = "cpu_binding_gnr.csv",
                 use_hyperthread: bool = False):
        self.libnuma_found = util.find_spec("numa") is not None
        self.psutil_found = util.find_spec("psutil") is not None
        if self.libnuma_found and self.psutil_found:
            import psutil
            from numa import info

            # System info: physical core count, the CPUs this process is
            # allowed to run on, and the number of configured NUMA nodes.
            self.cpu_count = psutil.cpu_count(logical=False)
            self.cpus_allow_list = psutil.Process().cpu_affinity()
            self.numa_size = info.get_num_configured_nodes()
            self.cpu_count_per_numa = self.cpu_count // self.numa_size

            # CSV info: each row describes a workload (model, token lengths)
            # and the CPU allocation recommended for it.
            with open(csv_path, newline="") as f:
                rows = list(csv.DictReader(f))
            if not rows or any(col not in rows[0] for col in REQUIRED_COLUMNS):
                found = list(rows[0].keys()) if rows else "EMPTY CSV"
                raise ValueError(
                    f"CSV missing required headers {REQUIRED_COLUMNS}. Found: {found}")
            model = os.environ.get("MODEL")
            if not model:
                raise RuntimeError(
                    "Set environment variable MODEL to a model_id in the CSV "
                    "(e.g., export MODEL='meta-llama/Llama-3.1-8B-Instruct').")
            input_tok = os.environ.get("INPUT_TOK")
            output_tok = os.environ.get("OUTPUT_TOK")
            con_req = os.environ.get("CONCURRENT_REQ")
            num_allocated_cpu = os.environ.get("NUM_CPUS")
            print(f"NUM_CPUS from environment: {num_allocated_cpu}")

            row = self.pick_row_by_parameters(rows, model, input_tok, output_tok, con_req)
            print(f"num_allocated_cpu from CSV: {row['num_allocated_cpu']}")

            self.world_size = self.parse_int(row["world_size"], "world_size")
            binding_policy_index = self.parse_int(row["binding_policy"], "binding_policy")
            self.binding_policy = list(BindingPolicy)[binding_policy_index]

            # NUM_CPUS from the environment overrides the CSV value; a CSV
            # value of 'NA' means the caller must set NUM_CPUS explicitly.
            if num_allocated_cpu:
                self.num_allocated_cpu = int(num_allocated_cpu)
            elif row["num_allocated_cpu"] == 'NA':
                raise RuntimeError(
                    "num_allocated_cpu is 'NA' in the CSV; set the NUM_CPUS "
                    "environment variable instead.")
            else:
                self.num_allocated_cpu = self.parse_int(
                    row["num_allocated_cpu"], "num_allocated_cpu")

            # CPU: keep, per NUMA node, only the CPUs this process is allowed
            # to run on.
            self.node_to_cpus = []
            for i in range(self.numa_size):
                node_intersect = [cpu for cpu in info.node_to_cpus(i)
                                  if cpu in self.cpus_allow_list]
                if node_intersect:
                    self.node_to_cpus.append(list(node_intersect))
            # Track the idle CPUs per node. Linux typically enumerates physical
            # cores first and their hyperthread siblings second, so the first
            # cpu_count_per_numa entries of each list are physical cores.
            self.node_to_idle_cpus = self.node_to_cpus.copy()
            for i in range(len(self.node_to_cpus)):
                if use_hyperthread is False:
                    self.node_to_idle_cpus[i] = self.node_to_cpus[i][:self.cpu_count_per_numa]
                else:
                    self.node_to_idle_cpus[i] = self.node_to_cpus[i][self.cpu_count_per_numa:]
            # Gaudi: discover the cards and the NUMA nodes they are attached to.
            topo = GaudiTopology()
            self.cards = topo.get_cards()
            if self.cards is not None:
                self.gaudi_numa_list = []
                # Assume cards 0 .. world_size-1 are the ones in use.
                for card in self.cards[:self.world_size]:
                    if card['numa_node'] not in self.gaudi_numa_list:
                        self.gaudi_numa_list.append(card['numa_node'])
                    print(f"Card {card['card_id']} ({card['model']}):")
                    print(f"  Bus ID     : {card['bus_id']}")
                    print(f"  NUMA Node  : {card['numa_node']}")
                    print(f"  Local CPUs : {card['local_cpulist']}")

    def parse_int(self, v: str, name: str) -> int:
        try:
            return int(v)
        except Exception:
            raise ValueError(f"Invalid integer for {name!r}: {v!r}")

    def pick_row_by_parameters(self, rows: List[dict], model: str, input_tok: str,
                               output_tok: str, con_req: str) -> dict:
        matches = [
            r for r in rows
            if r.get("model_id", "").strip() == model
            and r.get("input_length", "").strip() == input_tok
            and r.get("output_length", "").strip() == output_tok
        ]
        if not matches:
            available = ", ".join(sorted({r.get('model_id', '') for r in rows}))
            raise ValueError(
                f"MODEL '{model}', input_length '{input_tok}', output_length "
                f"'{output_tok}' not found in CSV. Available: {available}")
        return matches[0]

    def get_cpus_id_binding_based_on_numa_nodes(self, rank: int) -> str:
        """Return the CPU id binding for `rank`, based on NUMA nodes."""
        rank_to_cpus = ''
        if not self.libnuma_found or not self.psutil_found:
            print("Auto thread-binding is not supported because the numa and "
                  "psutil packages are missing; falling back to no thread-binding. "
                  "For better performance, please bind threads manually.")
            return rank_to_cpus

        # Choose this rank's NUMA node according to the binding policy.
        if self.binding_policy is BindingPolicy.Evenly_on_NUMAs or self.cards is None:
            divider = min(self.world_size, len(self.node_to_cpus))
            self.allocated_cpu_per_numa = self.num_allocated_cpu // divider
            node_id = rank
        elif self.binding_policy is BindingPolicy.NUMAs_with_cards:
            self.allocated_cpu_per_numa = self.num_allocated_cpu // len(self.gaudi_numa_list)
            node_id = int(self.cards[rank]['numa_node'])

        print(f"binding numa node_id {node_id} allocated_cpu_per_numa {self.allocated_cpu_per_numa}")
        # Option 1. Bind to the last N cpu cores of the node.
        start = self.cpu_count_per_numa - self.allocated_cpu_per_numa
        rank_to_cpus_list = self.node_to_cpus[node_id][start:self.cpu_count_per_numa]
        # Option 2. Bind to the first N cpu cores of the node.
        # rank_to_cpus_list = self.node_to_cpus[node_id][:self.allocated_cpu_per_numa]

        rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list)
        print(f"rank {rank} auto thread-binding list: {rank_to_cpus}")
        # The bound CPUs are no longer idle on this node.
        self.node_to_idle_cpus[node_id] = [
            cpu for cpu in self.node_to_idle_cpus[node_id] if cpu not in rank_to_cpus_list
        ]
        return rank_to_cpus
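

# The string returned by get_cpus_id_binding_based_on_numa_nodes is a plain
# comma-separated CPU list. One plausible way to apply it, shown here as an
# illustrative sketch that is not part of the original module, is to restrict
# the calling process with os.sched_setaffinity; alternatively the list can be
# passed to `taskset -c` or `numactl --physcpubind` when launching a worker.
# The helper name apply_cpu_binding is hypothetical.
def apply_cpu_binding(rank_to_cpus: str) -> None:
    """Pin the current process to the given comma-separated CPU list (sketch)."""
    if not rank_to_cpus:
        return  # empty string: no binding was computed, leave affinity unchanged
    cpu_ids = {int(cpu) for cpu in rank_to_cpus.split(',')}
    # pid 0 means "the calling process"; the mask is any iterable of CPU ids.
    os.sched_setaffinity(0, cpu_ids)
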

if __name__ == "__main__":
    libnuma_found = util.find_spec("numa") is not None
    if libnuma_found:
        from numa import info
        numa_size = info.get_num_configured_nodes()
    else:
        numa_size = 1
    world_size = numa_size
    cpu_binder = CPU_Binding(use_hyperthread=False)
    max_needed_numa_size = min(cpu_binder.world_size, cpu_binder.numa_size)
    for i in range(max_needed_numa_size):
        rank_to_cpus = cpu_binder.get_cpus_id_binding_based_on_numa_nodes(i)
        print(rank_to_cpus)

    # Report the CPUs left idle (unbound) on each node.
    rank_to_idle_cpus = ','.join(str(x) for row in cpu_binder.node_to_idle_cpus for x in row)
    print(rank_to_idle_cpus)
    for r in cpu_binder.node_to_idle_cpus:
        print(len(r))
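
# An example invocation with hypothetical values (the CSV must contain a
# matching row, and the module filename is assumed here):
#
#   MODEL='meta-llama/Llama-3.1-8B-Instruct' INPUT_TOK=1024 OUTPUT_TOK=1024 \
#   NUM_CPUS=32 python cpu_binding.py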