Skip to content

Commit

Permalink
Implement conditional out of SSA (#406)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: ebehner <[email protected]>
  • Loading branch information
rihi and ebehner authored Jun 20, 2024
1 parent 1e9c13c commit 8c7938e
Show file tree
Hide file tree
Showing 6 changed files with 354 additions and 84 deletions.
164 changes: 102 additions & 62 deletions decompiler/pipeline/ssa/dependency_graph.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,114 @@
from typing import Iterable, List, Optional, Set
import itertools
from itertools import combinations
from typing import Iterator

import networkx
from decompiler.structures.graphs.cfg import ControlFlowGraph
from decompiler.structures.interferencegraph import InterferenceGraph
from decompiler.structures.pseudo import Expression, Operation, OperationType
from decompiler.structures.pseudo.expressions import Variable
from decompiler.structures.pseudo.instructions import Assignment
from decompiler.structures.pseudo.operations import Call
from networkx import DiGraph, weakly_connected_components
from decompiler.util.decoration import DecoratedGraph
from networkx import MultiDiGraph

# Multiplicative constant applied to dependency scores when encountering operations, to penalize too much nesting.
OPERATION_PENALTY = 0.9

def _non_call_assignments(cfg: ControlFlowGraph) -> Iterable[Assignment]:

def decorate_dependency_graph(dependency_graph: MultiDiGraph, interference_graph: InterferenceGraph) -> DecoratedGraph:
    """
    Build a drawable graph that combines dependency and interference information.

    - Every node of the dependency graph (a tuple of variables) becomes one node, labelled with one line per variable.
    - Every dependency edge becomes a directed edge labelled with its score.
    - Two nodes of the same weakly connected component are joined by a red edge without direction
      as soon as any variable of the one interferes with any variable of the other.
    """
    decorated_graph = MultiDiGraph()

    # Nodes are keyed by the hash of the variable tuple; the label lists every contained variable.
    for node in dependency_graph.nodes:
        label_lines = [f"{variable}: {variable.type}, aliased: {variable.is_aliased}" for variable in node]
        decorated_graph.add_node(hash(node), label="\n".join(label_lines))

    for source, sink, attributes in dependency_graph.edges.data():
        decorated_graph.add_edge(hash(source), hash(sink), label=f"{attributes['score']}")

    # Interference is only drawn inside a weakly connected component of the dependency graph.
    for component in networkx.weakly_connected_components(dependency_graph):
        for first, second in combinations(component, 2):
            interfering = False
            for variable_1, variable_2 in itertools.product(first, second):
                if interference_graph.has_edge(variable_1, variable_2):
                    interfering = True
                    break
            if interfering:
                decorated_graph.add_edge(hash(first), hash(second), color="red", dir="none")

    return DecoratedGraph(decorated_graph)


def dependency_graph_from_cfg(cfg: ControlFlowGraph) -> MultiDiGraph:
    """
    Construct the dependency graph of the given CFG.

    - Every variable occurring in the CFG becomes a node, represented as a one-element tuple.
    - For every assignment, an edge is added from each defined variable to each variable its value depends on,
      provided the dependency score is positive. The score is stored in the edge attribute "score".
    """
    graph = MultiDiGraph()

    for variable in _collect_variables(cfg):
        graph.add_node((variable,))

    for assignment in _assignments_in_cfg(cfg):
        definitions = assignment.definitions
        dependencies = _expression_dependencies(assignment.value)
        for required_variable, score in dependencies.items():
            if score > 0:
                for defined_variable in definitions:
                    graph.add_edge((defined_variable,), (required_variable,), score=score)

    return graph


def _collect_variables(cfg: ControlFlowGraph) -> Iterator[Variable]:
    """
    Yield every variable occurrence found among the subexpressions of the instructions of the given control flow graph.

    A variable is yielded once per occurrence, so the same variable may be yielded several times.
    """
    for instruction in cfg.instructions:
        yield from (candidate for candidate in instruction.subexpressions() if isinstance(candidate, Variable))


def _assignments_in_cfg(cfg: ControlFlowGraph) -> Iterator[Assignment]:
"""
Yield the instructions of the given control flow graph that pass the `Assignment` checks below.

NOTE(review): the two consecutive `if` lines look like the removed and the added version of the same check taken
from a rendered diff -- confirm which condition is the intended one before relying on this function.
"""
for instr in cfg.instructions:
if isinstance(instr, Assignment) and isinstance(instr.destination, Variable) and not isinstance(instr.value, Call):
if isinstance(instr, Assignment):
yield instr


class DependencyGraph(DiGraph):
    """Directed graph whose edges connect a defined variable to a required variable that may share its name."""

    def __init__(self, interference_graph: Optional[InterferenceGraph] = None):
        """
        Create a dependency graph containing the nodes of the given interference graph.

        :param interference_graph: The interference graph used to decide whether two variables interfere. It may be
            omitted, in which case the graph starts empty; the methods that query interference still need it.
        """
        super().__init__()
        # The signature advertises None as default and networkx itself instantiates `self.__class__()` without
        # arguments (e.g. in `copy()`), so a missing interference graph must not crash the constructor.
        if interference_graph is not None:
            self.add_nodes_from(interference_graph.nodes)
        self.interference_graph = interference_graph

    @classmethod
    def from_cfg(cls, cfg: ControlFlowGraph, interference_graph: InterferenceGraph):
        """
        Construct the dependency graph of the given CFG, i.e. adds an edge between two variables if they depend on each other.
            - Add an edge from the definition to at most one requirement for each instruction.
            - All variables that were not defined via Phi-functions before have out-degree at most 1, because they are defined at most once.
            - Variables that are defined via Phi-functions can have one successor for each required variable of the Phi-function.

        The edge attribute "strength" is "high" for plain copies, "medium" for values with exactly one requirement and
        "low" when exactly one of several requirements can share the name of the defined variable.
        """
        dependency_graph = cls(interference_graph)
        for instruction in _non_call_assignments(cfg):
            defined_variable = instruction.destination
            if isinstance(instruction.value, Variable):
                if dependency_graph._variables_can_have_same_name(defined_variable, instruction.value):
                    dependency_graph.add_edge(defined_variable, instruction.requirements[0], strength="high")
            elif len(instruction.requirements) == 1:
                if dependency_graph._variables_can_have_same_name(defined_variable, instruction.requirements[0]):
                    dependency_graph.add_edge(defined_variable, instruction.requirements[0], strength="medium")
            else:
                non_interfering_variable = dependency_graph._non_interfering_requirements(instruction.requirements, defined_variable)
                # Compare against None explicitly: the decision must not depend on the truthiness of a variable object.
                if non_interfering_variable is not None:
                    dependency_graph.add_edge(defined_variable, non_interfering_variable, strength="low")
        return dependency_graph

    def _non_interfering_requirements(self, requirements: List[Variable], defined_variable: Variable) -> Optional[Variable]:
        """Get the unique requirement that can share the name of the defined variable if it exists, otherwise return None."""
        non_interfering_requirement = None
        for required_variable in requirements:
            if self._variables_can_have_same_name(defined_variable, required_variable):
                # A second candidate means the requirement is not unique.
                if non_interfering_requirement is not None:
                    return None
                non_interfering_requirement = required_variable
        return non_interfering_requirement

    def _variables_can_have_same_name(self, source: Variable, sink: Variable) -> bool:
        """
        Two variables can have the same name, if they have the same type, are both aliased or both non-aliased variables, and if they
        do not interfere. Two aliased variables additionally need to have the same name already.

        :param source: The potential source vertex.
        :param sink: The potential sink vertex
        :return: True, if the given variables can have the same name, and false otherwise.
        """
        if self.interference_graph.are_interfering(source, sink) or source.type != sink.type or source.is_aliased != sink.is_aliased:
            return False
        if source.is_aliased and sink.is_aliased and source.name != sink.name:
            return False
        return True

    def get_components(self) -> Iterable[Set[Variable]]:
        """Returns the weakly connected components of the dependency graph."""
        for component in weakly_connected_components(self):
            yield set(component)
def _expression_dependencies(expression: Expression) -> dict[Variable, float]:
    """
    Compute, for each variable of the given expression, a score telling how strongly the expression depends on it.

    - A plain variable depends on itself with score 1.0.
    - Calls, address-of, dereference and member-access operations contribute no dependencies at all.
    - For any other operation, the scores of the operands that have dependencies are divided evenly among these
      operands and multiplied by OPERATION_PENALTY, so that nested expressions like (a + (a + (a + (a + a))))
      get a lower score than just (a).
    - Every other kind of expression has no dependencies.
    """
    if isinstance(expression, Variable):
        return {expression: 1.0}
    if not isinstance(expression, Operation):
        return {}
    if expression.operation in {
        OperationType.call,
        OperationType.address,
        OperationType.dereference,
        OperationType.member_access,
    }:
        return {}

    # Only operands that actually contribute dependencies take part in the split.
    contributing = [deps for deps in map(_expression_dependencies, expression.operands) if deps]
    operand_count = len(contributing)

    combined: dict[Variable, float] = {}
    for operand_scores in contributing:
        for variable, operand_score in operand_scores.items():
            weighted = operand_score / operand_count * OPERATION_PENALTY
            if variable in combined:
                combined[variable] += weighted
            else:
                combined[variable] = weighted
    return combined
26 changes: 15 additions & 11 deletions decompiler/pipeline/ssa/outofssatranslation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from collections import defaultdict
from configparser import NoOptionError
from enum import Enum
from typing import DefaultDict, List
from typing import Callable, DefaultDict, List

from decompiler.pipeline.ssa.phi_cleaner import PhiFunctionCleaner
from decompiler.pipeline.ssa.phi_dependency_resolver import PhiDependencyResolver
from decompiler.pipeline.ssa.phi_lifting import PhiFunctionLifter
from decompiler.pipeline.ssa.variable_renaming import MinimalVariableRenamer, SimpleVariableRenamer
from decompiler.pipeline.ssa.variable_renaming import ConditionalVariableRenamer, MinimalVariableRenamer, SimpleVariableRenamer
from decompiler.pipeline.stage import PipelineStage
from decompiler.structures.graphs.cfg import BasicBlock
from decompiler.structures.interferencegraph import InterferenceGraph
Expand Down Expand Up @@ -98,12 +98,13 @@ def _out_of_ssa(self) -> None:
-> There are different optimization levels
"""
try:
self.out_of_ssa_strategy[self._optimization](self)
except KeyError:
error_message = f"The Out of SSA according to the optimization level {self._optimization.value} is not implemented so far."
logging.error(error_message)
raise NotImplementedError(error_message)
strategy = self.out_of_ssa_strategy.get(self._optimization, None)
if strategy is None:
raise NotImplementedError(
f"The Out of SSA according to the optimization level {self._optimization.value} is not implemented so far."
)

strategy(self)

def _simple_out_of_ssa(self) -> None:
"""
Expand Down Expand Up @@ -158,12 +159,15 @@ def _conditional_out_of_ssa(self) -> None:
This is a more advanced algorithm for out of SSA:
- We first remove the circular dependency of the Phi-functions
- Then, we remove the Phi-functions by lifting them to their predecessor basic blocks.
- Afterwards, we rename the variables, by considering their dependency on each other.
- Afterwards, we rename the variables by considering their dependency on each other.
"""
pass
PhiDependencyResolver(self._phi_functions_of).resolve()
self.interference_graph = InterferenceGraph(self.task.graph)
PhiFunctionLifter(self.task.graph, self.interference_graph, self._phi_functions_of).lift()
ConditionalVariableRenamer(self.task, self.interference_graph).rename()

# This translator maps the optimization levels to the functions.
out_of_ssa_strategy = {
out_of_ssa_strategy: dict[SSAOptions, Callable[["OutOfSsaTranslation"], None]] = {
SSAOptions.simple: _simple_out_of_ssa,
SSAOptions.minimization: _minimization_out_of_ssa,
SSAOptions.lift_minimal: _lift_minimal_out_of_ssa,
Expand Down
98 changes: 95 additions & 3 deletions decompiler/pipeline/ssa/variable_renaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
from operator import attrgetter
from typing import DefaultDict, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union

import networkx
from decompiler.pipeline.ssa.dependency_graph import dependency_graph_from_cfg
from decompiler.structures.graphs.cfg import ControlFlowGraph
from decompiler.structures.interferencegraph import InterferenceGraph
from decompiler.structures.pseudo.expressions import GlobalVariable, Variable
from decompiler.structures.pseudo.instructions import BaseAssignment, Instruction, Relation
from decompiler.structures.pseudo.typing import Type
from decompiler.task import DecompilerTask
from decompiler.util.insertion_ordered_set import InsertionOrderedSet
from decompiler.util.lexicographical_bfs import LexicographicalBFS
from networkx import Graph, connected_components
from networkx import Graph, MultiDiGraph, connected_components


@dataclass
Expand Down Expand Up @@ -121,10 +124,11 @@ def rename(self):

def _replace_variable_in_instruction(self, variable: Variable, instruction: Instruction) -> None:
"""Replace the given variable in the given instruction"""
if variable.ssa_label is None:
if variable not in self.renaming_map:
return
replacement_variable = self.renaming_map[variable].copy()
replacement_variable.ssa_name = variable.copy()
if variable.ssa_label is not None:
replacement_variable.ssa_name = variable.copy()
instruction.substitute(variable, replacement_variable)
if isinstance(instruction, Relation):
instruction.rename(variable, replacement_variable)
Expand Down Expand Up @@ -334,3 +338,91 @@ def _classes_of(self, neighborhood: Iterable[Variable]) -> Iterable[Variable]:
for neighbor in neighborhood:
if neighbor in self._variable_classes_handler.color_class_of:
yield self._variable_classes_handler.color_class_of[neighbor]


class ConditionalVariableRenamer(VariableRenamer):
    """
    A renaming strategy for SSA-variables in which only variables that are related to each other can get the same name.
    The relation is given by a weighted dependency graph, where the weight tells how likely two variables are the same
    variable, i.e., copy-assignments are more likely to be identical than complicated computations.
    """

    def __init__(self, task, interference_graph: InterferenceGraph):
        """
        Set up the renamer on a copy of the given interference graph and generate the renaming map for the task's graph.
        """
        super().__init__(task, interference_graph.copy())
        self._generate_renaming_map(task.graph)

    def _generate_renaming_map(self, cfg: ControlFlowGraph):
        """
        Generate the renaming map for the SSA variables of the given control flow graph.

        The dependency graph of the CFG is built, nodes of contracted variables are merged, the variable classes are
        created from the resulting graph and finally a new name is computed for each variable.

        :param cfg: The control flow graph from which the dependency graph is derived.
        """
        contracted_graph = self.merge_contracted_variables(dependency_graph_from_cfg(cfg))
        self.create_variable_classes(contracted_graph)
        self.compute_new_name_for_each_variable()

    def merge_contracted_variables(self, dependency_graph: MultiDiGraph):
        """
        Return a relabeled copy of the dependency graph in which all variables that are contracted to each other,
        according to self._variables_contracted_to, share one node.
        """
        relabeling: dict[tuple[Variable], tuple[Variable, ...]] = {}
        for node in self.interference_graph.nodes():
            group = tuple(self._variables_contracted_to[node])
            for member in group:
                relabeling[(member,)] = group

        return networkx.relabel_nodes(dependency_graph, relabeling)

    def create_variable_classes(self, dependency_graph: MultiDiGraph):
        """
        Create the variable classes based on the given dependency graph.

        Nodes are contracted in place, one pair at a time, always choosing the admissible pair with the highest summed
        score. Each remaining node then forms one variable class.
        """
        while (pair := self._best_contractible_pair(dependency_graph)) is not None:
            first, second = pair
            merged = (*first, *second)
            networkx.relabel_nodes(dependency_graph, {first: merged, second: merged}, copy=False)

        self._variable_classes_handler = VariableClassesHandler(defaultdict(set))
        for class_index, members in enumerate(dependency_graph.nodes):
            for member in members:
                self._variable_classes_handler.add_variable_to_class(member, class_index)

    def _best_contractible_pair(self, dependency_graph: MultiDiGraph):
        """
        Return the pair of distinct nodes with the highest summed edge score that can have the same name,
        or None if no such pair is left.
        """
        # Sum the scores of all edges between the same two nodes, ignoring direction and self loops.
        accumulated: dict[frozenset[tuple[Variable, ...]], float] = defaultdict(int)
        for source, sink, score in dependency_graph.edges(data="score"):
            if source != sink:
                accumulated[frozenset([source, sink])] += score

        ranked = sorted(accumulated.items(), key=lambda item: item[1], reverse=True)
        for (first, second), _ in ranked:
            if first != second and self._variables_can_have_same_name(first, second):
                return first, second
        return None

    def _variables_can_have_same_name(self, source: tuple[Variable, ...], sink: tuple[Variable, ...]) -> bool:
        """
        Decide whether two groups of variables can have the same name.

        This is the case if the variables do not interfere, and the first variables of both groups have the same type
        and are both aliased or both non-aliased. Aliased variables must additionally have the same name already.

        :param source: The potential source vertex.
        :param sink: The potential sink vertex
        :return: True, if the given groups of variables can have the same name, and false otherwise.
        """
        if self.interference_graph.are_interfering(*(source + sink)):
            return False
        # Only the first variable of each group is inspected for type, aliasing and name.
        first, second = source[0], sink[0]
        if first.type != second.type or first.is_aliased != second.is_aliased:
            return False
        return not (first.is_aliased and second.is_aliased and first.name != second.name)
4 changes: 2 additions & 2 deletions decompiler/util/to_dot_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

from networkx import DiGraph

HEADER = "strict digraph {"
HEADER = "digraph {"
FOOTER = "}"


class ToDotConverter:
"""Class in charge of writing a networkx DiGraph into dot-format"""

ATTRIBUTES = {"color", "fillcolor", "label", "shape", "style"}
ATTRIBUTES = {"color", "fillcolor", "label", "shape", "style", "dir"}

def __init__(self, graph: DiGraph):
self._graph = graph
Expand Down
Loading

0 comments on commit 8c7938e

Please sign in to comment.