PyTA Project: Augment CFG Edges with Z3 Constraints
Today's task combines several components of PythonTA introduced in previous articles: the control flow graph module, the Z3 visitor, and the Z3 expression wrapper. We will augment each control flow graph edge with a list of Z3 constraints representing the logical conditions that the function arguments must satisfy for that edge to be reached in the program. New constraints are created from function preconditions specified in the function docstring, if conditions, and while conditions. For example, consider the CFG below:
In this graph, x and y are function parameters, and the logical constraint on the starting edge is the function precondition. When the traversal reaches an if statement or while loop, the edges leaving that statement are given the if/while condition (or its negation on the False branch). In addition, if a function argument is reassigned, we discard every Z3 constraint involving that variable on the edges following the reassignment, since the variable's value becomes indeterminate.
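To make these rules concrete, here is a small hypothetical function (not taken from PythonTA) whose comments show the constraints one would expect on each edge according to the description above. The function name and the exact docstring layout of the precondition are assumptions made for illustration, and the annotations are hand-written, not output from the tool.

```python
def clamp_gap(x: int, y: int) -> int:
    """Toy function used only to illustrate edge constraints.

    Preconditions:
        - x > 0
    """
    # (assumed precondition layout above; illustrative only)
    # starting edge:               [x > 0]
    if x > y:
        # True branch edge:        [x > 0, x > y]
        result = x - y  # result is not a parameter, so nothing is dropped
    else:
        # False branch edge:       [x > 0, Not(x > y)]
        x = 0  # the parameter x is reassigned here
        # edges after this block:  [] (both constraints mention x)
        result = y - x
    return result
```

Both constraints disappear after the reassignment because each of them mentions x, which is the behavior described in the paragraph above.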
The Z3Environment Class
For this task, we first need a way to keep track of the logical constraints and the reassignment status of variables along an execution path. The Z3Environment class is used for this purpose. It has three attributes. variable_unassigned tracks whether each parameter is still unassigned, that is, has not been reassigned. variable_type stores the data type of each function parameter, extracted from the parameter type annotations. Currently, only int, float, bool, and str types and list/set/tuple literals can be processed by ExprWrapper, the class that converts Python types to the corresponding Z3 types. constraints holds the list of Z3 constraints along the current execution path.
class Z3Environment:
    """Z3 Environment stores the Z3 variables and constraints in the current CFG path

    variable_unassigned:
        A dictionary mapping each variable in the current environment to a boolean indicating
        whether it has been reassigned (False) or remains unassigned (True).
    variable_type:
        A dictionary mapping each variable in the current environment to its data type.
    constraints:
        A list of Z3 constraints in the current environment.
    """

    variable_unassigned: Dict[str, bool]
    variable_type: Dict[str, str]
    constraints: List[ExprRef]

    def __init__(self, variables: Dict[str, str], constraints: List[ExprRef]) -> None:
        """Initialize the environment with function parameters and preconditions"""
        self.variable_unassigned = {var: True for var in variables.keys()}
        self.variable_type = variables
        self.constraints = constraints.copy()
To add new Z3 constraints, we need two methods: add_constraint appends a Z3 constraint to the list of constraints, and parse_constraint converts an Astroid AST node to an equivalent Z3 expression. Later, during the graph traversal, we first call parse_constraint to convert the Python expression (represented as an Astroid AST) to a Z3 constraint, and then add it to the environment with add_constraint.
    def add_constraint(self, constraint: ExprRef) -> None:
        """
        Add a new z3 constraint to environment
        """
        self.constraints.append(constraint)

    def parse_constraint(self, node: NodeNG) -> Optional[ExprRef]:
        """
        Parse an Astroid node to a Z3 constraint
        Return the resulting expression
        """
        ew = ExprWrapper(node, self.variable_type)
        try:
            return ew.reduce()
        except (Z3Exception, Z3ParseException):
            return None
To handle variable reassignment, the assign method is called when the traversal reaches a reassignment statement. It sets the variable's entry in variable_unassigned to False, meaning the variable is no longer unassigned (it has been reassigned).
    def assign(self, name: str) -> None:
        """Handle a variable assignment statement"""
        if name in self.variable_unassigned:
            self.variable_unassigned[name] = False
Finally, the update_constraints method generates the constraints for an edge and removes the constraints that involve reassigned variables. It relies on a helper function, _get_vars, which returns the names of the variables appearing in a given Z3 expression.
    def update_constraints(self) -> List[ExprRef]:
        """
        Returns all z3 constraints in the environments
        Removes constraints with reassigned variables
        """
        updated_constraints = []
        for constraint in self.constraints:
            # discard expressions with reassigned variables
            variables = _get_vars(constraint)
            reassigned = any(
                not self.variable_unassigned.get(variable, False) for variable in variables
            )
            if not reassigned:
                updated_constraints.append(constraint)
        self.constraints = updated_constraints
        return updated_constraints.copy()


def _get_vars(expr: ExprRef) -> Set[str]:
    """
    Retrieve all z3 variables from a z3 expression
    """
    variables = set()

    def traverse(e):
        if is_const(e) and e.decl().kind() == Z3_OP_UNINTERPRETED:
            variables.add(e.decl().name())
        elif hasattr(e, "children"):
            for child in e.children():
                traverse(child)

    traverse(expr)
    return variables
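The filtering rule can also be tried without z3 installed. The sketch below is a simplified stand-in, not PythonTA code: each constraint is represented as a pair of readable text and the set of variable names it mentions (the information that _get_vars would extract from a real Z3 expression), and drop_reassigned is a hypothetical helper name.

```python
from typing import Dict, FrozenSet, List, Tuple

# Stand-in for a Z3 expression: (readable text, names of the variables it mentions).
Constraint = Tuple[str, FrozenSet[str]]


def drop_reassigned(
    constraints: List[Constraint], unassigned: Dict[str, bool]
) -> List[Constraint]:
    """Keep only constraints whose variables are all still unassigned."""
    return [
        (text, names)
        for text, names in constraints
        # names missing from the dict default to False, i.e. count as reassigned
        if all(unassigned.get(name, False) for name in names)
    ]


constraints = [
    ("x > 0", frozenset({"x"})),
    ("y < 10", frozenset({"y"})),
    ("x > y", frozenset({"x", "y"})),
]
unassigned = {"x": True, "y": True}

unassigned["y"] = False  # mirrors what Z3Environment.assign("y") does
kept = drop_reassigned(constraints, unassigned)
print([text for text, _ in kept])  # ['x > 0']
```

Note that a constraint mentioning several variables is dropped as soon as any one of them has been reassigned, which is why "x > y" does not survive here.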
Depth-First Traversal Through the CFG
After setting up the Z3 environment, there is another preparation we need to make: getting all the paths along the control flow graph. The standard way to do so is to implement a depth-first search through the graph. I have created an implementation of DFS in a previous article (https://raineyang.hashnode.dev/pyta-project-depth-first-search-for-control-flow-graph). However, that one traverses through the nodes in the graph, while today we need to get all the edges along each path, which I implement in the code below:
    def _get_paths(self) -> List[List[CFGEdge]]:
        """Get edges that represent paths from start to end node in depth-first order."""
        paths = []

        def _visited(
            edge: CFGEdge, visited_edges: Set[CFGEdge], visited_nodes: Set[CFGBlock]
        ) -> bool:
            return edge in visited_edges or edge.target in visited_nodes

        def _dfs(
            current_edge: CFGEdge,
            current_path: List[CFGEdge],
            visited_edges: Set[CFGEdge],
            visited_nodes: Set[CFGBlock],
        ):
            # note: both visited edges and visited nodes need to be tracked to correctly handle cycles
            if _visited(current_edge, visited_edges, visited_nodes):
                return

            visited_edges.add(current_edge)
            visited_nodes.add(current_edge.source)
            current_path.append(current_edge)

            if current_edge.target == self.end or all(
                _visited(edge, visited_edges, visited_nodes)
                for edge in current_edge.target.successors
            ):
                paths.append(current_path.copy())
            else:
                for edge in current_edge.target.successors:
                    _dfs(edge, current_path, visited_edges, visited_nodes)

            current_path.pop()
            visited_edges.remove(current_edge)

        _dfs(self.start.successors[0], [], set(), set())
        return paths
The algorithm is essentially the same as the node-based version. One thing worth noting is that we need to keep track of both visited nodes and visited edges, and follow an edge only if neither the edge itself nor its target node has been visited, so that cycles are handled correctly. Consider the case below: in this while loop structure, the desired behavior is to form two execution paths, one into the loop and one out of the loop.
However, if we only keep track of visited edges, the first traversal will go through both edges in the cycle, leading to the single path below. Thus, we also need to stop at visited nodes to handle cycles.
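The while-loop case can be reproduced on a hand-built graph. The sketch below is a stripped-down imitation of the traversal above, under the assumption that an edge can be modeled as a plain (source, target) pair of block names; the graph layout, the block names, and the get_paths helper are all hypothetical and only stand in for CFGEdge and CFGBlock.

```python
from typing import Dict, List, Set, Tuple

# An edge is a (source, target) pair of block names; this replaces CFGEdge.
Edge = Tuple[str, str]

# Hand-built CFG for a single loop: while <cond>: <body>
successors: Dict[str, List[Edge]] = {
    "start": [("start", "cond")],
    "cond": [("cond", "body"), ("cond", "after")],
    "body": [("body", "cond")],  # back edge of the loop
    "after": [("after", "end")],
    "end": [],
}


def get_paths(first_edge: Edge, end: str) -> List[List[Edge]]:
    paths: List[List[Edge]] = []
    visited_edges: Set[Edge] = set()
    visited_nodes: Set[str] = set()
    current_path: List[Edge] = []

    def visited(edge: Edge) -> bool:
        # an edge is skipped if it, or the node it leads to, was already seen
        return edge in visited_edges or edge[1] in visited_nodes

    def dfs(edge: Edge) -> None:
        if visited(edge):
            return
        visited_edges.add(edge)
        visited_nodes.add(edge[0])
        current_path.append(edge)
        next_edges = successors[edge[1]]
        if edge[1] == end or all(visited(e) for e in next_edges):
            paths.append(current_path.copy())
        else:
            for e in next_edges:
                dfs(e)
        current_path.pop()
        visited_edges.remove(edge)

    dfs(first_edge)
    return paths


paths = get_paths(("start", "cond"), "end")
for path in paths:
    print(path)
# [('start', 'cond'), ('cond', 'body')]
# [('start', 'cond'), ('cond', 'after'), ('after', 'end')]
```

The first path stops inside the loop body because the back edge leads to the already visited "cond" block, and the second path leaves the loop, which matches the two paths described above.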
Add Z3 Constraints to CFG Edges
Now it's time to put everything together in the update_edge_z3_constraints method of the ControlFlowGraph class:
    def update_edge_z3_constraints(self) -> None:
        """Traverse through edges and add Z3 constraints on each edge.

        Constraints are generated from:
        - Function preconditions
        - If conditions
        - While conditions

        Constraints with reassigned variables are not included in subsequent edges."""
First, we iterate through the paths generated by the _get_paths method and create a new Z3Environment for each path. An edge with a non-None condition corresponds to an if or while condition, which we add (or its negation, if the edge is labeled False) to the current Z3Environment. Finally, we call update_constraints for every edge in the path and store the result on that edge.
        for path_id, path in enumerate(self._get_paths()):
            # starting a new path
            z3_environment = Z3Environment(self._z3_vars, self.precondition_constraints)
            for edge in path:
                # traverse through edge
                if edge.condition is not None:
                    condition_z3_constraint = z3_environment.parse_constraint(edge.condition)
                    if condition_z3_constraint is not None:
                        if edge.label == "True":
                            z3_environment.add_constraint(condition_z3_constraint)
                        elif edge.label == "False":
                            z3_environment.add_constraint(Not(condition_z3_constraint))
                edge.z3_constraints[path_id] = z3_environment.update_constraints()
Note that we store the Z3 constraints on each edge in a Dict[int, List], where the key is path_id and the value is the list of constraints. This is because one edge can be part of multiple paths, each with its own set of constraints. The path_id distinguishes which list of constraints belongs to which path.
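As a small illustration of this storage scheme, the sketch below uses a hypothetical EdgeStub class in place of CFGEdge and plain strings in place of Z3 expressions. The two constraint lists are made up; the point is only how the per-path dictionary is written and read back.

```python
from typing import Dict, List


class EdgeStub:
    """Minimal stand-in for a CFG edge; only the constraint storage is modeled."""

    def __init__(self) -> None:
        # path_id -> constraints that hold on this edge along that path
        self.z3_constraints: Dict[int, List[str]] = {}


shared_edge = EdgeStub()

# Made-up constraint lists for two paths that both pass through shared_edge.
constraints_by_path = {
    0: ["x > 0", "x > y"],
    1: ["x > 0", "Not(x > y)"],
}
for path_id, constraints in constraints_by_path.items():
    shared_edge.z3_constraints[path_id] = list(constraints)

print(shared_edge.z3_constraints[1])  # ['x > 0', 'Not(x > y)']
```

Keying by path_id keeps the two lists separate, so a later consumer can ask what is known on this edge along one specific path without mixing in constraints from another.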
                # traverse into target node
                for node in edge.target.statements:
                    if isinstance(node, Assign):
                        # mark reassigned variables
                        for target in node.targets:
                            if isinstance(target, AssignName):
                                z3_environment.assign(target.name)
After that, we need to iterate through statements in the node after the edge to determine if there are any arguments being reassigned.
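The same shape of check can be tried with Python's standard ast module, used here purely as a stand-in for Astroid (ast.Name plays the role of AssignName). The helper name reassigned_names is hypothetical, and this is a sketch of the idea, not PythonTA's code.

```python
import ast
from typing import Set


def reassigned_names(source: str) -> Set[str]:
    """Collect names that are direct targets of plain assignment statements."""
    names: Set[str] = set()
    for stmt in ast.parse(source).body:
        if isinstance(stmt, ast.Assign):
            for target in stmt.targets:
                # only bare names are recorded; tuple targets are skipped
                if isinstance(target, ast.Name):
                    names.add(target.id)
    return names


found = reassigned_names("x = 0\ny += 1\nz: int = 2\na, b = 1, 2")
print(found)  # {'x'}
```

In this stand-in, augmented assignments, annotated assignments, and tuple unpacking are not picked up, because they are not plain assignments to a bare name. A check of the same shape on Astroid nodes would presumably have similar gaps, although PythonTA may handle those forms elsewhere.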
At this point, the main part of the task is complete, but a few minor points are still worth noting. First, z3 is an optional dependency in PythonTA, so we should make sure that the control flow graph itself still works when z3-related features are not enabled. We put the z3-related import statements in a try/except block, which sets the variable z3_dependency_available to False if an ImportError occurs.
try:
    from z3 import Z3_OP_UNINTERPRETED, ExprRef, Not, Z3Exception, is_const

    from ..transforms import ExprWrapper

    z3_dependency_available = True
except ImportError:
    ExprRef = Any
    ExprWrapper = Any
    Not = Any
    Z3Exception = Any
    is_const = Any
    Z3_OP_UNINTERPRETED = Any
    z3_dependency_available = False
If z3 is not successfully imported, we disable update_edge_z3_constraints by returning early:
    def update_edge_z3_constraints(self) -> None:
        """Traverse through edges and add Z3 constraints on each edge.

        Constraints are generated from:
        - Function preconditions
        - If conditions
        - While conditions

        Constraints with reassigned variables are not included in subsequent edges."""
        if not z3_dependency_available:
            return
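The optional-dependency pattern can be exercised in isolation. In the sketch below, some_missing_solver_backend is a deliberately nonexistent, hypothetical module name so that the fallback branch runs, and annotate_edges is a made-up function that mimics the early return; none of these names come from PythonTA.

```python
from typing import Any, List

try:
    # hypothetical module name, chosen so that the import fails on purpose
    import some_missing_solver_backend as solver

    solver_available = True
except ImportError:
    solver = Any  # placeholder so later references to the name still resolve
    solver_available = False


def annotate_edges(edges: List[dict]) -> int:
    """Return how many edges were annotated; 0 when the backend is missing."""
    if not solver_available:
        return 0  # early exit: the graph itself stays usable
    for edge in edges:
        edge["constraints"] = []
    return len(edges)


print(solver_available)          # False
print(annotate_edges([{}, {}]))  # 0
```

Binding the missing names to a placeholder keeps module-level annotations from raising NameError, while the boolean flag is what actually gates the feature at call time.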
Second, we need to invoke Z3Visitor at some point to set up the initial z3 constraints from the function precondition docstrings. Initially I tried to integrate it directly into the CFGVisitor module. However, this turned out to be suboptimal, as it introduces coupling between two modules that are, in principle, unrelated. Instead, inside the visit_functiondef method of CFGVisitor, we call update_edge_z3_constraints only if the FunctionDef node has the z3_constraints attribute added by Z3Visitor:
        if hasattr(func, "z3_constraints"):
            self._current_cfg.precondition_constraints = func.z3_constraints
            self._current_cfg.update_edge_z3_constraints()
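The decoupling idea, where one pass leaves an attribute behind and the other pass merely checks for it, can be sketched with stand-in objects. CFGStub and attach_constraints are hypothetical names, and SimpleNamespace objects stand in for FunctionDef nodes; this is an illustration of the pattern under those assumptions, not the actual visitor code.

```python
from types import SimpleNamespace
from typing import List


class CFGStub:
    """Stand-in for the control flow graph; records whether the update ran."""

    def __init__(self) -> None:
        self.precondition_constraints: List[str] = []
        self.updated = False

    def update_edge_z3_constraints(self) -> None:
        self.updated = True


def attach_constraints(func: object, cfg: CFGStub) -> None:
    # Only act if an earlier pass left a z3_constraints attribute on the node.
    if hasattr(func, "z3_constraints"):
        cfg.precondition_constraints = func.z3_constraints
        cfg.update_edge_z3_constraints()


visited_func = SimpleNamespace(z3_constraints=["x > 0"])  # earlier pass ran
plain_func = SimpleNamespace()                            # earlier pass did not run

cfg_a, cfg_b = CFGStub(), CFGStub()
attach_constraints(visited_func, cfg_a)
attach_constraints(plain_func, cfg_b)
print(cfg_a.updated, cfg_b.updated)  # True False
```

With this arrangement the graph-building code never imports or calls the constraint-producing pass; it only reacts to data that may or may not be present on the node.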